/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2007-2016, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(thread,
          [ concurrent/3,               % +Threads, :Goals, +Options
            concurrent_maplist/2,       % :Goal, +List
            concurrent_maplist/3,       % :Goal, ?List1, ?List2
            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
            first_solution/3            % -Var, :Goals, +Options
          ]).
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(lists)).
:- use_module(library(apply)).

%:- debug(concurrent).

:- meta_predicate
    concurrent(+, :, +),
    concurrent_maplist(1, +),
    concurrent_maplist(2, ?, ?),
    concurrent_maplist(3, ?, ?, ?),
    first_solution(-, :, +).

:- predicate_options(concurrent/3, 3,
                     [ pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(first_solution/3, 3,
                     [ on_fail(oneof([stop,continue])),
                       on_error(oneof([stop,continue])),
                       pass_to(system:thread_create/3, 3)
                     ]).

/** <module> High level thread primitives

This module defines simple to use predicates for running goals
concurrently. Where the core multi-threaded API is targeted at
communicating long-living threads, the predicates here are defined to
run goals concurrently without having to deal with thread creation and
maintenance explicitly.

Note that these predicates run goals concurrently and therefore these
goals need to be thread-safe. As the predicates in this module also
abort branches of the computation that are no longer needed, predicates
that have side-effects must act properly. In a nutshell, this has the
following consequences:

  * Nice clean Prolog code without side-effects (but with cut) works
    fine.
  * Side-effects are bad news. If you really need assert to store
    intermediate results, use the thread_local/1 declaration. This
    also guarantees cleanup of left-over clauses if the thread is
    cancelled. For other side-effects, make sure to use call_cleanup/2
    to undo them should the thread be cancelled.
  * Global variables are ok as they are thread-local and destroyed
    on thread cancellation. Note however that global variables in
    the calling thread are *not* available in the threads that are
    created. You have to pass the value as an argument and initialise
    the variable in the new thread.
  * Thread-cancellation uses thread_signal/2. Using this code
    with long-blocking foreign predicates may result in long delays,
    even if another thread asks for cancellation.

@author Jan Wielemaker
*/

%!  concurrent(+N, :Goals, +Options) is semidet.
%
%   Run Goals in parallel using N threads. This call blocks until
%   all work has been done. The Goals must be independent. They
%   should not communicate using shared variables or any form of
%   global data. All Goals must be thread-safe.
%
%   Execution succeeds if all goals have succeeded. If one goal
%   fails or throws an exception, other workers are abandoned as
%   soon as possible and the entire computation fails or re-throws
%   the exception. Note that if multiple goals fail or raise an
%   error it is not defined which error or failure is reported.
%
%   On successful completion, variable bindings are returned. Note
%   however that threads have independent stacks and therefore the
%   goal is copied to the worker thread and the result is copied
%   back to the caller of concurrent/3.
%
%   Choosing the right number of threads is not always obvious. Here
%   are some scenarios:
%
%     * If the goals are CPU intensive and normally all succeeding,
%     typically the number of CPUs is the optimal number of
%     threads. Fewer threads do not use all CPUs, more threads
%     waste time in context switches and also use more memory.
%
%     * If the tasks are I/O bound the number of threads is
%     typically higher than the number of CPUs.
%
%     * If one or more of the goals may fail or produce an error,
%     using a higher number of threads may find this earlier.
%
%   @param N Number of worker-threads to create. Using 1, no threads
%          are created. If N is larger than the number of Goals we
%          create exactly as many threads as there are Goals.
%   @param Goals List of callable terms.
%   @param Options Passed to thread_create/3 for creating the
%          workers. Only options changing the stack-sizes can
%          be used. In particular, do not pass the detached or alias
%          options.
%   @see In many cases, concurrent_maplist/2 and friends
%        are easier to program and more tractable to program
%        analysis.

concurrent(1, M:List, _) :-
    !,
    maplist(M:call, List).
concurrent(N, M:List, Options) :-
    must_be(positive_integer, N),
    must_be(list(callable), List),
    length(List, JobCount),
    message_queue_create(Done),
    message_queue_create(Queue),
    WorkerCount is min(N, JobCount),
    create_workers(WorkerCount, Queue, Done, Workers, Options),
    submit_goals(List, 1, M, Queue, VarList),
    forall(between(1, WorkerCount, _),
           thread_send_message(Queue, done)),
    VT =.. [vars|VarList],
    concur_wait(JobCount, Done, VT, Result, Exitted),
    subtract(Workers, Exitted, RemainingWorkers),
    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
    (   Result == true
    ->  true
    ;   Result = false
    ->  fail
    ;   Result = exception(Error)
    ->  throw(Error)
    ).

%!  submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det.
%
%   Send all jobs from List to Queue. Each goal is added to Queue as
%   a term goal(Id, Goal, Vars). Vars is unified with a list of
%   lists of free variables appearing in each goal.

submit_goals([], _, _, _, []).
submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
    term_variables(H, Vars),
    thread_send_message(Queue, goal(I, M:H, Vars)),
    I2 is I + 1,
    submit_goals(T, I2, M, Queue, VT).


%!  concur_wait(+N, +Done:queue, +VT:compound, -Result, -Exitted) is semidet.
%
%   Wait for completion, failure or error.
%
%   @param Exitted List of thread-ids with threads that completed before
%          all work was done.

concur_wait(0, _, _, true, []) :- !.
concur_wait(N, Done, VT, Status, Exitted) :-
    debug(concurrent, 'Waiting: ...', []),
    thread_get_message(Done, Exit),
    debug(concurrent, 'Waiting: received ~p', [Exit]),
    (   Exit = done(Id, Vars)
    ->  arg(Id, VT, Vars),
        N2 is N - 1,
        concur_wait(N2, Done, VT, Status, Exitted)
    ;   Exit = finished(Thread)
    ->  thread_join(Thread, JoinStatus),
        debug(concurrent, 'Joined ~w with ~p', [Thread, JoinStatus]),
        (   JoinStatus == true
        ->  Exitted = [Thread|Exitted2],
            concur_wait(N, Done, VT, Status, Exitted2)
        ;   Status = JoinStatus,
            Exitted = [Thread]
        )
    ).


create_workers(N, Queue, Done, [Id|Ids], Options) :-
    N > 0,
    !,
    thread_create(worker(Queue, Done), Id,
                  [ at_exit(thread_send_message(Done, finished(Id)))
                  | Options
                  ]),
    N2 is N - 1,
    create_workers(N2, Queue, Done, Ids, Options).
create_workers(_, _, _, [], _).


%!  worker(+WorkQueue, +DoneQueue) is det.
%
%   Process jobs from WorkQueue and send the results to DoneQueue.
%   The worker stops when it receives a message that is not a
%   goal/3 term (concurrent/3 sends `done`). If Goal fails, the
%   worker itself fails and the failure is reported through the
%   thread's exit status.

worker(Queue, Done) :-
    thread_get_message(Queue, Message),
    debug(concurrent, 'Worker: received ~p', [Message]),
    (   Message = goal(Id, Goal, Vars)
    ->  (   Goal
        ->  thread_send_message(Done, done(Id, Vars)),
            worker(Queue, Done)
        )
    ;   true
    ).


%!  concur_cleanup(+Result, +Workers:list, +Queues:list) is det.
%
%   Cleanup the concurrent workers and message queues. If Result is
%   not =true=, signal all workers to make them stop prematurely. If
%   Result is =true= we assume all workers have been instructed to
%   stop or have stopped themselves.

concur_cleanup(Result, Workers, Queues) :-
    !,
    (   Result == true
    ->  true
    ;   kill_workers(Workers)
    ),
    join_all(Workers),
    maplist(message_queue_destroy, Queues).

kill_workers([]).
kill_workers([Id|T]) :-
    debug(concurrent, 'Signalling ~w', [Id]),
    catch(thread_signal(Id, abort), _, true),
    kill_workers(T).

join_all([]).
join_all([Id|T]) :-
    thread_join(Id, _),
    join_all(T).


                 /*******************************
                 *             MAPLIST          *
                 *******************************/

%!  concurrent_maplist(:Goal, +List).
%!  concurrent_maplist(:Goal, +List1, +List2).
%!  concurrent_maplist(:Goal, +List1, +List2, +List3).
%
%   Concurrent version of maplist/2. This predicate uses
%   concurrent/3, using multiple _worker_ threads. The number of
%   threads is the minimum of the list length and the number of
%   cores available. The number of cores is determined using the
%   prolog flag =cpu_count=. If this flag is absent or 1 or List has
%   fewer than two elements, this predicate simply calls the
%   corresponding maplist/N version.
%
%   Note that the overhead of this predicate is considerable and
%   therefore Goal must be fairly expensive before one reaches a
%   speedup.

concurrent_maplist(Goal, List) :-
    workers(List, WorkerCount),
    !,
    maplist(ml_goal(Goal), List, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(Goal, List) :-
    maplist(Goal, List).

ml_goal(Goal, Elem, call(Goal, Elem)).

concurrent_maplist(Goal, List1, List2) :-
    same_length(List1, List2),
    workers(List1, WorkerCount),
    !,
    maplist(ml_goal(Goal), List1, List2, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(Goal, List1, List2) :-
    maplist(Goal, List1, List2).

ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).

concurrent_maplist(Goal, List1, List2, List3) :-
    same_length(List1, List2, List3),
    workers(List1, WorkerCount),
    !,
    maplist(ml_goal(Goal), List1, List2, List3, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(Goal, List1, List2, List3) :-
    maplist(Goal, List1, List2, List3).

ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
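
% Usage sketch for concurrent_maplist/3 as defined above. This is an
% illustration only and not part of the library. slow_square/2 and
% squares/2 are hypothetical names; the numlist/sum_list work merely
% stands in for an expensive computation, since the overhead of
% creating workers only pays off for costly goals.
%
% ==
% :- use_module(library(thread)).
% :- use_module(library(lists)).
%
% slow_square(X, Y) :-
%     numlist(1, 200000, L),      % artificial CPU load
%     sum_list(L, _),
%     Y is X*X.
%
% squares(Xs, Ys) :-
%     concurrent_maplist(slow_square, Xs, Ys).
% ==
%
% Calling squares([1,2,3,4], Ys) binds Ys to [1,4,9,16], both when
% worker threads are used and when the predicate falls back to plain
% maplist/3 because =cpu_count= is absent or 1.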

workers(List, Count) :-
    current_prolog_flag(cpu_count, Cores),
    Cores > 1,
    length(List, Len),
    Count is min(Cores,Len),
    Count > 1,
    !.

same_length([], [], []).
same_length([_|T1], [_|T2], [_|T3]) :-
    same_length(T1, T2, T3).


                 /*******************************
                 *             FIRST            *
                 *******************************/

%!  first_solution(-X, :Goals, +Options) is semidet.
%
%   Try alternative solvers concurrently, returning the first
%   answer. In a typical scenario, solving any of the goals in Goals
%   is satisfactory for the application to continue. As soon as one
%   of the tried alternatives is successful, all the others are
%   killed and first_solution/3 succeeds.
%
%   For example, if it is unclear whether it is better to search a
%   graph breadth-first or depth-first we can use:
%
%   ==
%   search_graph(Graph, Path) :-
%            first_solution(Path, [ breadth_first(Graph, Path),
%                                   depth_first(Graph, Path)
%                                 ],
%                           []).
%   ==
%
%   Options include thread stack-sizes passed to thread_create, as
%   well as the options =on_fail= and =on_error= that specify what
%   to do if a solver fails or triggers an error. By default
%   execution of all solvers is terminated and the result is
%   returned. Sometimes one may wish to continue. One such scenario
%   is if one of the solvers may run out of resources or one of the
%   solvers is known to be incomplete.
%
%           * on_fail(Action)
%           If =stop= (default), terminate all threads and stop with
%           the failure. If =continue=, keep waiting.
%           * on_error(Action)
%           As above, re-throwing the error if an error appears.
%
%   @bug    first_solution/3 cannot deal with non-determinism. There
%           is no obvious way to fit non-determinism into it. If multiple
%           solutions are needed wrap the solvers in findall/3.


first_solution(X, M:List, Options) :-
    message_queue_create(Done),
    thread_options(Options, ThreadOptions, RestOptions),
    length(List, JobCount),
    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
    wait_for_one(JobCount, Done, Result, RestOptions),
    concur_cleanup(kill, Solvers, [Done]),
    (   Result = done(_, Var)
    ->  X = Var
    ;   Result = error(_, Error)
    ->  throw(Error)
    ).

create_solvers([], _, _, _, [], _).
create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
    thread_create(solve(M:H, X, Done), Id, Options),
    create_solvers(T, M, X, Done, IDs, Options).

% Run Goal once and report done(Me, Var), error(Me, E) or failed(Me)
% on Queue.

solve(Goal, Var, Queue) :-
    thread_self(Me),
    (   catch(Goal, E, true)
    ->  (   var(E)
        ->  thread_send_message(Queue, done(Me, Var))
        ;   thread_send_message(Queue, error(Me, E))
        )
    ;   thread_send_message(Queue, failed(Me))
    ).

wait_for_one(0, _, failed, _) :- !.
wait_for_one(JobCount, Queue, Result, Options) :-
    thread_get_message(Queue, Msg),
    LeftCount is JobCount - 1,
    (   Msg = done(_, _)
    ->  Result = Msg
    ;   Msg = failed(_)
    ->  (   option(on_fail(stop), Options, stop)
        ->  Result = Msg
        ;   wait_for_one(LeftCount, Queue, Result, Options)
        )
    ;   Msg = error(_, _)
    ->  (   option(on_error(stop), Options, stop)
        ->  Result = Msg
        ;   wait_for_one(LeftCount, Queue, Result, Options)
        )
    ).


%!  thread_options(+Options, -ThreadOptions, -RestOptions) is det.
%
%   Split the option list over thread(-size) options and other
%   options.

thread_options([], [], []).
thread_options([H|T], [H|Th], O) :-
    thread_option(H),
    !,
    thread_options(T, Th, O).
thread_options([H|T], Th, [H|O]) :-
    thread_options(T, Th, O).

thread_option(local(_)).
thread_option(global(_)).
thread_option(trail(_)).
thread_option(argument(_)).
thread_option(stack(_)).
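

% Usage sketch for first_solution/3 with =on_fail(continue)=. This is
% an illustration only and not part of the library. The solver names
% incomplete_solver/1 and complete_solver/1 and the wrapper solve_any/1
% are hypothetical. The first solver always fails, standing in for an
% incomplete search strategy.
%
% ==
% :- use_module(library(thread)).
%
% incomplete_solver(_) :- fail.
% complete_solver(X)   :- X = answer.
%
% solve_any(X) :-
%     first_solution(X,
%                    [ incomplete_solver(X),
%                      complete_solver(X)
%                    ],
%                    [ on_fail(continue) ]).
% ==
%
% With =on_fail(continue)=, wait_for_one/4 skips the failed(_) message
% and keeps waiting, so solve_any(X) binds X to =answer= regardless of
% which solver reports first. With the default =on_fail(stop)= the
% outcome depends on message order: if the failing solver reports
% first, first_solution/3 fails.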