View source with raw comments or as raw
   1:- encoding(utf8).
   2/*  Part of SWI-Prolog
   3
   4    Author:        Torbjörn Lager and Jan Wielemaker
   5    E-mail:        J.Wielemaker@vu.nl
   6    WWW:           http://www.swi-prolog.org
   7    Copyright (C): 2014-2016, Torbjörn Lager,
   8                              VU University Amsterdam
   9    All rights reserved.
  10
  11    Redistribution and use in source and binary forms, with or without
  12    modification, are permitted provided that the following conditions
  13    are met:
  14
  15    1. Redistributions of source code must retain the above copyright
  16       notice, this list of conditions and the following disclaimer.
  17
  18    2. Redistributions in binary form must reproduce the above copyright
  19       notice, this list of conditions and the following disclaimer in
  20       the documentation and/or other materials provided with the
  21       distribution.
  22
  23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
  29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
  33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34    POSSIBILITY OF SUCH DAMAGE.
  35*/
  36
  37:- module(pengines,
  38          [ pengine_create/1,                   % +Options
  39            pengine_ask/3,                      % +Pengine, :Query, +Options
  40            pengine_next/2,                     % +Pengine, +Options
  41            pengine_stop/2,                     % +Pengine, +Options
  42            pengine_event/2,                    % -Event, +Options
  43            pengine_input/2,                    % +Prompt, -Term
  44            pengine_output/1,                   % +Term
  45            pengine_respond/3,                  % +Pengine, +Input, +Options
  46            pengine_debug/2,                    % +Format, +Args
  47            pengine_self/1,                     % -Pengine
  48            pengine_pull_response/2,            % +Pengine, +Options
  49            pengine_destroy/1,                  % +Pengine
  50            pengine_destroy/2,                  % +Pengine, +Options
  51            pengine_abort/1,                    % +Pengine
  52            pengine_application/1,              % +Application
  53            current_pengine_application/1,      % ?Application
  54            pengine_property/2,                 % ?Pengine, ?Property
  55            pengine_user/1,                     % -User
  56            pengine_event_loop/2,               % :Closure, +Options
  57            pengine_rpc/2,                      % +Server, :Goal
  58            pengine_rpc/3                       % +Server, :Goal, +Options
  59          ]).
  60
  61/** <module> Pengines: Web Logic Programming Made Easy
  62
  63The library(pengines) provides an  infrastructure   for  creating Prolog
  64engines in a (remote) pengine server  and accessing these engines either
  65from Prolog or JavaScript.
  66
  67@author Torbjörn Lager and Jan Wielemaker
  68*/
  69
  70:- use_module(library(http/http_dispatch)).
  71:- use_module(library(http/http_parameters)).
  72:- use_module(library(http/http_client)).
  73:- use_module(library(http/http_json)).
  74:- use_module(library(http/http_open)).
  75:- use_module(library(http/http_stream)).
  76:- use_module(library(http/http_wrapper)).
  77:- use_module(library(http/http_cors)).
  78:- use_module(library(thread_pool)).
  79:- use_module(library(broadcast)).
  80:- use_module(library(uri)).
  81:- use_module(library(filesex)).
  82:- use_module(library(time)).
  83:- use_module(library(lists)).
  84:- use_module(library(charsio)).
  85:- use_module(library(apply)).
  86:- use_module(library(aggregate)).
  87:- use_module(library(option)).
  88:- use_module(library(settings)).
  89:- use_module(library(debug)).
  90:- use_module(library(error)).
  91:- use_module(library(sandbox)).
  92:- use_module(library(modules)).
  93:- use_module(library(term_to_json)).
  94:- if(exists_source(library(uuid))).
  95:- use_module(library(uuid)).
  96:- endif.
  97
  98
  99:- meta_predicate
 100    pengine_create(:),
 101    pengine_rpc(+, +, :),
 102    pengine_event_loop(1, +).
 103
 104:- multifile
 105    write_result/3,                 % +Format, +Event, +VarNames (deprecated)
 106    write_result/4,                 % +Format, +Event, +VarNames, +Dict
 107    event_to_json/4,                % +Event, -JSON, +Format, +VarNames
 108    prepare_module/3,               % +Module, +Application, +Options
 109    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
 110    authentication_hook/3,          % +Request, +Application, -User
 111    not_sandboxed/2.                % +User, +App
 112
 113:- predicate_options(pengine_create/1, 1,
 114                     [ id(-atom),
 115                       alias(atom),
 116                       application(atom),
 117                       destroy(boolean),
 118                       server(atom),
 119                       ask(compound),
 120                       template(compound),
 121                       chunk(integer),
 122                       src_list(list),
 123                       src_text(any),           % text
 124                       src_url(atom),
 125                       src_predicates(list)
 126                     ]).
 127:- predicate_options(pengine_ask/3, 3,
 128                     [ template(any),
 129                       chunk(integer)
 130                     ]).
 131:- predicate_options(pengine_next/2, 2,
 132                     [ chunk(integer),
 133                       pass_to(pengine_send/3, 3)
 134                     ]).
 135:- predicate_options(pengine_stop/2, 2,
 136                     [ pass_to(pengine_send/3, 3)
 137                     ]).
 % pengine_respond(+Pengine, +Input, +Options) takes its option list as
 % the third argument, so that is the position declared here.  All
 % options are handed on to pengine_send/3.
 :- predicate_options(pengine_respond/3, 3,
                      [ pass_to(pengine_send/3, 3)
                      ]).
 141:- predicate_options(pengine_rpc/3, 3,
 142                     [ chunk(integer),
 143                       pass_to(pengine_create/1, 1)
 144                     ]).
 145:- predicate_options(pengine_send/3, 3,
 146                     [ delay(number)
 147                     ]).
 148:- predicate_options(pengine_event/2, 2,
 149                     [ pass_to(thread_get_message/3, 3)
 150                     ]).
 151:- predicate_options(pengine_pull_response/2, 2,
 152                     [ pass_to(http_open/3, 3)
 153                     ]).
 154:- predicate_options(pengine_event_loop/2, 2,
 155                     []).                       % not yet implemented
 156
 157% :- debug(pengine(transition)).
 158:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
 159
 %   Compile-time rewrite of the goal `random_delay`.  Unless the debug
 %   topic pengine(delay) is enabled while the code is being compiled,
 %   the goal is replaced by `true`; otherwise it becomes a call to
 %   do_random_delay/0.
 goal_expansion(random_delay, Expanded) :-
     (   \+ debugging(pengine(delay))
     ->  Expanded = true
     ;   Expanded = do_random_delay
     ).
 165
 %!  do_random_delay is det.
 %
 %   Sleep for random(20)/1000 seconds, i.e. less than 20 milliseconds.
 do_random_delay :-
     Millis is random(20),
     Delay is Millis/1000,
     sleep(Delay).
 169
 170:- meta_predicate                       % internal meta predicates
 171    solve(+, ?, 0, +),
 172    findnsols_no_empty(+, ?, 0, -),
 173    pengine_event_loop(+, 1, +).
 174
 175/**  pengine_create(:Options) is det.
 176
 177    Creates a new pengine. Valid options are:
 178
 179    * id(-ID)
 180      ID gets instantiated to the id of the created pengine.  ID is
 181      atomic.
 182
 183    * alias(+Name)
 184      The pengine is named Name (an atom). A slave pengine (child) can
 185      subsequently be referred to by this name.
 186
 187    * application(+Application)
 188      Application in which the pengine runs.  See pengine_application/1.
 189
 190    * server(+URL)
 191      The pengine will run in (and in the Prolog context of) the pengine
 192      server located at URL.
 193
 194    * src_list(+List_of_clauses)
 195      Inject a list of Prolog clauses into the pengine.
 196
 197    * src_text(+Atom_or_string)
 198      Inject the clauses specified by a source text into the pengine.
 199
 200    * src_url(+URL)
 201      Inject the clauses specified in the file located at URL into the
 202      pengine.
 203
 204    * src_predicates(+List)
 205      Send the local predicates denoted by List to the remote pengine.
 206      List is a list of predicate indicators.
 207
 208Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
 209non-local pengines) and thread_create/3. Note   that for thread_create/3
 210only options changing the stack-sizes can be used. In particular, do not
 211pass the detached or alias options.
 212
 213Successful creation of a pengine will return an _event term_ of the
 214following form:
 215
 216    * create(ID, Term)
 217      ID is the id of the pengine that was created.
 218      Term is not used at the moment.
 219
 220An error will be returned if the pengine could not be created:
 221
 222    * error(ID, Term)
 223      ID is invalid, since no pengine was created.
 224      Term is the exception's error term.
 225*/
 226
 227
 pengine_create(Module:RawOptions) :-
     % Source options are rewritten relative to the calling module.
     translate_local_sources(RawOptions, Options, Module),
     (   select_option(server(BaseURL), Options, RemoteOptions)
     ->  % server(URL) present: create the pengine on that server and
         % pass on the options without the server/1 term.
         remote_pengine_create(BaseURL, RemoteOptions)
     ;   local_pengine_create(Options)
     ).
 234
 235%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
 236%
 237%   Translate  the  `src_predicates`  and  `src_list`  options  into
 238%   `src_text`. We need to do that   anyway for remote pengines. For
 239%   local pengines, we could avoid  this   step,  but  there is very
 240%   little point in transferring source to a local pengine anyway as
 241%   local pengines can access any  Prolog   predicate  that you make
 242%   visible to the application.
 243%
 244%   Multiple sources are concatenated  to  end   up  with  a  single
 245%   src_text option.
 246
 translate_local_sources(OptionsIn, Options, Module) :-
     translate_local_sources(OptionsIn, Sources, Rest, Module),
     (   Sources == []
     ->  Options = Rest                  % no source options at all
     ;   (   Sources = [Text]
         ->  true                        % one source: use it unchanged
         ;   atomics_to_string(Sources, Text)
         ),
         Options = [src_text(Text)|Rest]
     ).
 256
 %!  translate_local_sources(+OptionsIn, -Sources, -Options, +Module)
 %
 %   Split OptionsIn.  Every non-variable element accepted by
 %   translate_local_source/3 contributes its text to Sources; all
 %   other elements are copied to Options in their original order.
 translate_local_sources([], [], [], _).
 translate_local_sources([Opt|Opts], Sources, Options, M) :-
     (   nonvar(Opt),
         translate_local_source(Opt, Text, M)
     ->  Sources = [Text|Sources1],
         Options = Options1
     ;   Sources = Sources1,
         Options = [Opt|Options1]
     ),
     translate_local_sources(Opts, Sources1, Options1, M).
 265
 %!  translate_local_source(+Option, -Source, +Module)
 %
 %   Source is the text for a single source option:
 %
 %     - src_predicates(PIs): the output of listing/1 on Module:PI for
 %       each element of the list PIs, captured as a string.
 %     - src_list(Terms): each term written with `~k` followed by
 %       " ." and a newline, captured as a string.
 %     - src_text(Text): Text itself.
 translate_local_source(src_predicates(PIs), Source, M) :-
     must_be(list, PIs),
     with_output_to(string(Source),
                    forall(member(PI, PIs),
                           listing(M:PI))).
 translate_local_source(src_list(Clauses), Source, _) :-
     must_be(list, Clauses),
     with_output_to(string(Source),
                    forall(member(Clause, Clauses),
                           format('~k .~n', [Clause]))).
 translate_local_source(src_text(Text), Text, _).
 276
 %!  listing(+Module, +PI)
 %
 %   Call listing/1 on PI qualified with Module.  The argument order
 %   allows use as a closure, e.g. maplist(listing(M), PIs).
 listing(Module, PI) :-
     Qualified = Module:PI,
     listing(Qualified).
 279
 280/**  pengine_send(+NameOrID, +Term) is det
 281
 282Same as pengine_send(NameOrID, Term, []).
 283*/
 284
 pengine_send(Target, Event) :-
     NoOptions = [],
     pengine_send(Target, Event, NoOptions).
 287
 288
 289/**  pengine_send(+NameOrID, +Term, +Options) is det
 290
 291Succeeds immediately and  places  Term  in   the  queue  of  the pengine
 292NameOrID. Options is a list of options:
 293
 294   * delay(+Time)
 295     The actual sending is delayed by Time seconds. Time is an integer
 296     or a float.
 297
 298Any remaining options are passed to http_open/3.
 299*/
 300
 pengine_send(NameOrID, Event, Options) :-
     % Validate the target before any dispatch happens.
     must_be(atom, NameOrID),
     pengine_send2(NameOrID, Event, Options).
 304
 %!  pengine_send2(+Target, +Event, +Options)
 %
 %   Dispatch Event according to Target:
 %
 %     - `self`: deliver to the message queue of the calling thread.
 %     - a name known through child/2: deliver to that child pengine.
 %     - anything else: treat Target itself as the pengine id.
 pengine_send2(Target, Event, Options) :-
     (   Target = self
     ->  thread_self(Queue),
         delay_message(queue(Queue), Event, Options)
     ;   child(Target, Child)
     ->  delay_message(pengine(Child), Event, Options)
     ;   delay_message(pengine(Target), Event, Options)
     ).
 315
 %!  delay_message(+Target, +Event, +Options)
 %
 %   Send Event to Target.  With a delay(Delay) option the call to
 %   send_message/3 is scheduled through alarm/4 (the alarm is created
 %   with remove(true)); without it the message is sent directly,
 %   preceded by the `random_delay` goal.
 delay_message(Target, Event, Options) :-
     (   option(delay(Delay), Options)
     ->  alarm(Delay,
               send_message(Target, Event, Options),
               _AlarmID,
               [remove(true)])
     ;   random_delay,
         send_message(Target, Event, Options)
     ).
 326
 %!  send_message(+Target, +Event, +Options)
 %
 %   Deliver Event wrapped as pengine_request(Event).  Target is
 %   queue(Queue) for a direct queue delivery or pengine(Id).  For
 %   pengine(Id), a pengine registered as remote is reached through
 %   remote_pengine_send/4 and a pengine with a local thread through
 %   that thread's queue.
 %
 %   @error existence_error(pengine, Id) if Id is neither.
 send_message(queue(Queue), Event, _) :-
     thread_send_message(Queue, pengine_request(Event)).
 send_message(pengine(Id), Event, Options) :-
     (   pengine_remote(Id, Server)
     ->  remote_pengine_send(Server, Id, Event, Options)
     ;   pengine_thread(Id, Thread)
     ->  Request = pengine_request(Event),
         thread_send_message(Thread, Request)
     ;   existence_error(pengine, Id)
     ).
 336
 337%!  pengine_request(-Request) is det.
 338%
 339%   To be used by a  pengine  to   wait  for  the next request. Such
 340%   messages are placed in the queue by pengine_send/2.
 341
 pengine_request(Request) :-
     thread_self(Thread),
     pengine_self(Pengine),
     get_pengine_application(Pengine, App),
     setting(App:idle_limit, MaxIdle),
     % Wait at most MaxIdle for a request; if none arrives the request
     % is taken to be `destroy`.
     (   thread_get_message(Thread, pengine_request(Request),
                            [ timeout(MaxIdle)
                            ])
     ->  true
     ;   Request = destroy
     ).
 351
 352
 353%!  pengine_reply(+Event) is det.
 354%!  pengine_reply(+Queue, +Event) is det.
 355%
 356%   Reply Event to the parent of the   current  Pengine or the given
 357%   Queue.  Such  events  are  read   by    the   other   side  with
 358%   pengine_event/2.
 359%
 360%   If the message cannot be sent within the `idle_limit` setting of
 361%   the pengine, abort the pengine.
 362
 pengine_reply(Event) :-
     % The reply goes to the queue stored as this pengine's parent.
     pengine_parent(ParentQueue),
     pengine_reply(ParentQueue, Event).
 366
 pengine_reply(Queue, Event0) :-
     (   nb_current(pengine_idle_limit_exceeded, true)
     ->  true                    % flag set by an earlier timeout: send nothing
     ;   arg(1, Event0, ID),
         wrap_first_answer(ID, Event0, Event),
         random_delay,
         debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
         Message = pengine_event(ID, Event),
         (   pengine_self(ID)
         ->  % Replying as the pengine itself: bound the send by the
             % application's idle_limit setting.
             get_pengine_application(ID, Application),
             setting(Application:idle_limit, IdleLimit),
             debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]),
             (   thread_send_message(Queue, Message,
                                     [ timeout(IdleLimit)
                                     ])
             ->  true
             ;   % Not delivered in time: record that, detach this
                 % thread and abort.
                 thread_self(Me),
                 debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
                       [ID, Me]),
                 nb_setval(pengine_idle_limit_exceeded, true),
                 thread_detach(Me),
                 abort
             )
         ;   thread_send_message(Queue, Message)
         )
     ).
 392
 %!  wrap_first_answer(+ID, +Event0, -Event)
 %
 %   If a wrap_first_answer_in_create_event/2 fact exists whose first
 %   argument is a term with ID as its first argument and whose second
 %   argument unifies with [answer(Event0)], retract that fact and
 %   return its first argument as Event.  Otherwise Event is Event0.
 wrap_first_answer(ID, Event0, Event) :-
     (   wrap_first_answer_in_create_event(Create, [answer(Event0)]),
         arg(1, Create, ID)
     ->  retract(wrap_first_answer_in_create_event(Create, [answer(Event0)])),
         Event = Create
     ;   Event = Event0
     ).
 399
 400
 %!  empty_queue
 %
 %   Discard all messages currently waiting in the parent queue and
 %   report the number discarded on debug topic pengine(abort).
 empty_queue :-
     pengine_parent(ParentQueue),
     empty_queue(ParentQueue, 0, Count),
     debug(pengine(abort), 'Abort: discarded ~D messages', [Count]).
 405
 %!  empty_queue(+Queue, +Count0, -Count)
 %
 %   Remove messages from Queue without waiting (timeout 0) until none
 %   is available.  Count is Count0 plus the number of messages removed.
 empty_queue(Queue, Count0, Count) :-
     (   thread_get_message(Queue, _Discarded, [timeout(0)])
     ->  Count1 is Count0+1,
         empty_queue(Queue, Count1, Count)
     ;   Count = Count0
     ).
 412
 413
 414/** pengine_ask(+NameOrID, @Query, +Options) is det
 415
 416Asks pengine NameOrID a query Query.
 417
 418Options is a list of options:
 419
 420    * template(+Template)
 421      Template is a variable (or a term containing variables) shared
 422      with the query. By default, the template is identical to the
 423      query.
 424
 425    * chunk(+Integer)
 426      Retrieve solutions in chunks of Integer rather than one by one. 1
 427      means no chunking (default). Other integers indicate the maximum
 428      number of solutions to retrieve in one chunk.
 429
 430Any remaining options are passed to pengine_send/3.
 431
 432Note that the predicate pengine_ask/3 is deterministic, even for queries
 433that have more than one solution. Also,  the variables in Query will not
 434be bound. Instead, results will  be  returned   in  the  form  of _event
 435terms_.
 436
 437    * success(ID, Terms, More)
 438      ID is the id of the pengine that succeeded in solving the query.
 439      Terms is a list holding instantiations of `Template`. More is
 440      either `true` or `false`, indicating whether we can expect the
 441      pengine to be able to return more solutions or not, would we call
 442      pengine_next/2.
 443
 444    * failure(ID)
 445      ID is the id of the pengine that failed for lack of solutions.
 446
 447    * error(ID, Term)
 448      ID is the id of the pengine throwing the exception.
 449      Term is the exception's error term.
 450
 451    * output(ID, Term)
 452      ID is the id of a pengine running the query that called
 453      pengine_output/1. Term is the term that was passed in the first
 454      argument of pengine_output/1 when it was called.
 455
 456    * prompt(ID, Term)
 457      ID is the id of the pengine that called pengine_input/2 and Term is
 458      the prompt.
 459
 460Defined in terms of pengine_send/3, like so:
 461
 462==
 463pengine_ask(ID, Query, Options) :-
 464    partition(pengine_ask_option, Options, AskOptions, SendOptions),
 465    pengine_send(ID, ask(Query, AskOptions), SendOptions).
 466==
 467*/
 468
 pengine_ask(ID, Query, Options) :-
     % Options accepted by pengine_ask_option/1 travel inside the ask/2
     % message; the remainder is handed to pengine_send/3.
     partition(pengine_ask_option, Options, AskOptions, SendOptions),
     Message = ask(Query, AskOptions),
     pengine_send(ID, Message, SendOptions).
 472
 473
 %!  pengine_ask_option(?Option)
 %
 %   True when Option is one of the options that pengine_ask/3 places
 %   inside the ask/2 message: template/1, chunk/1 or breakpoints/1.
 pengine_ask_option(Option) :-
     member(Option,
            [ template(_),
              chunk(_),
              breakpoints(_)
            ]).
 477
 478
 479/** pengine_next(+NameOrID, +Options) is det
 480
 481Asks pengine NameOrID for the  next  solution   to  a  query  started by
 482pengine_ask/3. Defined options are:
 483
 484    * chunk(+Count)
 485    Modify the chunk-size to Count before asking the next set of
 486    solutions.
 487
 488Remaining  options  are  passed  to    pengine_send/3.   The  result  of
 489re-executing the current goal is returned  to the caller's message queue
 490in the form of _event terms_.
 491
 492    * success(ID, Terms, More)
 493      ID is the id of the pengine that succeeded in finding yet another
 494      solution to the query. Terms is a list holding instantiations of
 495      `Template`. More is either `true` or `false`, indicating whether
 496      we can expect the pengine to be able to return more solutions or
 497      not, would we call pengine_next/2.
 498
 499    * failure(ID)
 500      ID is the id of the pengine that failed for lack of more solutions.
 501
 502    * error(ID, Term)
 503      ID is the id of the pengine throwing the exception.
 504      Term is the exception's error term.
 505
 506    * output(ID, Term)
 507      ID is the id of a pengine running the query that called
 508      pengine_output/1. Term is the term that was passed in the first
 509      argument of pengine_output/1 when it was called.
 510
 511    * prompt(ID, Term)
 512      ID is the id of the pengine that called pengine_input/2 and Term
 513      is the prompt.
 514
 515Without a chunk(Count) option this is defined in terms of pengine_send/3 as follows; with it, next(Count) is sent instead:
 516
 517==
 518pengine_next(ID, Options) :-
 519    pengine_send(ID, next, Options).
 520==
 521
 522*/
 523
 pengine_next(ID, Options) :-
     (   select_option(chunk(Count), Options, SendOptions)
     ->  % New chunk size requested: it travels inside the message and
         % is removed from the options given to pengine_send/3.
         pengine_send(ID, next(Count), SendOptions)
     ;   pengine_send(ID, next, Options)
     ).
 530
 531
 532/** pengine_stop(+NameOrID, +Options) is det
 533
 534Tells pengine NameOrID to stop looking  for   more  solutions to a query
 535started by pengine_ask/3. Options are passed to pengine_send/3.
 536
 537Defined in terms of pengine_send/3, like so:
 538
 539==
 540pengine_stop(ID, Options) :-
 541    pengine_send(ID, stop, Options).
 542==
 543*/
 544
 pengine_stop(ID, Options) :-
     % The whole operation is the `stop` message.
     pengine_send(ID, stop, Options).
 546
 547
 548/** pengine_abort(+NameOrID) is det
 549
 550Aborts the running query. The pengine goes   back  to state `2', waiting
 551for new queries.
 552
 553@see pengine_destroy/1.
 554*/
 555
 pengine_abort(NameOrID) :-
     % Accept either a child alias or a pengine id.
     (   child(NameOrID, Resolved)
     ->  Pengine = Resolved
     ;   Pengine = NameOrID
     ),
     (   pengine_remote(Pengine, Server)
     ->  remote_pengine_abort(Server, Pengine, [])
     ;   pengine_thread(Pengine, Thread),
         debug(pengine(abort), 'Signalling thread ~p', [Thread]),
         % Any exception raised while signalling is ignored.
         Signal = throw(abort_query),
         catch(thread_signal(Thread, Signal), _, true)
     ).
 567
 568
 569/** pengine_destroy(+NameOrID) is det.
 570    pengine_destroy(+NameOrID, +Options) is det.
 571
 572Destroys the pengine NameOrID.  With the option force(true), the pengine
 573is killed using abort/0 and pengine_destroy/2 succeeds.
 574*/
 575
 pengine_destroy(ID) :-
     NoOptions = [],
     pengine_destroy(ID, NoOptions).
 578
 pengine_destroy(Name, Options) :-
     option(force(true), Options),
     !,
     % Forced: resolve a child alias, then signal `abort` to the
     % pengine's thread if it has one.  A thread that no longer exists
     % is not an error.
     (   child(Name, Child)
     ->  ID = Child
     ;   ID = Name
     ),
     (   pengine_thread(ID, Thread)
     ->  catch(thread_signal(Thread, abort),
               error(existence_error(thread, _), _),
               true)
     ;   true
     ).
 pengine_destroy(ID, _) :-
     % Not forced: send `destroy`.  If the pengine is reported as
     % non-existent, remove the child/2 facts that point to it.
     catch(pengine_send(ID, destroy),
           error(existence_error(pengine, ID), _),
           retractall(child(_, ID))).
 595
 596
 597/*================= pengines administration =======================
 598*/
 599
 600%!  current_pengine(?Id, ?Parent, ?Location)
 601%
 602%   Dynamic predicate that registers our known pengines.  Id is
 603%   an atomic unique datatype.  Parent is the id of our parent
 604%   pengine.  Location is one of
 605%
 606%     - thread(ThreadId)
 607%     - remote(URL)
 608
 609:- dynamic
 610    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
 611    pengine_queue/4,                % Id, Queue, TimeOut, Time
 612    output_queue/3,                 % Id, Queue, Time
 613    pengine_user/2,                 % Id, User
 614    pengine_data/2.                 % Id, Data
 615:- volatile
 616    current_pengine/6,
 617    pengine_queue/4,
 618    output_queue/3,
 619    pengine_user/2,
 620    pengine_data/2.
 621
 622:- thread_local
 623    child/2.                        % ?Name, ?Child
 624
 625%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
 626%!  pengine_register_remote(+Id, +URL, +App, +Destroy) is det.
 627%!  pengine_unregister(+Id) is det.
 628
 pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
     % Queue is stored in the second (parent) column of current_pengine/6.
     Fact = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
     asserta(Fact).
 631
 pengine_register_remote(Id, URL, Application, Destroy) :-
     % The calling thread is stored as parent; the thread column is 0,
     % which is how the lookups in this file recognise a remote pengine.
     thread_self(Parent),
     Fact = current_pengine(Id, Parent, 0, URL, Application, Destroy),
     asserta(Fact).
 635
 636%!  pengine_unregister(+Id)
 637%
 638%   Called by the pengine thread  destruction.   If  we are a remote
 639%   pengine thread, our URL  equals  =http=   and  the  queue is the
 640%   message queue used to send events to the HTTP workers.
 641
 pengine_unregister(Id) :-
     thread_self(Thread),
     % A registration of this thread with URL `http` carries a queue
     % that is destroyed under the `pengine` mutex.
     (   current_pengine(Id, HttpQueue, Thread, http, _, _)
     ->  with_mutex(pengine,
                    sync_destroy_queue_from_pengine(Id, HttpQueue))
     ;   true
     ),
     % Drop the registration and the per-pengine user and data facts.
     retractall(current_pengine(Id, _, Thread, _, _, _)),
     retractall(pengine_user(Id, _)),
     retractall(pengine_data(Id, _)).
 651
 %!  pengine_unregister_remote(+Id)
 %
 %   Remove the current_pengine/6 facts for Id whose thread column is
 %   0; facts for Id with another thread value are left in place.
 pengine_unregister_remote(Id) :-
     RemoteThread = 0,
     retractall(current_pengine(Id, _, RemoteThread, _, _, _)).
 654
 %!  pengine_self(?Id)
 %
 %   True when Id is registered in current_pengine/6 with the calling
 %   thread in its thread column.
 pengine_self(Id) :-
     thread_self(Me),
     current_pengine(Id, _, Me, _, _, _).
 658
 %!  pengine_parent(-Parent)
 %
 %   Parent is the value of the global variable `pengine_parent`.
 pengine_parent(Parent) :-
     Key = pengine_parent,
     nb_getval(Key, Parent).
 661
 %!  pengine_thread(?Pengine, ?Thread) is semidet.
 %
 %   Thread is the thread column of the first current_pengine/6 fact
 %   for Pengine in which that column is not 0.
 pengine_thread(Pengine, Thread) :-
     once(( current_pengine(Pengine, _, Thread, _, _, _),
            Thread \== 0
          )).
 666
 %!  pengine_remote(?Pengine, ?URL)
 %
 %   True when Pengine has a current_pengine/6 fact with thread column
 %   0; URL is the URL column of that fact.
 pengine_remote(Pengine, URL) :-
     RemoteThread = 0,
     current_pengine(Pengine, _, RemoteThread, URL, _, _).
 669
 %!  get_pengine_application(?Pengine, ?Application) is semidet.
 %
 %   Application is the application column of the first matching
 %   current_pengine/6 fact for Pengine.
 get_pengine_application(Pengine, Application) :-
     once(current_pengine(Pengine, _, _, _, Application, _)).
 673
 %!  get_pengine_module(?Pengine, ?Module)
 %
 %   The module of a pengine is the pengine id itself.
 get_pengine_module(Pengine, Module) :-
     Module = Pengine.
 675
 %!  pengine_uuid(-Id)
 %
 %   Id is a fresh random identifier.  When uuid/2 is available a
 %   version 4 UUID is used.  Otherwise Id is the atom for a random
 %   integer in the 128-bit range 0..2^128-1.
 :- if(current_predicate(uuid/2)).
 pengine_uuid(Id) :-
     uuid(Id, [version(4)]).             % Version 4 is random.
 :- else.
 :- use_module(library(random)).
 pengine_uuid(Id) :-
     % random_between/3 includes both bounds, so the upper bound must
     % be 2^128-1 to stay within 128 bits.
     Max is (1<<128) - 1,
     random_between(0, Max, Num),
     atom_number(Id, Num).
 :- endif.
 686
 687/** pengine_application(+Application) is det.
 688
 689Directive that must be used to declare  a pengine application
 690module. The module may not  be  associated   to  any  file.  The default
 691application is =pengine_sandbox=.  The  example   below  creates  a  new
 692application =address_book= and imports the  API   defined  in the module
 693file =adress_book_api.pl= into the application.
 694
 695  ==
 696  :- pengine_application(address_book).
 697  :- use_module(address_book:adress_book_api).
 698  ==
 699*/
 700
 pengine_application(Application) :-
     % Calling this as a goal always raises: the declaration is handled
     % as a directive through term expansion.
     Culprit = pengine_application(Application),
     throw(error(context_error(nodirective, Culprit), _)).
 704
 705:- multifile
 706    system:term_expansion/2,
 707    current_application/1.
 708
 709%!  current_pengine_application(?Application) is nondet.
 710%
 711%   True when Application is a currently defined application.
 712%
 713%   @see pengine_application/1
 714
 current_pengine_application(App) :-
     % Public view on the multifile current_application/1 facts.
     current_application(App).
 717
 718
 719% Default settings for all applications
 720
 721:- setting(thread_pool_size, integer, 100,
 722           'Maximum number of pengines this application can run.').
 723:- setting(thread_pool_stacks, list(compound), [],
 724           'Maximum stack sizes for pengines this application can run.').
 725:- setting(slave_limit, integer, 3,
 726           'Maximum number of slave pengines a master pengine can create.').
 727:- setting(time_limit, number, 300,
 728           'Maximum time to wait for output').
 729:- setting(idle_limit, number, 300,
 730           'Pengine auto-destroys when idle for this time').
 731:- setting(safe_goal_limit, number, 10,
 732           'Maximum time to try proving safity of the goal').
 733:- setting(program_space, integer, 100_000_000,
 734           'Maximum memory used by predicates').
 735:- setting(allow_from, list(atom), [*],
 736           'IP addresses from which remotes are allowed to connect').
 737:- setting(deny_from, list(atom), [],
 738           'IP addresses from which remotes are NOT allowed to connect').
 739:- setting(debug_info, boolean, false,
 740           'Keep information to support source-level debugging').
 741
 742
 %   Expand the directive `:- pengine_application(App)` into a
 %   pengines:current_application(App) fact followed by the expansion
 %   of one setting/4 declaration per application setting.  Each
 %   declaration is made in module App and uses the term
 %   setting(pengines:Name) as its default.
 %
 %   @error type_error if App is not an atom.
 %   @error permission_error if module App is associated with a file.
 system:term_expansion((:- pengine_application(App)), Clauses) :-
     must_be(atom, App),
     (   module_property(App, file(_))
     ->  permission_error(create, pengine_application, App)
     ;   true
     ),
     expand_term((:- setting(App:thread_pool_size, integer,
                             setting(pengines:thread_pool_size),
                             'Maximum number of pengines this \c
                             application can run.')),
                 PoolSize),
     expand_term((:- setting(App:thread_pool_stacks, list(compound),
                             setting(pengines:thread_pool_stacks),
                             'Maximum stack sizes for pengines \c
                             this application can run.')),
                 PoolStacks),
     expand_term((:- setting(App:slave_limit, integer,
                             setting(pengines:slave_limit),
                             'Maximum number of local slave pengines \c
                             a master pengine can create.')),
                 SlaveLimit),
     expand_term((:- setting(App:time_limit, number,
                             setting(pengines:time_limit),
                             'Maximum time to wait for output')),
                 TimeLimit),
     expand_term((:- setting(App:idle_limit, number,
                             setting(pengines:idle_limit),
                             'Pengine auto-destroys when idle for this time')),
                 IdleLimit),
     expand_term((:- setting(App:safe_goal_limit, number,
                             setting(pengines:safe_goal_limit),
                             'Maximum time to try proving safity of the goal')),
                 SafeGoalLimit),
     expand_term((:- setting(App:program_space, integer,
                             setting(pengines:program_space),
                             'Maximum memory used by predicates')),
                 ProgramSpace),
     expand_term((:- setting(App:allow_from, list(atom),
                             setting(pengines:allow_from),
                             'IP addresses from which remotes are allowed \c
                             to connect')),
                 AllowFrom),
     expand_term((:- setting(App:deny_from, list(atom),
                             setting(pengines:deny_from),
                             'IP addresses from which remotes are NOT \c
                             allowed to connect')),
                 DenyFrom),
     expand_term((:- setting(App:debug_info, boolean,
                             setting(pengines:debug_info),
                             'Keep information to support source-level \c
                             debugging')),
                 DebugInfo),
     % Flatten because an individual expansion may itself be a list.
     flatten([ pengines:current_application(App),
               PoolSize,
               PoolStacks,
               SlaveLimit,
               TimeLimit,
               IdleLimit,
               SafeGoalLimit,
               ProgramSpace,
               AllowFrom,
               DenyFrom,
               DebugInfo
             ],
             Clauses).
 807
 808% Register default application
 809
 810:- pengine_application(pengine_sandbox).
 811
 812
 813/** pengine_property(?Pengine, ?Property) is nondet.
 814
 815True when Property is a property of   the  given Pengine. Enumerates all
 816pengines  that  are  known  to  the   calling  Prolog  process.  Defined
 817properties are:
 818
 819  * self(ID)
 820    Identifier of the pengine.  This is the same as the first argument,
 821    and can be used to enumerate all known pengines.
 822  * alias(Name)
 823    Name is the alias name of the pengine, as provided through the
 824    `alias` option when creating the pengine.
 825  * thread(Thread)
 826    If the pengine is a local pengine, Thread is the Prolog thread
 827    identifier of the pengine.
 828  * remote(Server)
 829    If the pengine is remote, the URL of the server.
 830  * application(Application)
 831    Pengine runs the given application
 832  * module(Module)
 833    Temporary module used for running the Pengine.
 834  * destroy(Destroy)
 835    Destroy is =true= if the pengines is destroyed automatically
 836    after completing the query.
 837  * parent(Queue)
 838    Message queue to which the (local) pengine reports.
 839  * source(?SourceID, ?Source)
 840    Source is the source code with the given SourceID. May be present if
 841    the setting `debug_info` is present.
 842*/
 843
 844
 845pengine_property(Id, Prop) :-
 846    nonvar(Id), nonvar(Prop),
 847    pengine_property2(Id, Prop),
 848    !.
 849pengine_property(Id, Prop) :-
 850    pengine_property2(Id, Prop).
 851
 852pengine_property2(Id, self(Id)) :-
 853    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
 854pengine_property2(Id, module(Id)) :-
 855    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
 856pengine_property2(Id, alias(Alias)) :-
 857    child(Alias, Id),
 858    Alias \== Id.
 859pengine_property2(Id, thread(Thread)) :-
 860    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
 861    Thread \== 0.
 862pengine_property2(Id, remote(Server)) :-
 863    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
 864pengine_property2(Id, application(Application)) :-
 865    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
 866pengine_property2(Id, destroy(Destroy)) :-
 867    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
 868pengine_property2(Id, parent(Parent)) :-
 869    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
 870pengine_property2(Id, source(SourceID, Source)) :-
 871    pengine_data(Id, source(SourceID, Source)).
 872
 873/** pengine_output(+Term) is det
 874
 875Sends Term to the parent pengine or thread.
 876*/
 877
 878pengine_output(Term) :-
 879    pengine_self(Me),
 880    pengine_reply(output(Me, Term)).
 881
 882
 883/** pengine_debug(+Format, +Args) is det
 884
 885Create a message using format/3 from Format   and  Args and send this to
 886the    client.    The    default    JavaScript    client    will    call
 887=|console.log(Message)|=  if  there  is   a    console.   The  predicate
 888pengine_rpc/3 calls debug(pengine(debug), '~w',   [Message]).  The debug
 889topic pengine(debug) is enabled by default.
 890
 891@see debug/1 and nodebug/1 for controlling the pengine(debug) topic
 892@see format/2 for format specifications
 893*/
 894
 895pengine_debug(Format, Args) :-
 896    pengine_parent(Queue),
 897    pengine_self(Self),
 898    catch(safe_goal(format(atom(_), Format, Args)), E, true),
 899    (   var(E)
 900    ->  format(atom(Message), Format, Args)
 901    ;   message_to_string(E, Message)
 902    ),
 903    pengine_reply(Queue, debug(Self, Message)).
 904
 905
 906/*================= Local pengine =======================
 907*/
 908
 909%!  local_pengine_create(+Options)
 910%
 911%   Creates  a  local   Pengine,   which    is   a   thread  running
 912%   pengine_main/2.  It maintains two predicates:
 913%
 914%     - The global dynamic predicate id/2 relates Pengines to their
 915%       childs.
 916%     - The local predicate id/2 maps named childs to their ids.
 917
 918local_pengine_create(Options) :-
 919    thread_self(Self),
 920    option(application(Application), Options, pengine_sandbox),
 921    create(Self, Child, Options, local, Application),
 922    option(alias(Name), Options, Child),
 923    assert(child(Name, Child)).
 924
 925
 926%!  thread_pool:create_pool(+Application) is det.
 927%
 928%   On demand creation of a thread pool for a pengine application.
 929
 930thread_pool:create_pool(Application) :-
 931    current_application(Application),
 932    setting(Application:thread_pool_size, Size),
 933    setting(Application:thread_pool_stacks, Stacks),
 934    thread_pool_create(Application, Size, Stacks).
 935
 936%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
 937%
 938%   Create a new pengine thread.
 939%
 940%   @arg Queue is the queue (or thread handle) to report to
 941%   @arg Child is the identifier of the created pengine.
 942%   @arg URL is one of =local= or =http=
 943
 944create(Queue, Child, Options, local, Application) :-
 945    !,
 946    pengine_child_id(Child),
 947    create0(Queue, Child, Options, local, Application).
 948create(Queue, Child, Options, URL, Application) :-
 949    pengine_child_id(Child),
 950    catch(create0(Queue, Child, Options, URL, Application),
 951          Error,
 952          create_error(Queue, Child, Error)).
 953
 954pengine_child_id(Child) :-
 955    (   nonvar(Child)
 956    ->  true
 957    ;   pengine_uuid(Child)
 958    ).
 959
 960create_error(Queue, Child, Error) :-
 961    pengine_reply(Queue, error(Child, Error)).
 962
 963create0(Queue, Child, Options, URL, Application) :-
 964    (  current_application(Application)
 965    -> true
 966    ;  existence_error(pengine_application, Application)
 967    ),
 968    (   URL \== http                    % pengine is _not_ a child of the
 969                                        % HTTP server thread
 970    ->  aggregate_all(count, child(_,_), Count),
 971        setting(Application:slave_limit, Max),
 972        (   Count >= Max
 973        ->  throw(error(resource_error(max_pengines), _))
 974        ;   true
 975        )
 976    ;   true
 977    ),
 978    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 979    thread_create_in_pool(
 980        Application,
 981        pengine_main(Queue, PengineOptions, Application), ChildThread,
 982        [ at_exit(pengine_done)
 983        | RestOptions
 984        ]),
 985    option(destroy(Destroy), PengineOptions, true),
 986    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 987    thread_send_message(ChildThread, pengine_registered(Child)),
 988    (   option(id(Id), Options)
 989    ->  Id = Child
 990    ;   true
 991    ).
 992
 993pengine_create_option(src_text(_)).
 994pengine_create_option(src_url(_)).
 995pengine_create_option(application(_)).
 996pengine_create_option(destroy(_)).
 997pengine_create_option(ask(_)).
 998pengine_create_option(template(_)).
 999pengine_create_option(chunk(_)).
1000pengine_create_option(alias(_)).
1001pengine_create_option(user(_)).
1002
1003
%!  pengine_done is det.
%
%   Called from the pengine thread =at_exit= option.  If the thread
%   ended with the exception `$aborted`, a `destroy` event wrapping
%   `abort` is sent and the thread is detached.  Then all _child_
%   pengines are destroyed using pengine_destroy/1 and the pengine
%   itself is unregistered.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Thread),
    (   thread_property(Thread, status(exception('$aborted')))
    ->  pengine_self(Aborted),
        pengine_reply(destroy(Aborted, abort(Aborted))),
        thread_detach(Thread)
    ;   true
    ),
    forall(child(_, Child),
           pengine_destroy(Child)),
    pengine_self(Self),
    pengine_unregister(Self).
1024
1025
%!  pengine_main(+Parent, +Options, +Application)
%
%   Body of the pengine thread.  Waits for the creator to send the
%   pengine identifier, stores Parent in the global variable
%   `pengine_parent`, and then prepares the source and runs the main
%   loop inside a temporary module named after the identifier.  If
%   preparing the source throws `prepare_source_failed`, the pengine
%   is terminated.

:- thread_local wrap_first_answer_in_create_event/2.

:- meta_predicate
    pengine_prepare_source(:, +).

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Id)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    catch(in_temporary_module(
              Id,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Id, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Id)).
1047
%!  pengine_create_and_loop(+Self, +Application, +Options)
%
%   Acknowledge creation and enter pengine_main_loop/1.  Without an
%   ask(Query) option the `create` event is sent immediately.  With
%   one, the event is stored in
%   wrap_first_answer_in_create_event/2 with an open tail and the
%   query is started using pengine_ask/3.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, Limit),
    Created = create(Self, [slave_limit(Limit)|Features]),
    (   option(ask(Query), Options)
    ->  asserta(wrap_first_answer_in_create_event(Created, Features)),
        option(template(Template), Options, Query),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query, [template(Template), chunk(Chunk)])
    ;   Features = [],
        pengine_reply(Created)
    ),
    pengine_main_loop(Self).
1060
1061
%!  fix_streams is det.
%
%   If we are a pengine that is created from a web server thread,
%   the current output points to a CGI stream.  In that case the
%   alias is moved to `user_output`.

fix_streams :-
    fix_stream(current_output).

%!  fix_stream(+Name) is det.
%
%   If the stream known as Name is a CGI stream, give `user_output`
%   the alias Name.  Otherwise do nothing.

fix_stream(Name) :-
    (   is_cgi_stream(Name)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Name]),
        set_stream(user_output, alias(Name))
    ;   true
    ).
1076
%!  pengine_prepare_source(:Application, +Options) is det.
%
%   Load the source into the pengine's module.  The module
%   qualifier of the first argument is the pengine's module.  Its
%   program space is limited by the `program_space` setting of
%   Application, `user` is removed as import module and Application
%   is added as the first import module.  Errors raised by
%   prep_module/3 are reported with send_error/1.
%
%   @throws =prepare_source_failed= if it failed to prepare the
%           sources.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, Limit),
    set_module(Module:program_space(Limit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   nonvar(Error)
    ->  send_error(Error),
        throw(prepare_source_failed)
    ;   true
    ).
1095
%!  prep_module(+Module, +Application, +Options)
%
%   Copy the `var_prefix` flag from Application to Module, run all
%   solutions of the prepare_module/3 hook and finally process
%   Options with Module as the source module.

prep_module(Module, Application, Options) :-
    copy_flag(Module, Application, var_prefix),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(Saved, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(Saved)).
1103
%!  copy_flag(+Module, +Application, +Flag) is det.
%
%   If Flag has a value in Application, set it to the same value in
%   Module.  Succeeds without doing anything otherwise.

copy_flag(Module, Application, Flag) :-
    (   current_prolog_flag(Application:Flag, Value)
    ->  set_prolog_flag(Module:Flag, Value)
    ;   true
    ).
1109
%!  process_create_option(+Module, +Option) is det.
%
%   Load the source denoted by a src_text(Text) or src_url(URL)
%   option into Module.  All other options are ignored.

process_create_option(Module, Option) :-
    (   Option = src_text(Text)
    ->  pengine_src_text(Text, Module)
    ;   Option = src_url(URL)
    ->  pengine_src_url(URL, Module)
    ;   true
    ).
1117
1118
1119%!  prepare_module(+Module, +Application, +Options) is semidet.
1120%
1121%   Hook, called to initialize  the   temporary  private module that
1122%   provides the working context of a pengine. This hook is executed
1123%   by the pengine's thread.  Preparing the source consists of three
1124%   steps:
1125%
1126%     1. Add Application as (first) default import module for Module
1127%     2. Call this hook
%     3. Compile the source provided by the `src_text` and
%        `src_url` options
1130%
1131%   @arg    Module is a new temporary module (see
1132%           in_temporary_module/3) that may be (further) prepared
1133%           by this hook.
1134%   @arg    Application (also a module) associated to the pengine.
1135%   @arg    Options is passed from the environment and should
1136%           (currently) be ignored.
1137
1138
%!  pengine_main_loop(+ID)
%
%   Run guarded_main_loop/1.  If the exception `abort_query` is
%   raised, control is passed to pengine_aborted/1.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID),
          abort_query,
          pengine_aborted(ID)).
1141
%!  pengine_aborted(+ID)
%
%   Handle an aborted query: empty the queue using empty_queue/0 and
%   report `abort(ID)` through destroy_or_continue/1.

pengine_aborted(ID) :-
    thread_self(Thread),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Thread]),
    empty_queue,
    destroy_or_continue(abort(ID)).
1147
1148
%!  guarded_main_loop(+Pengine) is det.
%
%   Executes state `2' of the pengine, where it waits for two
%   events:
%
%     - destroy
%     Terminate the pengine
%     - ask(:Goal, +Options)
%     Solve Goal.
%
%   Any other request is answered with a `protocol_error` and the
%   pengine keeps waiting in state `2'.

guarded_main_loop(ID) :-
    pengine_request(Msg),
    (   Msg = destroy
    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
        pengine_terminate(ID)
    ;   Msg = ask(Query, AskOptions)
    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Query)]),
        ask(ID, Query, AskOptions)
    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
        pengine_reply(error(ID, error(protocol_error, _))),
        guarded_main_loop(ID)
    ).
1171
1172
%!  pengine_terminate(+ID)
%
%   Send `destroy(ID)` to the parent and detach the calling thread.

pengine_terminate(ID) :-
    pengine_reply(destroy(ID)),
    thread_self(Thread),        % Make the thread silently disappear
    thread_detach(Thread).
1177
1178
%!  solve(+Chunk, +Template, :Goal, +ID) is det.
%
%   Solve Goal, reporting at most Chunk instances of Template per
%   answer.  Because a new goal can be asked in state `6', we must
%   provide for an ancestral cut (prolog_cut_to/1).  We need to be
%   sure to have a choice point before we can call
%   prolog_current_choice/1.  This is the reason why this predicate
%   has two clauses.
%
%   The chunk size and the start time are kept in the compound terms
%   count(Chunk) and time(Epoch), which more_solutions/5 updates
%   with nb_setarg/3 before backtracking into the goal.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(ChoicePoint),
    Counter = count(Chunk),
    statistics(cputime, Started),
    Clock = time(Started),
    (   call_cleanup(catch(findnsols_no_empty(Counter, Template, Goal, Answers),
                           Ball, true),
                     Exhausted = true),
        arg(1, Clock, From),
        statistics(cputime, Now),
        Used is Now-From,
        (   var(Ball)
        ->  (   var(Exhausted)
            ->  pengine_reply(success(ID, Answers, Used, true)),
                more_solutions(ID, ChoicePoint, Counter, Clock)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Answers, Used, false))
            )
        ;   !,                          % commit
            (   Ball == abort_query
            ->  throw(Ball)
            ;   destroy_or_continue(error(ID, Ball))
            )
        )
    ;   !,                              % commit
        arg(1, Clock, From),
        statistics(cputime, Now),
        Used is Now-From,
        destroy_or_continue(failure(ID, Used))
    ).
solve(_, _, _, _).                      % leave a choice point
1218
%!  findnsols_no_empty(+N, ?Template, :Goal, -List) is nondet.
%
%   As findnsols/4, but skips answers where List is the empty list.

findnsols_no_empty(N, Template, Goal, Chunk) :-
    findnsols(N, Template, Goal, Chunk),
    Chunk = [_|_].
1222
%!  destroy_or_continue(+Event)
%
%   Report Event, whose first argument is the pengine identifier.
%   If the pengine has the property destroy(true), the thread is
%   detached and Event is sent wrapped in a `destroy` event.
%   Otherwise Event is sent as is, memory is reclaimed and the
%   pengine returns to guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, ID),
    (   pengine_property(ID, destroy(true))
    ->  thread_self(Thread),
        thread_detach(Thread),
        pengine_reply(destroy(ID, Event))
    ;   pengine_reply(Event),
        garbage_collect,                % minimise our footprint
        trim_stacks,
        guarded_main_loop(ID)
    ).
1234
%!  more_solutions(+Pengine, +Choice, +State, +Time)
%
%   Called after a solution was found while there can be more. This
%   is state `6' of the state machine. It processes these events:
%
%     * stop
%     Go back via state `7' to state `2' (guarded_main_loop/1)
%     * next
%     Fail.  This causes solve/4 to backtrack on the goal asked,
%     providing at most the current `chunk` solutions.
%     * next(Count)
%     As `next`, but sets the new chunk-size to Count.  Count must
%     be a positive integer.
%     * ask(Goal, Options)
%     Ask another goal.  Note that we must commit the choice point
%     of the previous goal asked for.
%     * destroy
%     Terminate the pengine.
%
%   Any other event, including next(Count) with an invalid Count, is
%   answered with a `protocol_error` and the pengine keeps waiting
%   in state `6'.

more_solutions(ID, Choice, State, Time) :-
    pengine_request(Event),
    more_solutions(Event, ID, Choice, State, Time).

more_solutions(stop, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
    destroy_or_continue(stop(ID)).
more_solutions(next, ID, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
    statistics(cputime, T0),
    nb_setarg(1, Time, T0),             % restart the clock for this chunk
    fail.
more_solutions(next(Count), ID, _Choice, State, Time) :-
    integer(Count),                     % Count comes from the client: do not
    Count > 0,                          % let a bad value raise in arithmetic
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
    nb_setarg(1, State, Count),
    statistics(cputime, T0),
    nb_setarg(1, Time, T0),
    fail.
more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
    prolog_cut_to(Choice),
    ask(ID, Goal, Options).
more_solutions(destroy, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
more_solutions(Event, ID, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
    pengine_reply(error(ID, error(protocol_error, _))),
    more_solutions(ID, Choice, State, Time).
1286
%!  ask(+Pengine, :Goal, +Options)
%
%   Migrate from state `2' to `3'.  The goal is first prepared using
%   prepare_goal/4, which expands it and validates it against the
%   sandbox, and is then handed to solve/4.  Honours the options
%   template(Template), defaulting to Goal, and chunk(N), defaulting
%   to 1.  If preparation raises an error, the error is reported and
%   the pengine returns to guarded_main_loop/1.

ask(ID, Goal, Options) :-
    catch(prepare_goal(ID, Goal, Prepared, Options), Error, true),
    !,
    (   nonvar(Error)
    ->  pengine_reply(error(ID, Error)),
        guarded_main_loop(ID)
    ;   option(template(Template), Options, Goal),
        option(chunk(Chunk), Options, 1),
        solve(Chunk, Template, Prepared, ID)
    ).
1303
%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
%
%   Prepare GoalIn for execution in Pengine.  The goal is first
%   offered to the hook prepare_goal/3, then goal-expanded with the
%   pengine module as source module and, unless
%   pengine_not_sandboxed/1 holds, checked with safe_goal/1 within
%   the time set by the application setting `safe_goal_limit`.
%
%   Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like
%   to write, but this does not work correctly if the user wishes to
%   expand `X:Y` while interpreting `X` not as the module in which
%   to run `Y`.  This happens in the CQL package.  Possibly we should
%   disallow this reinterpretation?
%
%   @throws error(sandbox(time_limit_exceeded, Limit), _) if the
%           sandbox check does not complete in time.

prepare_goal(ID, Raw, Module:Expanded, Options) :-
    (   prepare_goal(Raw, Hooked, Options)
    ->  true
    ;   Hooked = Raw
    ),
    get_pengine_module(ID, Module),
    setup_call_cleanup(
        '$set_source_module'(Saved, Module),
        expand_goal(Hooked, Expanded),
        '$set_source_module'(_, Saved)),
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, Application),
        setting(Application:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Expanded)), Ball, true)
    ->  (   var(Ball)
        ->  true
        ;   Ball = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(Ball)
        )
    ).
1340
1341%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
1342%
1343%   Pre-preparation hook for running Goal0. The hook runs in the context
1344%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
1345%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
1346%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
1347%   Goal0 is used for further processing.
1348%
1349%   @arg Options provides the options as given to _ask_
1350
1351
%%  pengine_not_sandboxed(+Pengine) is semidet.
%
%   True when pengine does not operate in sandboxed mode.  This
%   requires that pengine_user/2 knows a user for Pengine and that
%   the hook pengines:not_sandboxed(User, Application) succeeds for
%   that user and the application of the pengine.

pengine_not_sandboxed(ID) :-
    once(( pengine_user(ID, User),
           pengine_property(ID, application(Application)),
           not_sandboxed(User, Application)
         )).
1363
1364%%  not_sandboxed(+User, +Application) is semidet.
1365%
1366%   This hook is called to see whether the Pengine must be executed in a
1367%   protected environment. It is only called after authentication_hook/3
%   has confirmed the authenticity of the current user.  If this hook
1369%   succeeds, both loading the code and  executing the query is executed
1370%   without enforcing sandbox security.  Typically, one should:
1371%
1372%     1. Provide a safe user authentication hook.
1373%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
1374%        ensure that the network between the proxy and the pengine
1375%        server can be trusted.
1376
1377
/** pengine_pull_response(+Pengine, +Options) is det

Pulls a response (an event term) from the slave Pengine if Pengine is a
remote process, else does nothing at all.
*/

pengine_pull_response(Pengine, Options) :-
    (   pengine_remote(Pengine, Server)
    ->  remote_pengine_pull_response(Server, Pengine, Options)
    ;   true
    ).
1389
1390
/** pengine_input(+Prompt, -Term) is det

Sends Prompt to the parent pengine and waits for input. Note that Prompt
may be any term, compound as well as atomic. The next request decides
the outcome: `input(Input)` unifies Term with Input, `destroy` aborts
the pengine and anything else raises a `protocol_error`.
*/

pengine_input(Prompt, Term) :-
    pengine_self(Id),
    pengine_parent(Queue),
    pengine_reply(Queue, prompt(Id, Prompt)),
    pengine_request(Answer),
    (   Answer = input(Input)
    ->  Term = Input
    ;   Answer == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
1408
1409
/** pengine_respond(+Pengine, +Input, +Options) is det

Sends a response in the form of the term Input to a slave pengine
that has prompted its master for input.

This is shorthand for sending the event `input(Input)` to Pengine
using pengine_send/3 with the same Options.
*/

pengine_respond(Pengine, Input, Options) :-
    Event = input(Input),
    pengine_send(Pengine, Event, Options).
1426
1427
%!  send_error(+Error) is det.
%
%   Send an error to my parent.  Non-readable blobs are first
%   removed from the error term using replace_blobs/2.  If the
%   error context holds a stack as a list of frames, the stack is
%   printed to a string and sent in that form.

send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
    is_list(Frames),
    !,
    with_output_to(string(Trace),
                   print_prolog_backtrace(current_output, Frames)),
    pengine_self(Id),
    replace_blobs(Formal, SafeFormal),
    replace_blobs(Message, SafeMessage),
    Context = context(prolog_stack(Trace), SafeMessage),
    pengine_reply(error(Id, error(SafeFormal, Context))).
send_error(Error) :-
    pengine_self(Id),
    replace_blobs(Error, SafeError),
    pengine_reply(error(Id, SafeError)).
1448
%!  replace_blobs(Term0, Term) is det.
%
%   Copy Term0 to Term, replacing every blob whose type is not
%   `text` by an atom holding its printed (`~p`) representation.
%   This is required for error messages that may hold streams and
%   other handles to non-readable objects.

replace_blobs(Term0, Term) :-
    (   blob(Term0, Type),
        Type \== text
    ->  format(atom(Term), '~p', [Term0])
    ;   compound(Term0)
    ->  compound_name_arguments(Term0, Name, Args0),
        maplist(replace_blobs, Args0, Args),
        compound_name_arguments(Term, Name, Args)
    ;   Term = Term0
    ).
1466
1467
1468/*================= Remote pengines =======================
1469*/
1470
1471
%!  remote_pengine_create(+BaseURL, +Options)
%
%   Create a pengine on the server at BaseURL.  The pengine options
%   are posted as a dict; the remaining options are passed on to
%   remote_post_rec/5.  If `ask` is given without `template`, the
%   query itself is used as template.  The new pengine is recorded
%   in child/2 and, if the reply is a `create` or `output` event,
%   registered as a remote pengine.  The reply is queued as an event
%   for the calling thread.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, CreateOptions0, PostOptions),
    (   option(ask(Query), CreateOptions0),
        \+ option(template(_), CreateOptions0)
    ->  CreateOptions = [template(Query)|CreateOptions0]
    ;   CreateOptions = CreateOptions0
    ),
    options_to_dict(CreateOptions, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, PostOptions),
    arg(1, Reply, ID),
    (   option(id(Wanted), Options)
    ->  ID = Wanted
    ;   true
    ),
    option(alias(Alias), Options, ID),
    assert(child(Alias, ID)),
    (   (   functor(Reply, create, _)   % actually created
        ;   functor(Reply, output, _)   % compiler messages
        )
    ->  option(application(Application), CreateOptions, pengine_sandbox),
        option(destroy(Destroy), CreateOptions, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Queue),
    pengine_reply(Queue, Reply).
1498
%!  options_to_dict(+Options, -Dict) is det.
%
%   Translate the pengine creation options into a dict.  If both
%   `ask` and `template` are present they are written together, so
%   that shared variables get the same name in both strings.  The
%   conversion runs inside findall/3 such that the variables of the
%   query remain unbound.
%
%   @throws domain_error(numbered_vars_free_term, Term) if the query
%           or template contains a '$VAR'(_) term.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, Options1),
    select_option(template(Template), Options1, Rest),
    !,
    no_numbered_var_in(Ask+Template),
    findall(AskText-TemplateText,
            ask_template_to_strings(Ask, Template, AskText, TemplateText),
            [ AskText-TemplateText ]),
    options_to_dict(Rest, Dict0),
    put_dict(_{ask:AskText,template:TemplateText}, Dict0, Dict).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, Pairs),
    dict_create(Dict, _, Pairs).
1512
%!  no_numbered_var_in(+Term) is det.
%
%   Succeeds if Term has no sub term of the shape '$VAR'(_).
%
%   @throws domain_error(numbered_vars_free_term, Term) otherwise.

no_numbered_var_in(Term) :-
    (   sub_term(Sub, Term),
        subsumes_term('$VAR'(_), Sub)
    ->  domain_error(numbered_vars_free_term, Term)
    ;   true
    ).
1519
%!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Write Ask and Template to strings after numbering their
%   variables together, so a variable shared between the two is
%   written with the same name in both.  Binds the variables of Ask
%   and Template as a side effect.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(Both), '~W\n~W',
           [ Ask, WriteOptions, Template, WriteOptions ]),
    split_string(Both, "\n", "", [AskString, TemplateString]).
1527
%!  prolog_option(+Option0, -Option) is det.
%
%   Options whose value is a Prolog term according to
%   create_option_type/2 have their value replaced by a string
%   written with the `~k` format directive.  Other options are
%   passed unchanged.

prolog_option(Option0, Option) :-
    (   create_option_type(Option0, term)
    ->  Option0 =.. [Name,Value],
        format(string(Text), '~k', [Value]),
        Option =.. [Name,Text]
    ;   Option = Option0
    ).

%!  create_option_type(?Option, ?Type) is nondet.
%
%   Type is the declared value type of the creation option Option.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).
1539
%!  remote_pengine_send(+BaseURL, +ID, +Event, +Options)
%
%   Send Event to the remote pengine ID using the `send` action and
%   queue the reply as an event for the calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    Params = [event=Event],
    remote_send_rec(BaseURL, send, ID, Params, Reply, Options),
    thread_self(Me),
    pengine_reply(Me, Reply).

%!  remote_pengine_pull_response(+BaseURL, +ID, +Options)
%
%   Run the `pull_response` action on the remote pengine ID and
%   queue the reply as an event for the calling thread.

remote_pengine_pull_response(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
    thread_self(Me),
    pengine_reply(Me, Reply).

%!  remote_pengine_abort(+BaseURL, +ID, +Options)
%
%   Run the `abort` action on the remote pengine ID and queue the
%   reply as an event for the calling thread.

remote_pengine_abort(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
    thread_self(Me),
    pengine_reply(Me, Reply).
1554
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Run Action for pengine ID on Server and unify Reply with the
%   replied term.  If Params is `[event=Event]`, Event is POSTed
%   as a Prolog term and only `id` is passed in the URL.  Otherwise
%   Params are added to the URL query and the URL is opened using
%   Options as is.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    % Putting http_open/3 in setup_call_cleanup/3 makes it impossible
    % to interrupt.
    http_open(URL, In, [post(prolog(Event))|Options]),
    call_cleanup(read_prolog_reply(In, Reply),
                 close(In)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    server_url(Server, Action, [id=ID|Params], URL),
    http_open(URL, In, Options),
    call_cleanup(read_prolog_reply(In, Reply),
                 close(In)).
1576
%!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   POST Data as JSON to the URL for Action on Server and unify
%   Reply with the replied term.  The URL is first checked using
%   probe/2.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL),
    http_open(URL, In, [post(json(Data))|Options]),
    call_cleanup(read_prolog_reply(In, Reply),
                 close(In)).
1587
%!  probe(+Action, +URL) is det.
%
%   Probe the target by opening URL with the HTTP `OPTIONS` method.
%   This is only done for the `create` action.  It is a good idea
%   before posting a large document and being faced with an
%   authentication challenge.  Possibly we should make this an
%   option for simpler scenarios.

probe(Action, URL) :-
    (   Action = create
    ->  http_open(URL, In, [method(options)]),
        close(In)
    ;   true
    ).
1599
%!  read_prolog_reply(+In, -Reply)
%
%   Read one term from the stream In as UTF-8 and post-process it
%   using rebind_cycles/2.

read_prolog_reply(In, Reply) :-
    set_stream(In, encoding(utf8)),
    read(In, Raw),
    rebind_cycles(Raw, Reply).
1604
%!  rebind_cycles(+Reply0, -Reply) is det.
%
%   If Reply0 has the shape `@(Reply, Bindings)` where Bindings is a
%   list, execute every `Var = Value` in Bindings and return the
%   inner Reply.  Any other term is returned unchanged.

rebind_cycles(Reply0, Reply) :-
    (   Reply0 = @(Reply, Bindings),
        is_list(Bindings)
    ->  maplist(bind, Bindings)
    ;   Reply = Reply0
    ).

%!  bind(+Binding) is semidet.
%
%   Unify the two sides of a `Var = Value` term.

bind(Var = Value) :-
    Var = Value.
1613
%!  server_url(+Server, +Action, +Params, -URL) is det.
%
%   URL is the URL for running Action on Server.  It is built by
%   appending `pengine/Action` to the path of Server and setting the
%   query string to the encoding of Params.

server_url(Server, Action, Params, URL) :-
    uri_components(Server, Parts0),
    uri_query_components(Search, Params),
    uri_data(path, Parts0, BasePath),
    atom_concat('pengine/', Action, Endpoint),
    directory_file_path(BasePath, Endpoint, Path),
    uri_data(path, Parts0, Path, Parts),
    uri_data(search, Parts, Search),
    uri_components(URL, Parts).
1623
1624
/** pengine_event(?EventTerm) is det.
    pengine_event(?EventTerm, +Options) is det.

Examines the pengine's event queue and if necessary blocks execution
until a term that unifies with EventTerm arrives in the queue. After a
term from the queue has been unified with EventTerm, the term is deleted
from the queue. If the event signals that a remote pengine was
destroyed, that pengine is unregistered.

   Valid options are:

   * timeout(+Time)
     Time is a float or integer and specifies the maximum time to wait
     in seconds. If no event has arrived before the time is up EventTerm
     is bound to the atom =timeout=.
   * listen(+Id)
     Only listen to events from the pengine identified by Id.
*/

pengine_event(Event) :-
    pengine_event(Event, []).

pengine_event(Event, Options) :-
    option(listen(Id), Options, _),
    thread_self(Queue),
    (   thread_get_message(Queue, pengine_event(Id, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).
1654
%!  update_remote_destroy(+Event) is det.
%
%   If Event is a destroy event (see destroy_event/1) for a remote
%   pengine, unregister that pengine.  Otherwise do nothing.

update_remote_destroy(Event) :-
    (   destroy_event(Event),
        arg(1, Event, Id),
        pengine_remote(Id, _Server)
    ->  pengine_unregister_remote(Id)
    ;   true
    ).
1662
%!  destroy_event(+Event) is semidet.
%
%   True when Event implies that the pengine was destroyed: a
%   `destroy` event, or a `create` event whose features hold an
%   answer(Answer) where Answer is itself a destroy event.

destroy_event(destroy(_)).
destroy_event(destroy(_,_)).
destroy_event(create(_,Features)) :-
    memberchk(answer(Answer), Features),
    nonvar(Answer),
    destroy_event(Answer).
1670
1671
/** pengine_event_loop(:Closure, +Options) is det

Starts an event loop accepting event terms sent to the current pengine
or thread. For each such event E, calls ignore(call(Closure, E)). A
closure thus acts as a _handler_ for the event. The loop runs as long
as there are entries in child/2 and the processed event asks to
continue. Some events are also treated specially:

   * create(ID, Term)
     The ID is placed in a list of active pengines.

   * destroy(ID)
     The ID is removed from the list of active pengines. When the last
     pengine ID is removed, the loop terminates.

   * output(ID, Term)
     The predicate pengine_pull_response/2 is called.

Valid options are:

   * autoforward(+To)
     Forwards received event terms to slaves. To is either =all=,
     =all_but_sender= or a Prolog list of NameOrIDs. Only =all= is
     currently implemented.

*/

pengine_event_loop(Closure, Options) :-
    (   child(_, _)
    ->  pengine_event(Event),
        (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
        ->  forall(child(_, ID), pengine_send(ID, Event))
        ;   true
        ),
        pengine_event_loop(Event, Closure, Options)
    ;   true
    ).
1708
:- meta_predicate pengine_process_event(+, 1, -, +).

%   pengine_event_loop(+Event, :Closure, +Options)
%
%   Process a single Event and re-enter pengine_event_loop/2 unless
%   processing reported something other than `true` for Continue.

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, Continue, Options),
    (   Continue \== true
    ->  true
    ;   pengine_event_loop(Closure, Options)
    ).
1718
%   pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Dispatch one event to Closure.  Continue is unified with `true`
%   by every clause that completes normally.  The handler is called
%   through ignore/1 for all events except error/2, where a handler
%   that does not succeed causes all children to be destroyed and the
%   error to be thrown.

% create: an embedded answer(First) is split off and processed as an
% event of its own after the handler saw the remaining features.
pengine_process_event(create(Pengine, Features), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2',
          [Pengine, create(Features)]),
    (   select(answer(First), Features, Remaining)
    ->  ignore(call(Closure, create(Pengine, Remaining))),
        pengine_process_event(First, Closure, Continue, Options)
    ;   ignore(call(Closure, create(Pengine, Features))),
        Continue = true
    ).
% output and debug: after notifying the handler, ask for the next
% response.
pengine_process_event(output(Pengine, Message), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4',
          [Pengine, output(Message)]),
    ignore(call(Closure, output(Pengine, Message))),
    pengine_pull_response(Pengine, []).
pengine_process_event(debug(Pengine, Message), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4',
          [Pengine, debug(Message)]),
    ignore(call(Closure, debug(Pengine, Message))),
    pengine_pull_response(Pengine, []).
pengine_process_event(prompt(Pengine, Term), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5',
          [Pengine, prompt(Term)]),
    ignore(call(Closure, prompt(Pengine, Term))).
% success: the timing argument is not passed on to the handler.
pengine_process_event(success(Pengine, Solutions, _Time, More),
                      Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2',
          [Pengine, success(Solutions, More)]),
    ignore(call(Closure, success(Pengine, Solutions, More))).
pengine_process_event(failure(Pengine, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [Pengine, failure]),
    ignore(call(Closure, failure(Pengine))).
% error: the handler must succeed, otherwise everything is torn down.
pengine_process_event(error(Pengine, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2',
          [Pengine, error(Error)]),
    (   call(Closure, error(Pengine, Error))
    ->  Continue = true
    ;   forall(child(_, Child), pengine_destroy(Child)),
        throw(Error)
    ).
pengine_process_event(stop(Pengine), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [Pengine, stop]),
    ignore(call(Closure, stop(Pengine))).
% destroy/2: process the wrapped event first, then the destroy itself.
pengine_process_event(destroy(Pengine, Wrapped), Closure, Continue, Options) :-
    pengine_process_event(Wrapped, Closure, _, Options),
    pengine_process_event(destroy(Pengine), Closure, Continue, Options).
pengine_process_event(destroy(Pengine), Closure, true, _Options) :-
    retractall(child(_, Pengine)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [Pengine, destroy]),
    ignore(call(Closure, destroy(Pengine))).
1761
1762
/** pengine_rpc(+URL, +Query) is nondet.
    pengine_rpc(+URL, +Query, +Options) is nondet.

Semantically equivalent to the sequence below, except that the query is
executed in (and in the Prolog context of) the pengine server referred
to by URL, rather than locally.

  ==
    copy_term(Query, Copy),
    call(Copy),                 % executed on server at URL
    Query = Copy.
  ==

Valid options are:

    - chunk(+Integer)
      Can be used to reduce the number of network roundtrips being made.
      See pengine_ask/3.
    - timeout(+Time)
      Wait at most Time seconds for the next event from the server.
      The default is defined by the setting `pengines:time_limit`.

Remaining options (except the server option) are passed to
pengine_create/1.
*/

pengine_rpc(URL, Query) :-
    pengine_rpc(URL, Query, []).

pengine_rpc(URL, Query, Module:RawOptions) :-
    translate_local_sources(RawOptions, LocalOptions, Module),
    % Make sure a timeout/1 option is always present.
    (   option(timeout(_), LocalOptions)
    ->  Options = LocalOptions
    ;   setting(time_limit, Limit),
        Options = [timeout(Limit)|LocalOptions]
    ),
    % The template is v/N over the variables of Query.
    term_variables(Query, Vars),
    Template =.. [v|Vars],
    State = destroy(true),              % modified by process_event/4
    setup_call_catcher_cleanup(
        pengine_create([ ask(Query),
                         template(Template),
                         server(URL),
                         id(Id)
                       | Options
                       ]),
        wait_event(Template, State, [listen(Id)|Options]),
        Why,
        pengine_destroy_and_wait(State, Id, Why)).
1812
%   pengine_destroy_and_wait(+State, +Id, +Why)
%
%   Cleanup handler of pengine_rpc/3.  While State is still
%   destroy(true), destroy pengine Id and wait for its destruction;
%   otherwise only emit a debug message.  Why is the catcher of the
%   RPC call and is used for debug output only.

pengine_destroy_and_wait(State, Id, Why) :-
    (   State = destroy(true)
    ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
        pengine_destroy(Id),
        wait_destroy(Id, 10)
    ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
    ).
1820
%   wait_destroy(+Id, +Retries)
%
%   Wait until pengine Id is no longer listed in child/2.  Each round
%   waits up to 10 seconds for an event from Id.  A destroy event
%   removes the child entry.  Any other event (including `timeout`)
%   uses up one retry.  When succ/2 can no longer decrement Retries,
%   the remote registration and the child entry are removed
%   unconditionally.

wait_destroy(Id, Retries) :-
    (   \+ child(_, Id)
    ->  true
    ;   pengine_event(Event, [listen(Id),timeout(10)])
    ->  (   destroy_event(Event)
        ->  retractall(child(_, Id))
        ;   succ(Fewer, Retries)
        ->  wait_destroy(Id, Fewer)
        ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
            pengine_unregister_remote(Id),
            retractall(child(_, Id))
        )
    ).
1835
%   wait_event(?Template, +State, +Options)
%
%   Fetch the next event using pengine_event/2 and hand it to
%   process_event/4.

wait_event(Template, State, Options) :-
    pengine_event(Received, Options),
    debug(pengine(event), 'Received ~p', [Received]),
    process_event(Received, Template, State, Options).
1840
%   process_event(+Event, ?Template, +State, +Options)
%
%   Translate an event from the remote pengine into the behaviour of
%   pengine_rpc/3: solutions are enumerated by unifying Template,
%   failure fails and errors are re-thrown.  State is the destroy(Bool)
%   term of pengine_rpc/3; it is set to destroy(false) once a
%   destroy/2 event has been seen.

% create: only the embedded first answer is of interest.
process_event(create(_Pengine, Features), Template, State, Options) :-
    memberchk(answer(First), Features),
    process_event(First, Template, State, Options).
process_event(error(_Pengine, Error), _Template, _State, _Options) :-
    throw(Error).
process_event(failure(_Pengine, _Time), _Template, _State, _Options) :-
    fail.
% prompt: obtain a reply, send it as input and keep waiting.
process_event(prompt(Pengine, Prompt), Template, State, Options) :-
    pengine_rpc_prompt(Pengine, Prompt, Reply),
    pengine_send(Pengine, input(Reply)),
    wait_event(Template, State, Options).
process_event(output(Pengine, Term), Template, State, Options) :-
    pengine_rpc_output(Pengine, Term),
    pengine_pull_response(Pengine, Options),
    wait_event(Template, State, Options).
process_event(debug(Pengine, Message), Template, State, Options) :-
    debug(pengine(debug), '~w', [Message]),
    pengine_pull_response(Pengine, Options),
    wait_event(Template, State, Options).
% Last chunk: enumerate its solutions and stop.
process_event(success(_Pengine, Solutions, _Time, false),
              Template, _State, _Options) :-
    !,
    member(Template, Solutions).
% More may follow: enumerate this chunk, then ask for the next one on
% backtracking.
process_event(success(Pengine, Solutions, _Time, true),
              Template, State, Options) :-
    (   member(Template, Solutions)
    ;   pengine_next(Pengine, Options),
        wait_event(Template, State, Options)
    ).
% destroy/2: drop the child entry, record in State that no explicit
% destroy is needed and process the wrapped event.
process_event(destroy(Pengine, Wrapped), Template, State, Options) :-
    !,
    retractall(child(_, Pengine)),
    nb_setarg(1, State, false),
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Wrapped, Template, State, Options).
1874
%   pengine_rpc_prompt(+ID, +Prompt, -Term)
%
%   Obtain the reply to a prompt event.  The hook prompt/3 is tried
%   first (committing to its first answer).  If it fails, Term is read
%   with read/1 while Prompt is temporarily installed using prompt/2.

pengine_rpc_prompt(ID, Prompt, Term) :-
    (   prompt(ID, Prompt, Reply)
    ->  Term = Reply
    ;   setup_call_cleanup(
            prompt(Saved, Prompt),
            read(Term),
            prompt(_, Saved))
    ).
1884
%   pengine_rpc_output(+ID, +Term)
%
%   Handle an output event: try the hook output/2 and fall back to
%   print/1 if the hook fails.

pengine_rpc_output(ID, Term) :-
    (   output(ID, Term)
    ->  true
    ;   print(Term)
    ).
1890
%!  prompt(+ID, +Prompt, -Term) is semidet.
%
%   Hook to handle pengine_input/2 from the remote pengine.  If the
%   hook fails, pengine_rpc/3 reads Term using read/1 with Prompt
%   temporarily installed as the prompt.

:- multifile(prompt/3).

%!  output(+ID, +Term) is semidet.
%
%   Hook to handle pengine_output/1 from the remote pengine.  If the
%   hook fails, pengine_rpc/3 calls print/1 on Term.

:- multifile(output/2).
1904
1905
1906/*================= HTTP handlers =======================
1907*/
1908
%   HTTP locations served by this module and how they are served.
%
%   The create, send and pull_response handlers use
%   time_limit(infinite) because pengines have their own timeout.
%   They also use spawn: many clients may be waiting for some action
%   on a pengine to complete and, without spawning, these would
%   quickly exhaust the worker pool of the HTTP server.
%
%   FIXME: we should probably wait a short time for the pengine on
%   the default worker thread and only call http_spawn/2 to continue
%   waiting on a new thread after that time has expired.  That would
%   improve the performance and reduce the usage of threads.

:- http_handler(root(pengine),
                http_404([]),
                [ id(pengines)
                ]).
:- http_handler(root(pengine/create),
                http_pengine_create,
                [ time_limit(infinite),
                  spawn([])
                ]).
:- http_handler(root(pengine/send),
                http_pengine_send,
                [ time_limit(infinite),
                  spawn([])
                ]).
:- http_handler(root(pengine/pull_response),
                http_pengine_pull_response,
                [ time_limit(infinite),
                  spawn([])
                ]).
:- http_handler(root(pengine/abort),
                http_pengine_abort,
                []).
:- http_handler(root(pengine/ping),
                http_pengine_ping,
                []).
:- http_handler(root(pengine/destroy_all),
                http_pengine_destroy_all,
                []).

% Static resources.
:- http_handler(root(pengine/'pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []),
                []).
:- http_handler(root(pengine/'plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []),
                []).
1937
1938
%!  http_pengine_create(+Request)
%
%   HTTP POST handler for =/pengine/create=.  The pengine creation
%   parameters are accepted both as =application/json= and as
%   =www-form-encoded=.  Accepted parameters:
%
%     | **Parameter** | **Default**       | **Comment**                |
%     |---------------|-------------------|----------------------------|
%     | format        | `prolog`          | Output format              |
%     | application   | `pengine_sandbox` | Pengine application        |
%     | chunk         | 1                 | Chunk-size for results     |
%     | solutions     | chunked           | If `all`, emit all results |
%     | ask           | -                 | The query                  |
%     | template      | -                 | Output template            |
%     | src_text      | ""                | Program                    |
%     | src_url       | -                 | Program to download        |
%     | disposition   | -                 | Download location          |
%
%   Note that solutions=all internally uses chunking to obtain the
%   results from the pengine, but the results are combined in a
%   single HTTP reply.  This is currently only implemented by the CSV
%   backend that is part of SWISH for downloading unbounded result
%   sets with limited memory resources.

% Let reply_options/2 answer the request if it can.
http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
% JSON body: read it as a dict.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Dict),
    dict_atom_option(format, Dict, Format, prolog),
    dict_atom_option(application, Dict, Application, pengine_sandbox),
    http_pengine_create(Request, Application, Format, Dict).
% Otherwise read form parameters and turn the supplied ones into a dict.
http_pengine_create(Request) :-
    Form = [ format(Format, [default(prolog)]),
             application(Application, [default(pengine_sandbox)]),
             chunk(_, [integer, default(1)]),
             solutions(_, [oneof([all,chunked]), default(chunked)]),
             ask(_, [string, optional(true)]),
             template(_, [string, optional(true)]),
             src_text(_, [string, optional(true)]),
             disposition(_, [string, optional(true)]),
             src_url(_, [optional(true)])
           ],
    http_parameters(Request, Form),
    form_dict(Form, Dict),
    http_pengine_create(Request, Application, Format, Dict).
1990
%   dict_atom_option(+Key, +Dict, -Atom, +Default)
%
%   Atom is the value of Key in Dict converted using atom_string/2,
%   or Default if Dict has no such key.

dict_atom_option(Key, Dict, Atom, _Default) :-
    get_dict(Key, Dict, Text),
    !,
    atom_string(Atom, Text).
dict_atom_option(_Key, _Dict, Default, Default).
1996
%   form_dict(+Form, -Dict)
%
%   Dict holds Name-Value for every element Name(Value, ...) of Form
%   whose first argument is bound.

form_dict(Form, Dict) :-
    form_values(Form, Pairs),
    dict_pairs(Dict, _Tag, Pairs).

%   form_values(+Form, -Pairs)
%
%   Collect Name-Value pairs, skipping elements whose first argument
%   is still unbound.

form_values([], []).
form_values([Param|Params], Pairs) :-
    (   arg(1, Param, Value),
        nonvar(Value)
    ->  functor(Param, Name, _),
        Pairs = [Name-Value|Rest]
    ;   Pairs = Rest
    ),
    form_values(Params, Rest).
2011
%!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
%
%   Create a pengine for a known Application and reply with its first
%   event.  The output queue is created with max_size(25) and is
%   registered in pengine_queue/4 together with the time limit of the
%   application and the current time.  If Application is not a
%   current application, an existence error is sent as the reply.

http_pengine_create(Request, Application, Format, Dict) :-
    (   current_application(Application)
    ->  allowed(Request, Application),
        authenticate(Request, Application, UserOptions),
        dict_to_options(Dict, Application, DictOptions, VarNames),
        append(UserOptions, DictOptions, CreateOptions),
        pengine_uuid(Pengine),
        message_queue_create(Queue, [max_size(25)]),
        setting(Application:time_limit, TimeLimit),
        get_time(Now),
        asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
        broadcast(pengine(create(Pengine, Application, CreateOptions))),
        create(Queue, Pengine, CreateOptions, http, Application),
        create_wait_and_output_result(Pengine, Queue, Format,
                                      TimeLimit, VarNames, Dict),
        gc_abandoned_queues
    ;   Error = existence_error(pengine_application, Application),
        pengine_uuid(ID),
        output_result(Format, error(ID, error(Error, _)))
    ).
2036
2037
%   dict_to_options(+Dict, +Application, -CreateOptions, -VarNames)
%
%   Translate the request Dict into options for creating a pengine.
%   VarNames is bound when the dict holds an `ask` key.

dict_to_options(Dict, Application, CreateOptions, VarNames) :-
    dict_pairs(Dict, _Tag, Pairs),
    pairs_create_options(Pairs, Application, CreateOptions, VarNames).

%   pairs_create_options(+Pairs, +App, -Options, -VarNames)
%
%   Text values are parsed in module App.  Keys that do not form an
%   accepted create option, as well as the key `user`, are dropped.

pairs_create_options([], _, [], _) :- !.
% Both ask and template: parse them as one term so they share variables.
pairs_create_options(Pairs0, App,
                     [ask(Goal), template(Template)|Options], VarNames) :-
    selectchk(ask-AskText, Pairs0, Pairs1),
    selectchk(template-TemplateText, Pairs1, Pairs2),
    !,
    format(string(Combined), 't((~s),(~s))', [AskText, TemplateText]),
    term_string(t(Goal, Template), Combined,
                [ variable_names(Bindings),
                  module(App)
                ]),
    template_varnames(Template, Bindings, VarNames),
    pairs_create_options(Pairs2, App, Options, VarNames).
% Only ask: the template is a `json` dict of the non-anonymous variables.
pairs_create_options([ask-AskText|Pairs], App,
                     [ask(Goal), template(Template)|Options], VarNames) :-
    !,
    term_string(Goal, AskText,
                [ variable_names(Bindings),
                  module(App)
                ]),
    exclude(anon, Bindings, Named),
    maplist(var_name, Named, VarNames),
    dict_create(Template, json, Named),
    pairs_create_options(Pairs, App, Options, VarNames).
% Any other accepted create option; convert the value if a type is known.
pairs_create_options([Name-Raw|Pairs], App, [Option|Options], VarNames) :-
    Option =.. [Name, Value],
    pengine_create_option(Option), Name \== user,
    !,
    (   create_option_type(Option, Type)
    ->  (   Type == term
        ->  atom_to_term(Raw, Value, _)
        ;   Type == atom
        ->  atom_string(Value, Raw)
        ;   assertion(false)
        )
    ;   Value = Raw
    ),
    pairs_create_options(Pairs, App, Options, VarNames).
% Everything else is skipped.
pairs_create_options([_|Pairs], App, Options, VarNames) :-
    pairs_create_options(Pairs, App, Options, VarNames).
2083
2084
%!  template_varnames(+Template, +Bindings, -VarNames)
%
%   VarNames is the list of names, taken from the Name=Var list
%   Bindings, of the variables of Template in term_variables/2 order.
%   Fails if a variable of Template has no entry in Bindings.

template_varnames(Template, Bindings, VarNames) :-
    term_variables(Template, Vars),
    filter_template_varnames(Vars, Bindings, VarNames).

filter_template_varnames([], _, []).
filter_template_varnames([Var|Vars], Bindings, [Name|Names]) :-
    once(( member(Name=Bound, Bindings),
           Bound == Var
         )),
    filter_template_varnames(Vars, Bindings, Names).
2099
2100
%!  wait_and_output_result(+Pengine, +Queue,
%!                         +Format, +TimeLimit) is det.
%!  wait_and_output_result(+Pengine, +Queue,
%!                         +Format, +TimeLimit, +VarNames) is det.
%
%   Wait for an event on the Pengine's Queue and send it to the
%   requester using output_result/3.  If reading the queue raises an
%   exception, died(Pengine) is sent instead.  If no event arrives
%   within TimeLimit, error(time_limit_exceeded, _) is sent and
%   Pengine is aborted.  The 4-argument version passes `-` as
%   VarNames.

wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit, -).

wait_and_output_result(Pengine, Queue, Format, TimeLimit, VarNames) :-
    (   catch(thread_get_message(Queue, pengine_event(_, Reply),
                                 [ timeout(TimeLimit)
                                 ]),
              Caught, true)
    ->  (   var(Caught)
        ->  debug(pengine(wait), 'Got ~q from ~q', [Reply, Queue]),
            % Queue cleanup is optional; never let it block the reply.
            ignore(destroy_queue_from_http(Pengine, Reply, Queue)),
            output_result(Format, Reply, VarNames)
        ;   output_result(Format, died(Pengine))
        )
    ;   TimedOut = error(time_limit_exceeded, _),
        output_result(Format, error(Pengine, TimedOut)),
        pengine_abort(Pengine)
    ).
2130
%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
%!                                +TimeLimit, +VarNames, +Dict) is det.
%
%   Intercepts the `solutions=all` case used for downloading results.
%   Dict may contain a `disposition` key to denote the download
%   location.  In that case events are read from Queue page by page:
%   every event is emitted as page(Page, Event) and, unless the event
%   allows the queue to be destroyed, the pengine is asked for the
%   `next` page.  Without `solutions=all` this is
%   wait_and_output_result/5.
%
%   All pages, including the final one, are emitted through
%   output_result/4 with Dict.  Earlier the final page was emitted
%   through output_result/3, which dropped Dict and registered the
%   reply in pengine_replying/2 under the page number, because the
%   first argument of page(Page, Event) is Page and not the pengine.
%
%   NOTE(review): any event that does not release the queue triggers
%   a `next` request, including events such as output/2 or prompt/2.
%   Confirm whether that is intended.

create_wait_and_output_result(Pengine, Queue, Format,
                              TimeLimit, VarNames, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, Page),         % failure-driven loop over pages
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q',
                  [Page, Event, Queue]),
            (   destroy_queue_from_http(Pengine, Event, Queue)
            ->  % Final page: same call as for the earlier pages.
                output_result(Format, page(Page, Event), VarNames, Dict)
            ;   pengine_thread(Pengine, Thread),
                thread_send_message(Thread, pengine_request(next)),
                output_result(Format, page(Page, Event), VarNames, Dict),
                fail                    % continue with the next page
            )
        ;   output_result(Format, died(Pengine))
        )
    ;   output_result(Format, error(Pengine,
                                    error(time_limit_exceeded, _))),
        pengine_abort(Pengine)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format,
                              TimeLimit, VarNames, _Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit, VarNames).
2166
2167
%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
%
%   Consider destroying the output queue for Pengine after sending
%   Event back to the HTTP client.  We can destroy the queue if
%
%     - The pengine already died (output_queue/3 is present) and
%       the queue is empty.
%     - This is a final (destroy) event.
%
%   Fails if neither applies.
%
%   @tbd    If the client did not request all output, the queue will
%           not be destroyed.  We need some timeout and GC for that.

destroy_queue_from_http(ID, Event, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   (   debug(pengine(destroy), 'DESTROY? ~p', [Event]),
            is_destroy_event(Event)
        )
    ->  message_queue_property(Queue, size(Waiting)),
        debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
        with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue))
    ).
2191
%   is_destroy_event(+Event)
%
%   True if Event is destroy/1, destroy/2, or a create/2 event whose
%   option list holds answer(Answer) with Answer a destroy event.
%
%   An unbound Answer is rejected, as destroy_event/1 does.  Without
%   the nonvar/1 guard, the recursive call would unify the unbound
%   answer with destroy(_) and wrongly report a destroy event.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_,_)).
is_destroy_event(create(_, Options)) :-
    memberchk(answer(Event), Options),
    nonvar(Event),
    is_destroy_event(Event).
2197
%   destroy_queue_if_empty(+Queue)
%
%   Succeeds without action if Queue still holds a message.
%   Otherwise forget Queue in output_queue/3 and destroy it.

destroy_queue_if_empty(Queue) :-
    (   thread_peek_message(Queue, _)
    ->  true
    ;   retractall(output_queue(_, Queue, _)),
        message_queue_destroy(Queue)
    ).
2204
%!  gc_abandoned_queues
%
%   Check whether there are queues that have been abandoned.  This
%   happens if the stream contains output events and not all of them
%   are read by the client.  When consider_queue_gc/0 holds, every
%   queue whose output_queue/3 stamp is older than 15 minutes is
%   destroyed and the time of this sweep is recorded in last_gc/1.

:- dynamic last_gc/1.

gc_abandoned_queues :-
    (   consider_queue_gc
    ->  get_time(Now),
        (   output_queue(_, Queue, Stamp),
            Now-Stamp > 15*60,
            retract(output_queue(_, Queue, Stamp)),
            message_queue_destroy(Queue),
            fail                        % failure-driven loop
        ;   retractall(last_gc(_)),
            asserta(last_gc(Now))
        )
    ;   true
    ).

%   consider_queue_gc
%
%   True if output_queue/3 holds more than 100 clauses and the last
%   sweep was more than 5 minutes ago or no sweep is recorded.

consider_queue_gc :-
    predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(Last),
        get_time(Now),
        Now-Last > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
2237
%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
%
%   Handle destruction of the message queue connecting the HTTP side
%   to the pengine.  We cannot delete the queue when the pengine dies
%   because the queue may contain output events.  Termination of the
%   pengine and finishing the HTTP exchange may happen in both
%   orders, so the two sides need to be synchronized:
%
%     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
%     Called when the pengine's thread terminates.
%     * sync_destroy_queue_from_http(+Pengine, +Queue)
%     Called from destroy_queue_from_http/3, i.e., from the HTTP
%     side, while holding the mutex `pengine`.
%
%   Three cases are distinguished here: the pengine side already
%   registered the queue in output_queue/3; an output event is still
%   pending in the queue, in which case the queue is registered in
%   output_queue/3 and kept; or the queue is destroyed right away and
%   this is recorded in output_queue_destroyed/1.

:- dynamic output_queue_destroyed/1.

sync_destroy_queue_from_http(ID, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
    ->  debug(pengine(destroy),
              'Delay destruction of ~p because of output',
              [Queue]),
        get_time(Stamp),
        asserta(output_queue(ID, Queue, Stamp))
    ;   message_queue_destroy(Queue),
        asserta(output_queue_destroyed(Queue))
    ).
2267
%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
%
%   Called from pengine_unregister/1 when the pengine thread
%   terminates.  It is called while the mutex `pengine` is held.  If
%   the HTTP side already destroyed Queue, the
%   output_queue_destroyed/1 marker is consumed.  Otherwise Queue is
%   registered in output_queue/3 with the current time.  The
%   pengine_queue/4 entry is removed in both cases.

sync_destroy_queue_from_pengine(ID, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Stamp),
        asserta(output_queue(ID, Queue, Stamp))
    ),
    retractall(pengine_queue(ID, Queue, _, _)).
2280
2281
%   http_pengine_send(+Request)
%
%   HTTP handler for /pengine/send.  The event is read in the module
%   of the pengine, sent to the pengine's thread and the next event
%   from the pengine is returned as the reply.  Errors raised while
%   reading the event are reported as error(ID, Error).  If the
%   pengine's module or thread no longer exists, an existence error
%   is reported.

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(ID, [ type(atom) ]),
                      event(EventString, [optional(true)]),
                      format(Format, [default(prolog)])
                    ]),
    get_pengine_module(ID, Module),
    (   current_module(Module)          % avoid re-creating the module
    ->  catch(( read_event(Request, EventString, Module, Parsed, Bindings),
                fix_bindings(Format, Parsed, Bindings, VarNames, Event)
              ),
              Caught,
              true),
        (   var(Caught)
        ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
            (   pengine_thread(ID, Thread)
            ->  pengine_queue(ID, Queue, TimeLimit, _),
                random_delay,
                broadcast(pengine(send(ID, Event))),
                thread_send_message(Thread, pengine_request(Event)),
                wait_and_output_result(ID, Queue, Format,
                                       TimeLimit, VarNames)
            ;   atom(ID)
            ->  pengine_died(Format, ID)
            ;   http_404([], Request)
            )
        ;   output_result(Format, error(ID, Caught))
        )
    ;   debug(pengine(event), 'Pengine module ~q vanished', [Module]),
        discard_post_data(Request),
        pengine_died(Format, ID)
    ).
2316
%   pengine_died(+Format, +Pengine)
%
%   Reply with an existence error for Pengine.

pengine_died(Format, Pengine) :-
    Error = error(existence_error(pengine, Pengine), _),
    output_result(Format, error(Pengine, Error)).
2320
2321
%!  read_event(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read the sent event.  The event is a Prolog term that is either
%   in the =event= parameter or in a posted document.  Terms are read
%   in Module and Bindings receives the variable names.  Fails if
%   there is no event parameter and the request is not a POST.

read_event(Request, EventString, Module, Event, Bindings) :-
    (   nonvar(EventString)
    ->  term_string(Event, EventString,
                    [ variable_names(Bindings),
                      module(Module)
                    ])
    ;   option(method(post), Request),
        http_read_data(Request, Event,
                       [ content_type('application/x-prolog'),
                         module(Module),
                         variable_names(Bindings)
                       ])
    ).
2341
%!  discard_post_data(+Request) is det.
%
%   If this is a POST request, read the posted data into a null
%   stream to discard it.

discard_post_data(Request) :-
    (   option(method(post), Request)
    ->  setup_call_cleanup(
            open_null_stream(Sink),
            http_read_data(Request, _, [to(stream(Sink))]),
            close(Sink))
    ;   true
    ).
2354
%!  fix_bindings(+Format, +EventIn, +Bindings, -VarNames, -Event) is det.
%
%   Generate the template for a JSON Format from the variables in
%   the asked Goal.  Variables for which anon/1 holds are left out
%   of the template.  The resulting ask/2 event always carries a
%   template/1 and a chunk/1 option (chunk defaults to 1).  Events
%   other than ask/2, and all events for non-JSON formats, are passed
%   unchanged with VarNames set to `-`.

fix_bindings(Format,
             ask(Goal, AskOptions), Bindings, VarNames,
             ask(Goal, NewOptions)) :-
    json_lang(Format),
    !,
    template(Bindings, VarNames, Template, AskOptions, NoTemplate),
    select_option(chunk(Chunk), NoTemplate, Rest, 1),
    NewOptions = [template(Template), chunk(Chunk) | Rest].
fix_bindings(_, Command, _, -, Command).

%   template(+Bindings, -VarNames, -Template, +Options0, -Options)
%
%   An explicit template(Template) option wins and yields `-` for
%   VarNames.  Otherwise Template is a `json` dict built from the
%   non-anonymous Bindings and VarNames lists their names.

template(_, -, Template, Options0, Options) :-
    select_option(template(Template), Options0, Options),
    !.
template(Bindings, VarNames, Template, Options, Options) :-
    exclude(anon, Bindings, Named),
    maplist(var_name, Named, VarNames),
    dict_create(Template, json, Named).
2378
%   anon(+Binding)
%
%   True if Binding is Name=_ where Name starts with an underscore
%   whose next character has char_type/2 `prolog_var_start`.

anon(Name=_) :-
    sub_atom(Name, 0, 1, _, '_'),
    sub_atom(Name, 1, 1, _, Second),
    char_type(Second, prolog_var_start).

%   var_name(+Binding, -Name)
%
%   Name is the name part of a Name=Var binding.

var_name(Name=_Var, Name).
2385
2386
%!  json_lang(+Format) is semidet.
%
%   True if Format is a JSON variation, i.e., `json` itself or an
%   atom that starts with `json-`.

json_lang(Format) :-
    (   Format = json
    ->  true
    ;   sub_atom(Format, 0, _, _, 'json-')
    ).
2394
%!  http_pengine_pull_response(+Request)
%
%   HTTP handler for /pengine/pull_response.  Pulls possible pending
%   messages from the pengine.  The queue is taken from
%   pengine_queue/4 or, failing that, from output_queue/3, in which
%   case the time limit is 0.  Replies 404 if no queue is known for
%   the id.

http_pengine_pull_response(Request) :-
    (   reply_options(Request, [get])
    ->  true
    ;   http_parameters(Request,
                        [ id(ID, []),
                          format(Format, [default(prolog)])
                        ]),
        (   (   pengine_queue(ID, Queue, TimeLimit, _)
            ->  true
            ;   output_queue(ID, Queue, _),
                TimeLimit = 0
            )
        ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
        ;   http_404([], Request)
        )
    ).
2416
%!  http_pengine_abort(+Request)
%
%   HTTP handler for /pengine/abort.  Note that abort may be sent at
%   any time and the reply may be handled by a pull_response.  In
%   that case, our pengine has already died before we get to
%   wait_and_output_result/4.  Replies 404 if the pengine has no
%   thread or no queue.

http_pengine_abort(Request) :-
    (   reply_options(Request, [get])
    ->  true
    ;   http_parameters(Request,
                        [ id(ID, []),
                          format(Format, [default(prolog)])
                        ]),
        (   pengine_thread(ID, _Thread),
            pengine_queue(ID, Queue, TimeLimit, _)
        ->  broadcast(pengine(abort(ID))),
            abort_pending_output(ID),   % stop threads still replying
            pengine_abort(ID),
            wait_and_output_result(ID, Queue, Format, TimeLimit)
        ;   http_404([], Request)
        )
    ).
2440
%   http_pengine_destroy_all(+Request)
%
%   HTTP handler for /pengine/destroy_all.  The `ids` parameter is a
%   comma-separated list of pengine ids; each one is destroyed with
%   force(true) and the reply is the JSON string "ok".

http_pengine_destroy_all(Request) :-
    (   reply_options(Request, [get])
    ->  true
    ;   http_parameters(Request,
                        [ ids(IDsAtom, [])
                        ]),
        atomic_list_concat(IDs, ',', IDsAtom),
        forall(member(ID, IDs),
               pengine_destroy(ID, [force(true)])),
        reply_json("ok")
    ).
2452
%!  http_pengine_ping(+Request)
%
%   HTTP handler for /pengine/ping.  If the requested Pengine is
%   alive, the event ping(Pengine, Stats) is sent, where `Stats` is
%   the result of thread_statistics/2 on the pengine's thread.
%   Otherwise died(Pengine) is sent.

http_pengine_ping(Request) :-
    (   reply_options(Request, [get])
    ->  true
    ;   http_parameters(Request,
                        [ id(Pengine, []),
                          format(Format, [default(prolog)])
                        ]),
        (   pengine_thread(Pengine, Thread),
            catch(thread_statistics(Thread, Stats), _, fail)
        ->  output_result(Format, ping(Pengine, Stats))
        ;   output_result(Format, died(Pengine))
        )
    ).
2472
2473
2474%!  output_result(+Format, +EventTerm) is det.
2475%!  output_result(+Format, +EventTerm, +VarNames) is det.
2476%!  output_result(+Format, +EventTerm, +VarNames, +OptionsDict) is det.
2477%
2478%   Formulate an HTTP response from a pengine event term. Format is
2479%   one of =prolog=, =json= or =json-s=.
2480
2481:- dynamic
2482    pengine_replying/2.             % +Pengine, +Thread
2483
2484output_result(Format, Event) :-
2485    output_result(Format, Event, -).
2486output_result(Format, Event, VarNames) :-
2487    arg(1, Event, Pengine),
2488    thread_self(Thread),
2489    setup_call_cleanup(
2490        asserta(pengine_replying(Pengine, Thread), Ref),
2491        catch(output_result(Format, Event, VarNames, _{}),
2492              pengine_abort_output,
2493              true),
2494        erase(Ref)).
2495
2496output_result(prolog, Event, _, _) :-
2497    !,
2498    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
2499    write_term(Event,
2500               [ quoted(true),
2501                 ignore_ops(true),
2502                 fullstop(true),
2503                 blobs(portray),
2504                 portray_goal(portray_blob),
2505                 nl(true)
2506               ]).
2507output_result(Lang, Event, VarNames, Dict) :-
2508    write_result(Lang, Event, VarNames, Dict),
2509    !.
2510output_result(Lang, Event, VarNames, _) :-              % deprecated
2511    write_result(Lang, Event, VarNames),
2512    !.
2513output_result(Lang, Event, VarNames, _) :-
2514    json_lang(Lang),
2515    !,
2516    (   event_term_to_json_data(Event, JSON, Lang, VarNames)
2517    ->  cors_enable,
2518        disable_client_cache,
2519        reply_json(JSON)
2520    ;   assertion(event_term_to_json_data(Event, _, Lang))
2521    ).
2522output_result(Lang, _Event, _, _) :-    % FIXME: allow for non-JSON format
2523    domain_error(pengine_format, Lang).
2524
%!  portray_blob(+Blob, +Options) is det.
%
%   Portray non-text blobs that may appear in output terms. Such
%   terms are meaningless outside the process and should really be
%   avoided, but the error that results otherwise is hard to debug.
%   Therefore the blob is written as `'$BLOB'(Type)`, where Type is
%   the blob type reported by blob/2. Future versions may include
%   more info, depending on `Type`.

:- public portray_blob/2.               % called from write-term
portray_blob(Blob, _Options) :-
    blob(Blob, BlobType),
    Replacement = '$BLOB'(BlobType),
    writeq(Replacement).
2537
%!  abort_pending_output(+Pengine) is det.
%
%   If we get an abort, output may still be in production for the
%   client. This predicate signals every thread registered in
%   pengine_replying/2 for Pengine using abort_output_thread/1.

abort_pending_output(Pengine) :-
    % "there is no replying thread that we fail to signal"
    \+ ( pengine_replying(Pengine, Replier),
         \+ abort_output_thread(Replier)
       ).
2546
% abort_output_thread(+Thread)
%
% Ask Thread to throw `pengine_abort_output`. A thread that no longer
% exists is silently ignored.

abort_output_thread(Thread) :-
    Signal = throw(pengine_abort_output),
    catch(thread_signal(Thread, Signal),
          error(existence_error(thread, _), _),
          true).
2551
2552%!  write_result(+Lang, +Event, +VarNames) is semidet.
2553%
2554%   Called after write_result/4 for backward compatibility reasons.
2555%
2556%   @deprecated Use write_result/4.
2557
2558%!  write_result(+Lang, +Event, +VarNames, +Dict) is semidet.
2559%
2560%   Hook that allows for different output formats. The core Pengines
2561%   library supports `prolog` and various   JSON  dialects. The hook
2562%   event_to_json/4 can be used to refine   the  JSON dialects. This
2563%   hook must be used if  a   completely  different output format is
2564%   desired.
2565
%!  disable_client_cache
%
%   Emit the `Cache-Control`, `Pragma` and `Expires` header lines that
%   tell the client not to cache our page.
%
%   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers

disable_client_cache :-
    Headers = 'Cache-Control: no-cache, no-store, must-revalidate\r\n\c
               Pragma: no-cache\r\n\c
               Expires: 0\r\n',
    format(Headers).
2576
% event_term_to_json_data(+Event, -JSON, +Lang, +VarNames)
%
% Translate a pengine event term into a JSON dict, taking the names
% of the query variables into account. Clauses are tried in order:
%
%   1. The event_to_json/4 hook; its answer is final if it succeeds.
%   2. VarNames is `-`: delegate to event_term_to_json_data/3.
%   3. success/4 for Lang `json`: as the /3 version, with an
%      additional `projection` key holding VarNames.
%   4. destroy/2: wrap the translation of the embedded event.
%   5. create/2: translate an embedded answer(Event) feature, if any.
%   6. Anything else: delegate to event_term_to_json_data/3.

event_term_to_json_data(Event, JSON, Lang, VarNames) :-
    event_to_json(Event, JSON, Lang, VarNames),         % hook has priority
    !.
event_term_to_json_data(Event, JSON, Lang, -) :-
    !,
    event_term_to_json_data(Event, JSON, Lang).
event_term_to_json_data(success(ID, Bindings0, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:VarNames},
                        json, VarNames) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style, VarNames) :-
    !,
    event_term_to_json_data(Event, JSON, Style, VarNames).
event_term_to_json_data(create(ID, Features0), JSON, Style, VarNames) :-
    !,
    % A first answer may be embedded in the create event; translate
    % it and put it back at the front of the feature list.
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style, VarNames),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(Event, JSON, Lang, _) :-
    event_term_to_json_data(Event, JSON, Lang).
2604
% event_term_to_json_data(+Event, -JSON, +Style)
%
% Translate a pengine event term into a JSON dict without variable
% name information. Handles, in this order:
%
%   - success/4 for style `json`: bindings go through term_to_json/2.
%   - create/2: an embedded answer(Event) feature is translated
%     recursively.
%   - destroy/2: the embedded event is translated by the 4-argument
%     version with `-` as variable names.
%   - error/2: `data` is the message text; code and location are
%     added by add_error_details/3.
%   - failure/2.
%   - Any other event of arity 1: only `event` and `id`.
%   - Any other event of arity 2: the second argument becomes `data`
%     after translation by term_to_json/2.

event_term_to_json_data(success(ID, Bindings0, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON}, Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style, -).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    % Message is still unbound while the details are added; it is
    % filled in by message_to_string/2 below.
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _).
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).
2639
:- public add_error_details/3.

%%  add_error_details(+Error, +JSON0, -JSON)
%
%   Extend the JSON dict for an error event with an error code (see
%   add_error_code/3) and, subsequently, with source location
%   information (see add_error_location/3). Also used by
%   pengines_io.pl.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
2650
%%  add_error_code(+Error, +JSON0, -JSON) is det.
%
%   Add a =code= field to JSON0 if Error is an ISO error term. The
%   error code is the functor name of the formal part of the error,
%   e.g., =syntax_error=, =type_error=, etc. Some errors carry more
%   information:
%
%     - existence_error(Type, Obj)
%     {arg1:Type, arg2:Obj}, where Obj is stringified if it is not
%     atomic.
%
%   JSON0 is returned unchanged if Error is not of the shape
%   error(Formal, _) with a callable Formal.

add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
    atom(Type),
    !,
    to_atomic(Obj, Value),
    Extra = _{code:existence_error, arg1:Type, arg2:Value},
    put_dict(Extra, Error0, Error).
add_error_code(error(Formal, _), Error0, Error) :-
    callable(Formal),
    !,
    functor(Formal, Code, _Arity),
    put_dict(code, Error0, Code, Error).
add_error_code(_, Error, Error).
2673
% to_atomic(+Obj, -Atomic)
%
% Atoms, numbers and strings are passed unchanged. Any other term is
% converted to a string using term_string/2.
%
% What to do with large integers?
to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
2679
2680
%%  add_error_location(+Error, +JSON0, -JSON) is det.
%
%   Add a =location= property if the context of the error term is a
%   file(Path, Line, LinePos, CharNo) term with an atom Path and an
%   integer Line. The location is an object with properties =file=
%   and =line= and, unless LinePos is -1, the character position in
%   the line as =ch=. Otherwise JSON0 is returned unchanged.

add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
    atom(Path),
    integer(Line),
    !,
    Location = _{file:Path, line:Line},
    put_dict(_{location:Location}, Term0, Term).
add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
    atom(Path),
    integer(Line),
    integer(Ch),
    !,
    Location = _{file:Path, line:Line, ch:Ch},
    put_dict(_{location:Location}, Term0, Term).
add_error_location(_, Term, Term).
2696
2697
2698%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
2699%
2700%   Hook that translates a Pengine event structure into a term
2701%   suitable   for   reply_json/1,   according   to   the   language
2702%   specification Lang. This can be used   to massage general Prolog
2703%   terms, notably associated with   `success(ID,  Bindings0, More)`
2704%   and `output(ID, Term)` into a format  suitable for processing at
2705%   the client side.
2706
2707%:- multifile pengines:event_to_json/3.
2708
2709
2710                 /*******************************
2711                 *        ACCESS CONTROL        *
2712                 *******************************/
2713
%!  allowed(+Request, +Application) is det.
%
%   Check whether the peer is allowed to connect. The peer must match
%   the `allow_from` setting of Application and must not match its
%   `deny_from` setting. If contact is not allowed, a =forbidden=
%   reply for the request URI is raised as an exception.

allowed(Request, Application) :-
    (   setting(Application:allow_from, Allow),
        match_peer(Request, Allow),
        setting(Application:deny_from, Deny),
        \+ match_peer(Request, Deny)
    ->  true
    ;   memberchk(request_uri(Here), Request),
        throw(http_reply(forbidden(Here)))
    ).
2728
% match_peer(+Request, +Allowed)
%
% True when the peer of Request matches the list Allowed. The atom
% `*` in Allowed matches any peer and an empty list matches none;
% in both cases the peer is not looked up. Otherwise the peer must
% be a member of Allowed or match one of its elements according to
% match_peer_pattern/2.

match_peer(_Request, Allowed) :-
    memberchk(*, Allowed),
    !.
match_peer(_Request, []) :-
    !,
    fail.
match_peer(Request, Allowed) :-
    http_peer(Request, Peer),
    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
    (   memberchk(Peer, Allowed)        % literal match
    ->  true
    ;   member(Pattern, Allowed),       % wildcard match
        match_peer_pattern(Pattern, Peer)
    ).
2741
% match_peer_pattern(+Pattern, +Peer)
%
% True when Pattern and Peer are translated by ip_term/2 into terms
% that unify.

match_peer_pattern(Pattern, Peer) :-
    ip_term(Pattern, Parts),
    ip_term(Peer, Parts),
    !.
2746
% ip_term(+Text, -Pattern)
%
% Split Text on "." and translate the resulting parts using
% ip_pattern/2.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", Octets),
    ip_pattern(Octets, Pattern).
2750
% ip_pattern(+Parts, -Pattern)
%
% Translate the parts of a dotted address, as produced by
% split_string/4, into a list of numbers. A final part "*" leaves
% the remainder of Pattern unbound, so that it unifies with any
% tail.
%
% The parts are strings rather than atoms. The wildcard is therefore
% tested using atom_string/2: matching the part against the atom `*`
% in the clause head never succeeds for a string and made wildcard
% patterns such as `192.168.*` impossible to match.

ip_pattern([], []).
ip_pattern([Part], _) :-
    atom_string(*, Part),
    !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
2756
2757
%%  authenticate(+Request, +Application, -UserOptions:list) is det.
%
%   Call authentication_hook/3. UserOptions is `[user(User)]` if the
%   hook succeeds with a ground User and `[]` if the hook fails.
%   Exceptions raised by the hook are passed on to the caller.

authenticate(Request, Application, UserOptions) :-
    (   authentication_hook(Request, Application, User)
    ->  must_be(ground, User),
        UserOptions = [user(User)]
    ;   UserOptions = []
    ).
2769
2770%%  authentication_hook(+Request, +Application, -User) is semidet.
2771%
2772%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
2773%   discover whether the server is accessed   by  an authorized user. It
2774%   can react in three ways:
2775%
%     - Succeed, binding User to a ground term.  The authenticity of the
2777%       user is available through pengine_user/1.
2778%     - Fail.  The =/create= succeeds, but the pengine is not associated
2779%       with a user.
2780%     - Throw an exception to prevent creation of the pengine.  Two
2781%       meaningful exceptions are:
2782%         - throw(http_reply(authorise(basic(Realm))))
2783%         Start a normal HTTP login challenge (reply 401)
%         - throw(http_reply(forbidden(Path)))
%         Reject the request using a 403 reply.
2786%
2787%   @see http_authenticate/3 can be used to implement this hook using
2788%        default HTTP authentication data.
2789
% pengine_register_user(+Options)
%
% If Options contains user(User), record User for the current
% pengine in pengine_user/2. Succeeds without doing anything if
% there is no such option.

pengine_register_user(Options) :-
    (   option(user(User), Options)
    ->  pengine_self(Self),
        asserta(pengine_user(Self, User))
    ;   true
    ).
2796
2797
%%  pengine_user(-User) is semidet.
%
%   True when the pengine was created by an HTTP request that
%   authorized User, i.e., when a pengine_user/2 fact is recorded
%   for the current pengine.
%
%   @see authentication_hook/3 can be used to extract authorization from
%        the HTTP header.

pengine_user(User) :-
    pengine_self(Self),
    pengine_user(Self, User).
2809
%!  reply_options(+Request, +Methods) is semidet.
%
%   Reply to the HTTP OPTIONS request: enable CORS for Methods and
%   send a plain text document with an empty body. Fails if the
%   method of Request is not `options`.

reply_options(Request, Methods) :-
    option(method(options), Request),
    !,
    CorsOptions = [ methods(Methods) ],
    cors_enable(Request, CorsOptions),
    format('Content-type: text/plain\r\n'),
    format('~n').                   % empty body
2822
2823
2824                 /*******************************
2825                 *        COMPILE SOURCE        *
2826                 *******************************/
2827
/** pengine_src_text(+SrcText, +Module) is det

Asserts the clauses defined in SrcText in   the  private database of the
current Pengine. This  predicate  processes   the  `src_text'  option of
pengine_create/1. The text is loaded silently into Module under the
source identifier `pengine://<Pengine>/src` and is subsequently handed
to keep_source/3.
*/

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    extra_load_options(Self, ExtraOptions),
    format(atom(ID), 'pengine://~w/src', [Self]),
    LoadOptions = [ stream(In),
                    module(Module),
                    silent(true)
                  | ExtraOptions
                  ],
    setup_call_cleanup(
        open_chars_stream(Src, In),
        load_files(Module:ID, LoadOptions),
        close(In)),
    keep_source(Self, ID, Src).
2849
%%   pengine_src_url(+URL, +Module) is det
%
%    Asserts the clauses defined in URL in   the private database of the
%    current Pengine. This predicate processes   the `src_url' option of
%    pengine_create/1.
%
%    If the `debug_info` setting of the application is `false`, the
%    code is loaded directly from the HTTP stream. Otherwise the text
%    is first read into a string, loaded from there and handed to
%    keep_source/3. In both cases the HTTP stream is read as UTF-8.
%
%    @tbd: make a sensible guess at the encoding.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, Path),
    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
    extra_load_options(Self, ExtraOptions),
    LoadOptions = [ stream(In),
                    module(Module)
                  | ExtraOptions
                  ],
    (   get_pengine_application(Self, Application),
        setting(Application:debug_info, false)
    ->  % no need to keep the source: load from the network stream
        setup_call_cleanup(
            http_open(URL, In, []),
            ( set_stream(In, encoding(utf8)),
              load_files(Module:ID, LoadOptions)
            ),
            close(In))
    ;   % fetch the text first, so it can be kept after loading
        setup_call_cleanup(
            http_open(URL, Remote, []),
            ( set_stream(Remote, encoding(utf8)),
              read_string(Remote, _Length, Src)
            ),
            close(Remote)),
        setup_call_cleanup(
            open_chars_stream(Src, In),
            load_files(Module:ID, LoadOptions),
            close(In)),
        keep_source(Self, ID, Src)
    ).
2891
2892
% extra_load_options(+Pengine, -Options)
%
% Options is the list of additional load_files/2 options for
% Pengine: empty if pengine_not_sandboxed/1 holds for Pengine and
% `[sandboxed(true)]` otherwise.

extra_load_options(Pengine, Options) :-
    (   pengine_not_sandboxed(Pengine)
    ->  Options = []
    ;   Options = [sandboxed(true)]
    ).
2898
2899
% keep_source(+Pengine, +ID, +SrcText)
%
% If the `debug_info` setting of the application of Pengine is
% `true`, store SrcText as a string in a source(ID, String) term in
% pengine_data/2. Otherwise succeed without storing anything.

keep_source(Pengine, ID, SrcText) :-
    (   get_pengine_application(Pengine, Application),
        setting(Application:debug_info, true)
    ->  to_string(SrcText, SrcString),
        assertz(pengine_data(Pengine, source(ID, SrcString)))
    ;   true
    ).
2907
% to_string(+Text, -String)
%
% A string is passed unchanged. Anything else is converted using
% atom_string/2.

to_string(Text, String) :-
    Text = String,
    string(String),
    !.
to_string(Text, String) :-
    atom_string(Text, String),
    !.
2914
2915
2916                 /*******************************
2917                 *            MESSAGES          *
2918                 *******************************/
2919
% Message for the error raised when the sandbox could not prove
% safety of a goal within the time limit. Limit is printed as a
% number of seconds.

prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl ],
    [ 'This is normally caused by an insufficiently instantiated'-[], nl ],
    [ 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl ],
    [ 'find all possible instantations of Var.'-[] ].
2926
2927
2928                 /*******************************
2929                 *        SANDBOX SUPPORT       *
2930                 *******************************/
2931
% Both hooks are declared multifile because the clauses below are
% added to predicates that live in module `sandbox`.

:- multifile
    sandbox:safe_primitive/1,               % Goal
    sandbox:safe_meta/2.                    % Goal, Calls

%!  sandbox:safe_primitive(+Goal) is semidet.
%
%   Declare the core pengine operations as   safe. If we are talking
%   about  local  pengines,  their  safety   is  guaranteed  by  the
%   sandboxing done for all pengines.
%
%   Each fact below names one predicate of module `pengines`; the
%   arguments are left unconstrained.
%
%   @tbd    If at some point we allow for `unsafe' pengines, we must
%           reconsider this.

sandbox:safe_primitive(pengines:pengine_destroy(_,_)).
sandbox:safe_primitive(pengines:pengine_event(_, _)).
sandbox:safe_primitive(pengines:pengine_send(_, _, _)).
sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
sandbox:safe_primitive(pengines:pengine_ask(_, _, _)).
sandbox:safe_primitive(pengines:pengine_pull_response(_,_)).
sandbox:safe_primitive(pengines:pengine_user(_)).
2954
%!  sandbox:safe_meta(+Goal, -Called) is semidet.
%
%   Declare the pengine  meta-predicates  as   safe.  Note  that the
%   pengine calling predicates  are  safe   because  the  safety  is
%   guaranteed by the receiving pengine. For pengine_event_loop/4,
%   Called holds the closure extended with one additional argument.

sandbox:safe_meta(pengines:pengine_create(_), []).
sandbox:safe_meta(pengines:pengine_rpc(_, _, _), []).
sandbox:safe_meta(pengines:pengine_event_loop(_, Closure, _, _), [Called]) :-
    extend_goal(Closure, [_Event], Called).
2965
% extend_goal(+Closure, +Extra, -Goal) is det.
%
% Goal is Closure with the arguments in the list Extra appended. A
% module qualification of Closure is preserved.
%
% @error instantiation_error if Closure (or the closure inside a
%        module qualification) is unbound.

extend_goal(Var, _, _) :-
    var(Var),
    !,
    instantiation_error(Var).
extend_goal(M:Term0, Extra, Goal) :-
    % Commit: M:Term0 is also a compound term. Without the cut, the
    % last clause would produce `:(M, Term0, Extra...)` as a second
    % solution on backtracking.
    !,
    Goal = M:Term,
    extend_goal(Term0, Extra, Term).
extend_goal(Atom, Extra, Goal) :-
    atom(Atom),
    !,
    Goal =.. [Atom|Extra].
extend_goal(Compound, Extra, Goal) :-
    compound(Compound),
    !,
    compound_name_arguments(Compound, Name, Args0),
    append(Args0, Extra, Args),
    compound_name_arguments(Goal, Name, Args).