View source with raw comments or as raw
   1/*  Part of SWI-Prolog
   2
   3    Author:        Jan Wielemaker
   4    E-mail:        J.Wielemaker@vu.nl
   5    WWW:           http://www.swi-prolog.org
   6    Copyright (c)  2008-2016, University of Amsterdam
   7                              VU University Amsterdam
   8    All rights reserved.
   9
  10    Redistribution and use in source and binary forms, with or without
  11    modification, are permitted provided that the following conditions
  12    are met:
  13
  14    1. Redistributions of source code must retain the above copyright
  15       notice, this list of conditions and the following disclaimer.
  16
  17    2. Redistributions in binary form must reproduce the above copyright
  18       notice, this list of conditions and the following disclaimer in
  19       the documentation and/or other materials provided with the
  20       distribution.
  21
  22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
  28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
  32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33    POSSIBILITY OF SUCH DAMAGE.
  34*/
  35
  36:- module(thread_pool,
  37          [ thread_pool_create/3,       % +Pool, +Size, +Options
  38            thread_pool_destroy/1,      % +Pool
  39            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options
  40
  41            current_thread_pool/1,      % ?Pool
  42            thread_pool_property/2      % ?Pool, ?Property
  43          ]).
  44:- use_module(library(error)).
  45:- use_module(library(lists)).
  46:- use_module(library(option)).
  47:- use_module(library(rbtrees)).
  48:- use_module(library(debug)).
  49
  50
  51/** <module> Resource bounded thread management
  52
  53The module library(thread_pool) manages threads in pools. A pool defines
  54properties of its member threads and the  maximum number of threads that
  55can coexist in the pool. The   call  thread_create_in_pool/4 allocates a
  56thread in the pool, just like  thread_create/3.   If  the  pool is fully
  57allocated it can be asked to wait or raise an error.
  58
  59The library has been designed  to   deal  with  server applications that
  60receive a variety of requests, such as   HTTP servers. Simply starting a
  61thread for each request is a bit too simple minded for such servers:
  62
  63    * Creating many CPU intensive threads often leads to a slow-down
  64    rather than a speedup.
  65    * Creating many memory intensive threads may exhaust resources
  66    * Tasks that require little CPU and memory but take long waiting
  67    for external resources can run many threads.
  68
  69Using this library, one can define a  pool   for  each set of tasks with
  70comparable characteristics and create threads in   this pool. Unlike the
  71worker-pool model, threads are not started immediately. Depending on the
  72design, both approaches can be attractive.
  73
  74The library is implemented by means of   a manager thread with the fixed
  75thread id =|__thread_pool_manager|=. All  state   is  maintained in this
  76manager thread, which receives and  processes   requests  to  create and
  77destroy pools, create  threads  in  a   pool  and  handle  messages from
  78terminated threads. Thread pools are _not_ saved   in  a saved state and
  79must therefore be recreated  using   the  initialization/1  directive or
  80otherwise during startup of the application.
  81
  82@see http_handler/3 and http_spawn/2.
  83*/
  84
  85:- meta_predicate
  86    thread_create_in_pool(+, 0, -, :).
  87:- predicate_options(thread_create_in_pool/4, 4,
  88                     [ wait(boolean),
  89                       pass_to(system:thread_create/3, 3)
  90                     ]).
  91
  92:- multifile
  93    create_pool/1.
  94
  95%!  thread_pool_create(+Pool, +Size, +Options) is det.
  96%
  97%   Create a pool of threads. A pool of threads is a declaration for
  98%   creating threads with shared  properties   (stack  sizes)  and a
  99%   limited  number  of  threads.   Threads    are   created   using
 100%   thread_create_in_pool/4. If all threads in the  pool are in use,
 101%   the   behaviour   depends   on    the     =wait=    option    of
 102%   thread_create_in_pool/4  and  the  =backlog=   option  described
 103%   below.  Options are passed to thread_create/3, except for
 104%
 105%       * backlog(+MaxBackLog)
 106%       Maximum number of requests that can be suspended.  Default
 107%       is =infinite=.  Otherwise it must be a non-negative integer.
 108%       Using backlog(0) will never delay thread creation for this
 109%       pool.
 110%
 111%   The pooling mechanism does _not_   interact  with the =detached=
 112%   state of a thread. Threads can   be  created both =detached= and
 113%   normal and must be joined using   thread_join/2  if they are not
 114%   detached.
 115
 116thread_pool_create(Name, Size, Options) :-
 117    must_be(list, Options),
 118    pool_manager(Manager),
 119    thread_self(Me),
 120    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
 121    wait_reply.
 122
 123%!  thread_pool_destroy(+Name) is det.
 124%
 125%   Destroy the thread pool named Name.
 126%
 127%   @error  existence_error(thread_pool, Name).
 128
 129thread_pool_destroy(Name) :-
 130    pool_manager(Manager),
 131    thread_self(Me),
 132    thread_send_message(Manager, destroy_pool(Name, Me)),
 133    wait_reply.
 134
 135
 136%!  current_thread_pool(?Name) is nondet.
 137%
 138%   True if Name refers to a defined thread pool.
 139
 140current_thread_pool(Name) :-
 141    pool_manager(Manager),
 142    thread_self(Me),
 143    thread_send_message(Manager, current_pools(Me)),
 144    wait_reply(Pools),
 145    (   atom(Name)
 146    ->  memberchk(Name, Pools)
 147    ;   member(Name, Pools)
 148    ).
 149
 150%!  thread_pool_property(?Name, ?Property) is nondet.
 151%
 152%   True if Property is a property of thread pool Name. Defined
 153%   properties are:
 154%
 155%       * options(Options)
 156%       Thread creation options for this pool
 157%       * free(Size)
 158%       Number of free slots on this pool
 159%       * size(Size)
 160%       Total number of slots on this pool
 161%       * members(ListOfIDs)
 162%       ListOfIDs is the list or threads running in this pool
 163%       * running(Running)
 164%       Number of running threads in this pool
 165%       * backlog(Size)
 166%       Number of delayed thread creations on this pool
 167
 168thread_pool_property(Name, Property) :-
 169    current_thread_pool(Name),
 170    pool_manager(Manager),
 171    thread_self(Me),
 172    thread_send_message(Manager, pool_properties(Me, Name, Property)),
 173    wait_reply(Props),
 174    (   nonvar(Property)
 175    ->  memberchk(Property, Props)
 176    ;   member(Property, Props)
 177    ).
 178
 179
 180%!  thread_create_in_pool(+Pool, :Goal, -Id, +Options) is det.
 181%
 182%   Create  a  thread  in  Pool.  Options  overrule  default  thread
 183%   creation options associated  to  the   pool.  In  addition,  the
 184%   following option is defined:
 185%
 186%       * wait(+Boolean)
 187%       If =true= (default) and the pool is full, wait until a
 188%       member of the pool completes.  If =false=, throw a
 189%       resource_error.
 190%
 191%   @error  resource_error(threads_in_pool(Pool)) is raised if wait
 192%           is =false= or the backlog limit has been reached.
 193%   @error  existence_error(thread_pool, Pool) if Pool does not
 194%           exist.
 195
 196thread_create_in_pool(Pool, Goal, Id, QOptions) :-
 197    meta_options(is_meta, QOptions, Options),
 198    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
 199          Error, true),
 200    (   var(Error)
 201    ->  true
 202    ;   Error = error(existence_error(thread_pool, Pool), _),
 203        create_pool_lazily(Pool)
 204    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
 205    ;   throw(Error)
 206    ).
 207
 208thread_create_in_pool_(Pool, Goal, Id, Options) :-
 209    select_option(wait(Wait), Options, ThreadOptions, true),
 210    pool_manager(Manager),
 211    thread_self(Me),
 212    thread_send_message(Manager,
 213                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
 214    wait_reply(Id).
 215
 216is_meta(at_exit).
 217
 218
 219%!  create_pool_lazily(+Pool) is semidet.
 220%
 221%   Call the hook create_pool/1 to create the pool lazily.
 222
 223create_pool_lazily(Pool) :-
 224    with_mutex(Pool,
 225               ( mutex_destroy(Pool),
 226                 create_pool_sync(Pool)
 227               )).
 228
 229create_pool_sync(Pool) :-
 230    current_thread_pool(Pool),
 231    !.
 232create_pool_sync(Pool) :-
 233    create_pool(Pool).
 234
 235
 236                 /*******************************
 237                 *        START MANAGER         *
 238                 *******************************/
 239
 240%!  pool_manager(-ThreadID) is det.
 241%
 242%   ThreadID is the thread (alias) identifier of the manager. Starts
 243%   the manager if it is not running.
 244
 245pool_manager(TID) :-
 246    TID = '__thread_pool_manager',
 247    (   thread_running(TID)
 248    ->  true
 249    ;   with_mutex('__thread_pool', create_pool_manager(TID))
 250    ).
 251
 252thread_running(Thread) :-
 253    catch(thread_property(Thread, status(Status)),
 254          E, true),
 255    (   var(E)
 256    ->  (   Status == running
 257        ->  true
 258        ;   thread_join(Thread, _),
 259            print_message(warning, thread_pool(manager_died(Status))),
 260            fail
 261        )
 262    ;   E = error(existence_error(thread, Thread), _)
 263    ->  fail
 264    ;   throw(E)
 265    ).
 266
 267create_pool_manager(Thread) :-
 268    thread_running(Thread),
 269    !.
 270create_pool_manager(Thread) :-
 271    thread_create(pool_manager_main, _,
 272                  [ alias(Thread),
 273                    inherit_from(main)
 274                  ]).
 275
 276
 277pool_manager_main :-
 278    rb_new(State0),
 279    manage_thread_pool(State0).
 280
 281
 282                 /*******************************
 283                 *        MANAGER LOGIC         *
 284                 *******************************/
 285
 286%!  manage_thread_pool(+State)
 287
 288manage_thread_pool(State0) :-
 289    thread_get_message(Message),
 290    (   update_thread_pool(Message, State0, State)
 291    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
 292        manage_thread_pool(State)
 293    ;   format(user_error, 'Update failed: ~p~n', [Message])
 294    ).
 295
 296
 297update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
 298    !,
 299    (   rb_insert_new(State0,
 300                      Name, tpool(Options, Size, Size, WP, WP, []),
 301                      State)
 302    ->  thread_send_message(For, thread_pool(true))
 303    ;   reply_error(For, permission_error(create, thread_pool, Name)),
 304        State = State0
 305    ).
 306update_thread_pool(destroy_pool(Name, For), State0, State) :-
 307    !,
 308    (   rb_delete(State0, Name, State)
 309    ->  thread_send_message(For, thread_pool(true))
 310    ;   reply_error(For, existence_error(thread_pool, Name)),
 311        State = State0
 312    ).
 313update_thread_pool(current_pools(For), State, State) :-
 314    !,
 315    rb_keys(State, Keys),
 316    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
 317    reply(For, Keys).
 318update_thread_pool(pool_properties(For, Name, P), State, State) :-
 319    !,
 320    (   rb_lookup(Name, Pool, State)
 321    ->  findall(P, pool_property(P, Pool), List),
 322        reply(For, List)
 323    ;   reply_error(For, existence_error(thread_pool, Name))
 324    ).
 325update_thread_pool(Message, State0, State) :-
 326    arg(1, Message, Name),
 327    (   rb_lookup(Name, Pool0, State0)
 328    ->  update_pool(Message, Pool0, Pool),
 329        rb_update(State0, Name, Pool, State)
 330    ;   State = State0,
 331        (   Message = create(Name, _, For, _, _, _)
 332        ->  reply_error(For, existence_error(thread_pool, Name))
 333        ;   true
 334        )
 335    ).
 336
 337pool_property(options(Options),
 338              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
 339pool_property(backlog(Size),
 340              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
 341    diff_list_length(WP, WPT, Size).
 342pool_property(free(Free),
 343              tpool(_, Free, _Size, _, _, _)).
 344pool_property(size(Size),
 345              tpool(_, _Free, Size, _, _, _)).
 346pool_property(running(Count),
 347              tpool(_, Free, Size, _, _, _)) :-
 348    Count is Size - Free.
 349pool_property(members(IDList),
 350              tpool(_, _, _, _, _, IDList)).
 351
 352diff_list_length(List, Tail, Size) :-
 353    '$skip_list'(Length, List, Rest),
 354    (   Rest == Tail
 355    ->  Size = Length
 356    ;   type_error(difference_list, List/Tail)
 357    ).
 358
 359
 360%!  update_pool(+Message, +Pool0, -Pool) is det.
 361%
 362%   Deal with create requests and  completion   messages  on a given
 363%   pool.  There are two messages:
 364%
 365%       * create(PoolName, Goal, ForThread, Wait, Id, Options)
 366%       Create a new thread on behalf of ForThread.  There are
 367%       two cases:
 368%            * Free slots: create the thread
 369%            * No free slots: error or add to waiting
 370%       * exitted(PoolName, Thread)
 371%       A thread completed.  If there is a request waiting,
 372%       create a new one.
 373
 374update_pool(create(Name, Goal, For, _, Id, MyOptions),
 375            tpool(Options, Free0, Size, WP, WPT, Members0),
 376            tpool(Options, Free, Size, WP, WPT, Members)) :-
 377    succ(Free, Free0),
 378    !,
 379    merge_options(MyOptions, Options, ThreadOptions),
 380    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
 381    catch(thread_create(Goal, Id,
 382                        [ at_exit(worker_exitted(Name, Id, AtExit))
 383                        | ThreadOptions1
 384                        ]),
 385          E, true),
 386    (   var(E)
 387    ->  Members = [Id|Members0],
 388        reply(For, Id)
 389    ;   reply_error(For, E),
 390        Members = Members0
 391    ).
 392update_pool(Create,
 393            tpool(Options, 0, Size, WP, WPT0, Members),
 394            tpool(Options, 0, Size, WP, WPT, Members)) :-
 395    Create = create(Name, _Goal, For, Wait, _, _Options),
 396    !,
 397    option(backlog(BackLog), Options, infinite),
 398    (   can_delay(Wait, BackLog, WP, WPT0)
 399    ->  WPT0 = [Create|WPT],
 400        debug(thread_pool, 'Delaying ~p', [Create])
 401    ;   WPT = WPT0,
 402        reply_error(For, resource_error(threads_in_pool(Name)))
 403    ).
 404update_pool(exitted(_Name, Id),
 405            tpool(Options, Free0, Size, WP0, WPT, Members0),
 406            Pool) :-
 407    succ(Free0, Free),
 408    delete(Members0, Id, Members1),
 409    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
 410    (   WP0 == WPT
 411    ->  WP = WP0,
 412        Pool = Pool1
 413    ;   WP0 = [Waiting|WP],
 414        debug(thread_pool, 'Start delayed ~p', [Waiting]),
 415        update_pool(Waiting, Pool1, Pool)
 416    ).
 417
 418
 419can_delay(true, infinite, _, _) :- !.
 420can_delay(true, BackLog, WP, WPT) :-
 421    diff_list_length(WP, WPT, Size),
 422    BackLog > Size.
 423
 424%!  worker_exitted(+PoolName, +WorkerId, :AtExit)
 425%
 426%   It is possible that  '__thread_pool_manager'   no  longer exists
 427%   while closing down the process because   the  manager was killed
 428%   before the worker.
 429%
 430%   @tbd Find a way to discover that we are terminating Prolog.
 431
 432worker_exitted(Name, Id, AtExit) :-
 433    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
 434          _, true),
 435    call(AtExit).
 436
 437
 438                 /*******************************
 439                 *             UTIL             *
 440                 *******************************/
 441
 442reply(To, Term) :-
 443    thread_send_message(To, thread_pool(true(Term))).
 444
 445reply_error(To, Error) :-
 446    thread_send_message(To, thread_pool(error(Error, _))).
 447
 448wait_reply :-
 449    thread_get_message(thread_pool(Result)),
 450    (   Result == true
 451    ->  true
 452    ;   Result == fail
 453    ->  fail
 454    ;   throw(Result)
 455    ).
 456
 457wait_reply(Value) :-
 458    thread_get_message(thread_pool(Reply)),
 459    (   Reply = true(Value0)
 460    ->  Value = Value0
 461    ;   Reply == fail
 462    ->  fail
 463    ;   throw(Reply)
 464    ).
 465
 466
 467                 /*******************************
 468                 *             HOOKS            *
 469                 *******************************/
 470
 471%!  create_pool(+PoolName) is semidet.
 472%
 473%   Hook to create a thread  pool  lazily.   The  hook  is called if
 474%   thread_create_in_pool/4 discovers that the thread  pool does not
 475%   exist. If the  hook   succeeds,  thread_create_in_pool/4 retries
 476%   creating the thread. For  example,  we   can  use  the following
 477%   declaration to create threads in the pool =media=, which holds a
 478%   maximum of 20 threads.
 479%
 480%     ==
 481%     :- multifile thread_pool:create_pool/1.
 482%
 483%     thread_pool:create_pool(media) :-
 484%         thread_pool_create(media, 20, []).
 485%     ==
 486
 487                 /*******************************
 488                 *            MESSAGES          *
 489                 *******************************/
 490:- multifile
 491    prolog:message/3.
 492
 493prolog:message(thread_pool(Message)) -->
 494    message(Message).
 495
 496message(manager_died(Status)) -->
 497    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ].