View source with raw comments or as raw
   1/*  Part of SWI-Prolog
   2
   3    Author:        Jan Wielemaker
   4    E-mail:        J.Wielemaker@vu.nl
   5    WWW:           http://www.swi-prolog.org
   6    Copyright (c)  2007-2016, University of Amsterdam
   7                              VU University Amsterdam
   8    All rights reserved.
   9
  10    Redistribution and use in source and binary forms, with or without
  11    modification, are permitted provided that the following conditions
  12    are met:
  13
  14    1. Redistributions of source code must retain the above copyright
  15       notice, this list of conditions and the following disclaimer.
  16
  17    2. Redistributions in binary form must reproduce the above copyright
  18       notice, this list of conditions and the following disclaimer in
  19       the documentation and/or other materials provided with the
  20       distribution.
  21
  22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
  28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
  32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33    POSSIBILITY OF SUCH DAMAGE.
  34*/
  35
  36:- module(thread,
  37          [ concurrent/3,               % +Threads, :Goals, +Options
  38            concurrent_maplist/2,       % :Goal, +List
  39            concurrent_maplist/3,       % :Goal, ?List1, ?List2
  40            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
  41            first_solution/3            % -Var, :Goals, +Options
  42          ]).
  43:- use_module(library(debug)).
  44:- use_module(library(error)).
  45:- use_module(library(lists)).
  46:- use_module(library(apply)).
  47
  48%:- debug(concurrent).
  49
  50:- meta_predicate
  51    concurrent(+, :, +),
  52    concurrent_maplist(1, +),
  53    concurrent_maplist(2, ?, ?),
  54    concurrent_maplist(3, ?, ?, ?),
  55    first_solution(-, :, +).
  56
  57:- predicate_options(concurrent/3, 3,
  58                     [ pass_to(system:thread_create/3, 3)
  59                     ]).
  60:- predicate_options(first_solution/3, 3,
  61                     [ on_fail(oneof([stop,continue])),
  62                       on_error(oneof([stop,continue])),
  63                       pass_to(system:thread_create/3, 3)
  64                     ]).
  65
  66/** <module> High level thread primitives
  67
This module defines simple-to-use predicates for running goals
concurrently. Where the core multi-threaded API is targeted at
communicating long-living threads, the predicates here are defined to
run goals concurrently without having to deal with thread creation and
maintenance explicitly.
  73
  74Note that these predicates run goals   concurrently  and therefore these
  75goals need to be thread-safe. As  the   predicates  in  this module also
  76abort branches of the computation that  are no longer needed, predicates
  77that have side-effect must act properly.  In   a  nutshell, this has the
  78following consequences:
  79
  80  * Nice clean Prolog code without side-effects (but with cut) works
  81    fine.
  82  * Side-effects are bad news.  If you really need assert to store
  83    intermediate results, use the thread_local/1 declaration.  This
  84    also guarantees cleanup of left-over clauses if the thread is
  85    cancelled.  For other side-effects, make sure to use call_cleanup/2
  86    to undo them should the thread be cancelled.
  87  * Global variables are ok as they are thread-local and destroyed
  88    on thread cancellation.  Note however that global variables in
  89    the calling thread are *not* available in the threads that are
  90    created.  You have to pass the value as an argument and initialise
  91    the variable in the new thread.
  92  * Thread-cancellation uses thread_signal/2.  Using this code
  93    with long-blocking foreign predicates may result in long delays,
  94    even if another thread asks for cancellation.
  95
  96@author Jan Wielemaker
  97*/
  98
  99%!  concurrent(+N, :Goals, Options) is semidet.
 100%
 101%   Run Goals in parallel using N   threads.  This call blocks until
 102%   all work has been done.  The   Goals  must  be independent. They
 103%   should not communicate using shared  variables   or  any form of
 104%   global data. All Goals must be thread-safe.
 105%
 106%   Execution succeeds if all goals  have   succeeded.  If  one goal
 107%   fails or throws an exception,  other   workers  are abandoned as
 108%   soon as possible and the entire   computation fails or re-throws
 109%   the exception. Note that if  multiple   goals  fail  or raise an
 110%   error it is not defined which error or failure is reported.
 111%
 112%   On successful completion, variable bindings   are returned. Note
 113%   however that threads have independent   stacks and therefore the
 114%   goal is copied to the worker  thread   and  the result is copied
 115%   back to the caller of concurrent/3.
 116%
 117%   Choosing the right number of threads is not always obvious. Here
 118%   are some scenarios:
 119%
 120%     * If the goals are CPU intensive and normally all succeeding,
 121%     typically the number of CPUs is the optimal number of
 122%     threads.  Less does not use all CPUs, more wastes time in
 123%     context switches and also uses more memory.
 124%
 125%     * If the tasks are I/O bound the number of threads is
 126%     typically higher than the number of CPUs.
 127%
%     * If one or more of the goals may fail or produce an error,
%     using a higher number of threads may find this earlier.
 130%
 131%   @param N Number of worker-threads to create. Using 1, no threads
 132%          are created.  If N is larger than the number of Goals we
 133%          create exactly as many threads as there are Goals.
 134%   @param Goals List of callable terms.
 135%   @param Options Passed to thread_create/3 for creating the
 136%          workers.  Only options changing the stack-sizes can
 137%          be used. In particular, do not pass the detached or alias
 138%          options.
 139%   @see In many cases, concurrent_maplist/2 and friends
 140%        is easier to program and is tractable to program
 141%        analysis.
 142
 143concurrent(1, M:List, _) :-
 144    !,
 145    maplist(M:call, List).
 146concurrent(N, M:List, Options) :-
 147    must_be(positive_integer, N),
 148    must_be(list(callable), List),
 149    length(List, JobCount),
 150    message_queue_create(Done),
 151    message_queue_create(Queue),
 152    WorkerCount is min(N, JobCount),
 153    create_workers(WorkerCount, Queue, Done, Workers, Options),
 154    submit_goals(List, 1, M, Queue, VarList),
 155    forall(between(1, WorkerCount, _),
 156           thread_send_message(Queue, done)),
 157    VT =.. [vars|VarList],
 158    concur_wait(JobCount, Done, VT, Result, Exitted),
 159    subtract(Workers, Exitted, RemainingWorkers),
 160    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
 161    (   Result == true
 162    ->  true
 163    ;   Result = false
 164    ->  fail
 165    ;   Result = exception(Error)
 166    ->  throw(Error)
 167    ).
 168
 169%!  submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det.
 170%
 171%   Send all jobs from List to Queue. Each goal is added to Queue as
 172%   a term goal(Id, Goal, Vars). Vars  is   unified  with  a list of
 173%   lists of free variables appearing in each goal.
 174
 175submit_goals([], _, _, _, []).
 176submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
 177    term_variables(H, Vars),
 178    thread_send_message(Queue, goal(I, M:H, Vars)),
 179    I2 is I + 1,
 180    submit_goals(T, I2, M, Queue, VT).
 181
 182
 183%!  concur_wait(+N, +Done:queue, +VT:compound, -Result, -Exitted) is semidet.
 184%
 185%   Wait for completion, failure or error.
 186%
 187%   @param  Exited  List of thread-ids with threads that completed before
 188%                   all work was done.
 189
 190concur_wait(0, _, _, true, []) :- !.
 191concur_wait(N, Done, VT, Status, Exitted) :-
 192    debug(concurrent, 'Waiting: ...', []),
 193    thread_get_message(Done, Exit),
 194    debug(concurrent, 'Waiting: received ~p', [Exit]),
 195    (   Exit = done(Id, Vars)
 196    ->  arg(Id, VT, Vars),
 197        N2 is N - 1,
 198        concur_wait(N2, Done, VT, Status, Exitted)
 199    ;   Exit = finished(Thread)
 200    ->  thread_join(Thread, JoinStatus),
 201        debug(concurrent, 'Joined ~w with ~p', [Thread, JoinStatus]),
 202        (   JoinStatus == true
 203        ->  Exitted = [Thread|Exitted2],
 204            concur_wait(N, Done, VT, Status, Exitted2)
 205        ;   Status = JoinStatus,
 206            Exitted = [Thread]
 207        )
 208    ).
 209
 210
 211create_workers(N, Queue, Done, [Id|Ids], Options) :-
 212    N > 0,
 213    !,
 214    thread_create(worker(Queue, Done), Id,
 215                  [ at_exit(thread_send_message(Done, finished(Id)))
 216                  | Options
 217                  ]),
 218    N2 is N - 1,
 219    create_workers(N2, Queue, Done, Ids, Options).
 220create_workers(_, _, _, [], _).
 221
 222
 223%!  worker(+WorkQueue, +DoneQueue) is det.
 224%
 225%   Process jobs from WorkQueue and send the results to DoneQueue.
 226
 227worker(Queue, Done) :-
 228    thread_get_message(Queue, Message),
 229    debug(concurrent, 'Worker: received ~p', [Message]),
 230    (   Message = goal(Id, Goal, Vars)
 231    ->  (   Goal
 232        ->  thread_send_message(Done, done(Id, Vars)),
 233            worker(Queue, Done)
 234        )
 235    ;   true
 236    ).
 237
 238
 239%!  concur_cleanup(+Result, +Workers:list, +Queues:list) is det.
 240%
 241%   Cleanup the concurrent workers and message  queues. If Result is
 242%   not =true=, signal all workers to make them stop prematurely. If
 243%   result is true we assume  all   workers  have been instructed to
 244%   stop or have stopped themselves.
 245
 246concur_cleanup(Result, Workers, Queues) :-
 247    !,
 248    (   Result == true
 249    ->  true
 250    ;   kill_workers(Workers)
 251    ),
 252    join_all(Workers),
 253    maplist(message_queue_destroy, Queues).
 254
 255kill_workers([]).
 256kill_workers([Id|T]) :-
 257    debug(concurrent, 'Signalling ~w', [Id]),
 258    catch(thread_signal(Id, abort), _, true),
 259    kill_workers(T).
 260
 261join_all([]).
 262join_all([Id|T]) :-
 263    thread_join(Id, _),
 264    join_all(T).
 265
 266
 267                 /*******************************
 268                 *             MAPLIST          *
 269                 *******************************/
 270
 271%!  concurrent_maplist(:Goal, +List).
 272%!  concurrent_maplist(:Goal, +List1, +List2).
 273%!  concurrent_maplist(:Goal, +List1, +List2, +List3).
 274%
 275%   Concurrent   version   of   maplist/2.   This   predicate   uses
 276%   concurrent/3, using multiple _worker_  threads.   The  number of
 277%   threads is the minimum of the  list   length  and  the number of
 278%   cores available. The number of  cores   is  determined using the
 279%   prolog flag =cpu_count=. If this flag is absent or 1 or List has
 280%   less  than  two  elements,  this   predicate  simply  calls  the
 281%   corresponding maplist/N version.
 282%
%   Note that the overhead of this predicate is considerable and
%   therefore Goal must be fairly expensive before one reaches a
%   speedup.
 286
 287concurrent_maplist(Goal, List) :-
 288    workers(List, WorkerCount),
 289    !,
 290    maplist(ml_goal(Goal), List, Goals),
 291    concurrent(WorkerCount, Goals, []).
 292concurrent_maplist(Goal, List) :-
 293    maplist(Goal, List).
 294
 295ml_goal(Goal, Elem, call(Goal, Elem)).
 296
 297concurrent_maplist(Goal, List1, List2) :-
 298    same_length(List1, List2),
 299    workers(List1, WorkerCount),
 300    !,
 301    maplist(ml_goal(Goal), List1, List2, Goals),
 302    concurrent(WorkerCount, Goals, []).
 303concurrent_maplist(Goal, List1, List2) :-
 304    maplist(Goal, List1, List2).
 305
 306ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).
 307
 308concurrent_maplist(Goal, List1, List2, List3) :-
 309    same_length(List1, List2, List3),
 310    workers(List1, WorkerCount),
 311    !,
 312    maplist(ml_goal(Goal), List1, List2, List3, Goals),
 313    concurrent(WorkerCount, Goals, []).
 314concurrent_maplist(Goal, List1, List2, List3) :-
 315    maplist(Goal, List1, List2, List3).
 316
 317ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
 318
 319workers(List, Count) :-
 320    current_prolog_flag(cpu_count, Cores),
 321    Cores > 1,
 322    length(List, Len),
 323    Count is min(Cores,Len),
 324    Count > 1,
 325    !.
 326
 327same_length([], [], []).
 328same_length([_|T1], [_|T2], [_|T3]) :-
 329    same_length(T1, T2, T3).
 330
 331
 332                 /*******************************
 333                 *             FIRST            *
 334                 *******************************/
 335
 336%!  first_solution(-X, :Goals, +Options) is semidet.
 337%
 338%   Try  alternative  solvers  concurrently,   returning  the  first
 339%   answer. In a typical scenario, solving any of the goals in Goals
 340%   is satisfactory for the application to  continue. As soon as one
 341%   of the tried alternatives is  successful,   all  the others are
 342%   killed and first_solution/3 succeeds.
 343%
 344%   For example, if it is unclear whether   it is better to search a
 345%   graph breadth-first or depth-first we can use:
 346%
 347%   ==
%   search_graph(Graph, Path) :-
 349%            first_solution(Path, [ breadth_first(Graph, Path),
 350%                                   depth_first(Graph, Path)
 351%                                 ],
 352%                           []).
 353%   ==
 354%
 355%   Options include thread stack-sizes passed   to thread_create, as
 356%   well as the options =on_fail= and   =on_error= that specify what
 357%   to do if a  solver  fails  or   triggers  an  error.  By default
 358%   execution of all  solvers  is  terminated   and  the  result  is
 359%   returned. Sometimes one may wish to  continue. One such scenario
 360%   is if one of the solvers may run  out of resources or one of the
 361%   solvers is known to be incomplete.
 362%
 363%           * on_fail(Action)
 364%           If =stop= (default), terminate all threads and stop with
 365%           the failure.  If =continue=, keep waiting.
 366%           * on_error(Action)
 367%           As above, re-throwing the error if an error appears.
 368%
 369%   @bug    first_solution/3 cannot deal with non-determinism.  There
 370%           is no obvious way to fit non-determinism into it.  If multiple
 371%           solutions are needed wrap the solvers in findall/3.
 372
 373
 374first_solution(X, M:List, Options) :-
 375    message_queue_create(Done),
 376    thread_options(Options, ThreadOptions, RestOptions),
 377    length(List, JobCount),
 378    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
 379    wait_for_one(JobCount, Done, Result, RestOptions),
 380    concur_cleanup(kill, Solvers, [Done]),
 381    (   Result = done(_, Var)
 382    ->  X = Var
 383    ;   Result = error(_, Error)
 384    ->  throw(Error)
 385    ).
 386
 387create_solvers([], _, _, _, [], _).
 388create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
 389    thread_create(solve(M:H, X, Done), Id, Options),
 390    create_solvers(T, M, X, Done, IDs, Options).
 391
 392solve(Goal, Var, Queue) :-
 393    thread_self(Me),
 394    (   catch(Goal, E, true)
 395    ->  (   var(E)
 396        ->  thread_send_message(Queue, done(Me, Var))
 397        ;   thread_send_message(Queue, error(Me, E))
 398        )
 399    ;   thread_send_message(Queue, failed(Me))
 400    ).
 401
 402wait_for_one(0, _, failed, _) :- !.
 403wait_for_one(JobCount, Queue, Result, Options) :-
 404    thread_get_message(Queue, Msg),
 405    LeftCount is JobCount - 1,
 406    (   Msg = done(_, _)
 407    ->  Result = Msg
 408    ;   Msg = failed(_)
 409    ->  (   option(on_fail(stop), Options, stop)
 410        ->  Result = Msg
 411        ;   wait_for_one(LeftCount, Queue, Result, Options)
 412        )
 413    ;   Msg = error(_, _)
 414    ->  (   option(on_error(stop), Options, stop)
 415        ->  Result = Msg
 416        ;   wait_for_one(LeftCount, Queue, Result, Options)
 417        )
 418    ).
 419
 420
 421%!  thread_options(+Options, -ThreadOptions, -RestOptions) is det.
 422%
 423%   Split the option  list  over   thread(-size)  options  and other
 424%   options.
 425
 426thread_options([], [], []).
 427thread_options([H|T], [H|Th], O) :-
 428    thread_option(H),
 429    !,
 430    thread_options(T, Th, O).
 431thread_options([H|T], Th, [H|O]) :-
 432    thread_options(T, Th, O).
 433
 434thread_option(local(_)).
 435thread_option(global(_)).
 436thread_option(trail(_)).
 437thread_option(argument(_)).
 438thread_option(stack(_)).