View source with raw comments or as raw
   1/*  Part of SWI-Prolog
   2
   3    Author:        Jan Wielemaker
   4    E-mail:        J.Wielemaker@vu.nl
   5    WWW:           http://www.swi-prolog.org
   6    Copyright (c)  2002-2016, University of Amsterdam
   7                              VU University Amsterdam
   8    All rights reserved.
   9
  10    Redistribution and use in source and binary forms, with or without
  11    modification, are permitted provided that the following conditions
  12    are met:
  13
  14    1. Redistributions of source code must retain the above copyright
  15       notice, this list of conditions and the following disclaimer.
  16
  17    2. Redistributions in binary form must reproduce the above copyright
  18       notice, this list of conditions and the following disclaimer in
  19       the documentation and/or other materials provided with the
  20       distribution.
  21
  22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
  28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
  32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33    POSSIBILITY OF SUCH DAMAGE.
  34*/
  35
  36:- module(thread_httpd,
  37          [ http_current_server/2,      % ?:Goal, ?Port
  38            http_server_property/2,     % ?Port, ?Property
  39            http_server/2,              % :Goal, +Options
  40            http_workers/2,             % +Port, ?WorkerCount
  41            http_add_worker/2,          % +Port, +Options
  42            http_current_worker/2,      % ?Port, ?ThreadID
  43            http_stop_server/2,         % +Port, +Options
  44            http_spawn/2,               % :Goal, +Options
  45
  46            http_requeue/1,             % +Request
  47            http_close_connection/1,    % +Request
  48            http_enough_workers/3       % +Queue, +Why, +Peer
  49          ]).
  50:- use_module(library(debug)).
  51:- use_module(library(error)).
  52:- use_module(library(option)).
  53:- use_module(library(socket)).
  54:- use_module(library(thread_pool)).
  55:- use_module(library(gensym)).
  56:- use_module(http_wrapper).
  57:- use_module(http_path).
  58
  59
  60:- predicate_options(http_server/2, 2,
  61                     [ port(any),
  62                       tcp_socket(any),
  63                       workers(positive_integer),
  64                       timeout(number),
  65                       keep_alive_timeout(number),
  66                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
  67                       pass_to(system:thread_create/3, 3)
  68                     ]).
  69:- predicate_options(http_spawn/2, 2,
  70                     [ pool(atom),
  71                       pass_to(system:thread_create/3, 3),
  72                       pass_to(thread_pool:thread_create_in_pool/4, 4)
  73                     ]).
  74:- predicate_options(http_add_worker/2, 2,
  75                     [ timeout(number),
  76                       keep_alive_timeout(number),
  77                       max_idle_time(number),
  78                       pass_to(system:thread_create/3, 3)
  79                     ]).
  80
  81/** <module> Threaded HTTP server
  82
  83This library defines the HTTP server  frontend of choice for SWI-Prolog.
  84It is based on the multi-threading   capabilities of SWI-Prolog and thus
  85exploits multiple cores  to  serve   requests  concurrently.  The server
  86scales well and can cooperate with   library(thread_pool) to control the
  87number of concurrent requests of a given   type.  For example, it can be
  88configured to handle 200 file download requests concurrently, 2 requests
   89that potentially use a lot of memory and  8  requests that use a lot of
  90CPU resources.
  91
  92On   Unix   systems,    this    library     can    be    combined   with
  93library(http/http_unix_daemon) to realise a proper  Unix service process
  94that creates a web server at  port   80,  runs under a specific account,
  95optionally detaches from the controlling terminal, etc.
  96
  97Combined with library(http/http_ssl_plugin) from the   SSL package, this
  98library   can   be   used   to    create     an    HTTPS   server.   See
  99<plbase>/doc/packages/examples/ssl/https for an example   server using a
 100self-signed SSL certificate.
 101*/
 102
 103:- meta_predicate
 104    http_server(1, :),
 105    http_current_server(1, ?),
 106    http_spawn(0, +).
 107
 108:- dynamic
 109    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
 110    queue_worker/2,         % Queue, ThreadID
 111    queue_options/2.        % Queue, Options
 112
 113:- multifile
 114    make_socket_hook/3,
 115    accept_hook/2,
 116    close_hook/1,
 117    open_client_hook/6,
 118    http:create_pool/1,
 119    http:schedule_workers/1.
 120
 121%!  http_server(:Goal, :Options) is det.
 122%
 123%   Create a server at Port that calls Goal for each parsed request.
 124%   Options provide a list of options. Defined options are
 125%
 126%     * port(?Address)
 127%     Port to bind to.  Address is either a port or a term
 128%     Host:Port. The port may be a variable, causing the system
 129%     to select a free port.  See tcp_bind/2.
 130%
 131%     * tcp_socket(+Socket)
  132%     If provided, use this socket instead of creating one and
 133%     binding it to an address.  The socket must be bound to an
 134%     address.
 135%
 136%     * workers(+Count)
 137%     Determine the number of worker threads.  Default is 5.  This
 138%     is fine for small scale usage.  Public servers typically need
 139%     a higher number.
 140%
 141%     * timeout(+Seconds)
 142%     Max time of inactivity trying to read the request after a
 143%     connection has been opened.  Default is 60 seconds.  See
 144%     set_stream/1 using the _timeout_ option.
 145%
 146%     * keep_alive_timeout(+Seconds)
 147%     Time to keep `Keep alive' connections alive.  Default is
 148%     2 seconds.
 149%
 150%     * local(+Kbytes)
 151%     * global(+Kbytes)
 152%     * trail(+Kbytes)
 153%     Stack sizes to use for the workers.  The default is inherited
 154%     from the `main` thread. As of version 5.9 stacks are no longer
 155%     _pre-allocated_ and the given sizes only act as a limit.
 156%     If you need to control resource usage look at the `spawn`
 157%     option of http_handler/3 and library(thread_pool).
 158%
 159%   A  typical  initialization  for  an    HTTP   server  that  uses
 160%   http_dispatch/1 to relay requests to predicates is:
 161%
 162%     ==
 163%     :- use_module(library(http/thread_httpd)).
 164%     :- use_module(library(http/http_dispatch)).
 165%
 166%     start_server(Port) :-
 167%         http_server(http_dispatch, [port(Port)]).
 168%     ==
 169%
 170%   Note that multiple servers  can  coexist   in  the  same  Prolog
 171%   process. A notable application of this is   to have both an HTTP
 172%   and HTTPS server, where the HTTP   server redirects to the HTTPS
 173%   server for handling sensitive requests.
 174
 175http_server(Goal, M:Options0) :-
 176    option(port(Port), Options0),
 177    !,
 178    make_socket(Port, M:Options0, Options),
 179    create_workers(Options),
 180    create_server(Goal, Port, Options),
 181    print_message(informational,
 182                  httpd_started_server(Port)).
 183http_server(_Goal, _Options) :-
 184    existence_error(option, port).
 185
 186
 187%!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
 188%
 189%   Create the HTTP server socket and  worker pool queue. OptionsOut
 190%   is quaranteed to hold the option queue(QueueId).
 191%
 192%   @arg   OptionsIn   is   qualified   to     allow   passing   the
 193%   module-sensitive ssl option argument.
 194
 195make_socket(Port, Options0, Options) :-
 196    make_socket_hook(Port, Options0, Options),
 197    !.
 198make_socket(Port, _:Options0, Options) :-
 199    option(tcp_socket(_), Options0),
 200    !,
 201    make_addr_atom('httpd', Port, Queue),
 202    Options = [ queue(Queue)
 203              | Options0
 204              ].
 205make_socket(Port, _:Options0, Options) :-
 206    tcp_socket(Socket),
 207    tcp_setopt(Socket, reuseaddr),
 208    tcp_bind(Socket, Port),
 209    tcp_listen(Socket, 5),
 210    make_addr_atom('httpd', Port, Queue),
 211    Options = [ queue(Queue),
 212                tcp_socket(Socket)
 213              | Options0
 214              ].
 215
 216%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
 217%
 218%   Create an atom that identifies  the   server's  queue and thread
 219%   resources.
 220
 221make_addr_atom(Scheme, Address, Atom) :-
 222    phrase(address_parts(Address), Parts),
 223    atomic_list_concat([Scheme,@|Parts], Atom).
 224
 225address_parts(Atomic) -->
 226    { atomic(Atomic) },
 227    !,
 228    [Atomic].
 229address_parts(Host:Port) -->
 230    !,
 231    address_parts(Host), [:], address_parts(Port).
 232address_parts(ip(A,B,C,D)) -->
 233    !,
 234    [ A, '.', B, '.', C, '.', D ].
 235
 236%!  create_server(:Goal, +Address, +Options) is det.
 237%
 238%   Create the main server thread that runs accept_server/2 to
 239%   listen to new requests.
 240
 241create_server(Goal, Address, Options) :-
 242    get_time(StartTime),
 243    memberchk(queue(Queue), Options),
 244    scheme(Scheme, Options),
 245    address_port(Address, Port),
 246    make_addr_atom(Scheme, Port, Alias),
 247    thread_create(accept_server(Goal, Options), _,
 248                  [ alias(Alias)
 249                  ]),
 250    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
 251
 252scheme(Scheme, Options) :-
 253    option(scheme(Scheme), Options),
 254    !.
 255scheme(Scheme, Options) :-
 256    (   option(ssl(_), Options)
 257    ;   option(ssl_instance(_), Options)
 258    ),
 259    !,
 260    Scheme = https.
 261scheme(http, _).
 262
 263address_port(_Host:Port, Port) :- !.
 264address_port(Port, Port).
 265
 266
 267%!  http_current_server(:Goal, ?Port) is nondet.
 268%
 269%   True if Goal is the goal of a server at Port.
 270%
 271%   @deprecated Use http_server_property(Port, goal(Goal))
 272
 273http_current_server(Goal, Port) :-
 274    current_server(Port, Goal, _, _, _, _).
 275
 276
 277%!  http_server_property(?Port, ?Property) is nondet.
 278%
 279%   True if Property is a property of the HTTP server running at
 280%   Port.  Defined properties are:
 281%
 282%       * goal(:Goal)
 283%       Goal used to start the server. This is often
 284%       http_dispatch/1.
 285%       * scheme(-Scheme)
 286%       Scheme is one of `http` or `https`.
 287%       * start_time(?Time)
 288%       Time-stamp when the server was created.
 289
 290http_server_property(_:Port, Property) :-
 291    integer(Port),
 292    !,
 293    server_property(Property, Port).
 294http_server_property(Port, Property) :-
 295    server_property(Property, Port).
 296
 297server_property(goal(Goal), Port) :-
 298    current_server(Port, Goal, _, _, _, _).
 299server_property(scheme(Scheme), Port) :-
 300    current_server(Port, _, _, _, Scheme, _).
 301server_property(start_time(Time), Port) :-
 302    current_server(Port, _, _, _, _, Time).
 303
 304
 305%!  http_workers(+Port, -Workers) is det.
 306%!  http_workers(+Port, +Workers:int) is det.
 307%
 308%   Query or set the number of workers  for the server at this port.
 309%   The number of workers is dynamically   modified. Setting it to 1
 310%   (one) can be used to profile the worker using tprofile/1.
 311
 312http_workers(Port, Workers) :-
 313    must_be(ground, Port),
 314    current_server(Port, _, _, Queue, _, _),
 315    !,
 316    (   integer(Workers)
 317    ->  resize_pool(Queue, Workers)
 318    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
 319        length(WorkerIDs, Workers)
 320    ).
 321http_workers(Port, _) :-
 322    existence_error(http_server, Port).
 323
 324
 325%!  http_add_worker(+Port, +Options) is det.
 326%
 327%   Add a new worker to  the  HTTP   server  for  port Port. Options
 328%   overrule the default queue  options.   The  following additional
 329%   options are processed:
 330%
 331%     - max_idle_time(+Seconds)
 332%     The created worker will automatically terminate if there is
 333%     no new work within Seconds.
 334
 335http_add_worker(Port, Options) :-
 336    must_be(ground, Port),
 337    current_server(Port, _, _, Queue, _, _),
 338    !,
 339    queue_options(Queue, QueueOptions),
 340    merge_options(Options, QueueOptions, WorkerOptions),
 341    atom_concat(Queue, '_', AliasBase),
 342    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
 343http_add_worker(Port, _) :-
 344    existence_error(http_server, Port).
 345
 346
 347%!  http_current_worker(?Port, ?ThreadID) is nondet.
 348%
 349%   True if ThreadID is the identifier   of  a Prolog thread serving
 350%   Port. This predicate is  motivated  to   allow  for  the  use of
 351%   arbitrary interaction with the worker thread for development and
 352%   statistics.
 353
 354http_current_worker(Port, ThreadID) :-
 355    current_server(Port, _, _, Queue, _, _),
 356    queue_worker(Queue, ThreadID).
 357
 358
 359%!  accept_server(:Goal, +Options)
 360%
 361%   The goal of a small server-thread accepting new requests and
 362%   posting them to the queue of workers.
 363
 364accept_server(Goal, Options) :-
 365    catch(accept_server2(Goal, Options), http_stop, true),
 366    thread_self(Thread),
 367    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
 368    close_server_socket(Options).
 369
 370accept_server2(Goal, Options) :-
 371    repeat,
 372      (   catch(accept_server3(Goal, Options), E, true)
 373      ->  (   var(E)
 374          ->  fail
 375          ;   accept_rethrow_error(E)
 376          ->  throw(E)
 377          ;   print_message(error, E),
 378              fail
 379          )
 380      ;   print_message(error,      % internal error
 381                        goal_failed(accept_server3(Goal, Options))),
 382          fail
 383      ).
 384
 385accept_server3(Goal, Options) :-
 386    accept_hook(Goal, Options),
 387    !.
 388accept_server3(Goal, Options) :-
 389    memberchk(tcp_socket(Socket), Options),
 390    memberchk(queue(Queue), Options),
 391    tcp_accept(Socket, Client, Peer),
 392    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
 393    http_enough_workers(Queue, accept, Peer),
 394    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
 395
 396accept_rethrow_error(http_stop).
 397accept_rethrow_error('$aborted').
 398
 399
 400%!  close_server_socket(+Options)
 401%
 402%   Close the server socket.
 403
 404close_server_socket(Options) :-
 405    close_hook(Options),
 406    !.
 407close_server_socket(Options) :-
 408    memberchk(tcp_socket(Socket), Options),
 409    !,
 410    tcp_close_socket(Socket).
 411
 412
 413%!  http_stop_server(+Port, +Options)
 414%
 415%   Stop the indicated  HTTP  server   gracefully.  First  stops all
 416%   workers, then stops the server.
 417%
 418%   @tbd    Realise non-graceful stop
 419
 420http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
 421    ground(Host),
 422    !,
 423    http_stop_server(Port, Options).
 424http_stop_server(Port, _Options) :-
 425    http_workers(Port, 0),                  % checks Port is ground
 426    current_server(Port, _, Thread, Queue, _Scheme, _Start),
 427    retractall(queue_options(Queue, _)),
 428    thread_signal(Thread, throw(http_stop)),
 429    catch(connect(localhost:Port), _, true),
 430    thread_join(Thread, _),
 431    message_queue_destroy(Queue).
 432
 433connect(Address) :-
 434    setup_call_cleanup(
 435        tcp_socket(Socket),
 436        tcp_connect(Socket, Address),
 437        tcp_close_socket(Socket)).
 438
 439%!  http_enough_workers(+Queue, +Why, +Peer) is det.
 440%
 441%   Check that we have enough workers in our queue. If not, call the
 442%   hook http:schedule_workers/1 to extend  the   worker  pool. This
 443%   predicate can be used by accept_hook/2.
 444
 445http_enough_workers(Queue, Why, Peer) :-
 446    message_queue_property(Queue, size(Size)),
 447    (   enough(Size, Why)
 448    ->  true
 449    ;   current_server(Port, _, _, Queue, _, _),
 450        catch(http:schedule_workers(_{port:Port,
 451                                      reason:Why,
 452                                      peer:Peer,
 453                                      waiting:Size}),
 454              Error,
 455              print_message(error, Error))
 456    ->  true
 457    ;   true
 458    ).
 459
 460enough(0, _).
 461enough(1, keep_alive).                  % I will be ready myself
 462
 463
 464%!  http:schedule_workers(+Data:dict) is semidet.
 465%
 466%   Hook called if a  new  connection   or  a  keep-alive connection
 467%   cannot be scheduled _immediately_ to a worker. Dict contains the
 468%   following keys:
 469%
 470%     - port:Port
 471%     Port number that identifies the server.
 472%     - reason:Reason
 473%     One of =accept= for a new connection or =keep_alive= if a
 474%     worker tries to reschedule itself.
 475%     - peer:Peer
 476%     Identify the other end of the connection
 477%     - waiting:Size
 478%     Number of messages waiting in the queue.
 479%
 480%   Note that, when called with `reason:accept`,   we  are called in
 481%   the time critical main accept loop.   An  implementation of this
 482%   hook shall typically send  the  event   to  thread  dedicated to
 483%   dynamic worker-pool management.
 484%
 485%   @see    http_add_worker/2 may be used to create (temporary) extra
 486%           workers.
 487
 488
 489                 /*******************************
 490                 *    WORKER QUEUE OPERATIONS   *
 491                 *******************************/
 492
 493%!  create_workers(+Options)
 494%
 495%   Create the pool of HTTP worker-threads. Each worker has the
 496%   alias http_worker_N.
 497
 498create_workers(Options) :-
 499    option(workers(N), Options, 5),
 500    option(queue(Queue), Options),
 501    catch(message_queue_create(Queue), _, true),
 502    atom_concat(Queue, '_', AliasBase),
 503    create_workers(1, N, Queue, AliasBase, Options),
 504    assert(queue_options(Queue, Options)).
 505
 506create_workers(I, N, _, _, _) :-
 507    I > N,
 508    !.
 509create_workers(I, N, Queue, AliasBase, Options) :-
 510    gensym(AliasBase, Alias),
 511    thread_create(http_worker(Options), Id,
 512                  [ alias(Alias)
 513                  | Options
 514                  ]),
 515    assertz(queue_worker(Queue, Id)),
 516    I2 is I + 1,
 517    create_workers(I2, N, Queue, AliasBase, Options).
 518
 519
 520%!  resize_pool(+Queue, +Workers) is det.
 521%
 522%   Create or destroy workers. If workers   are  destroyed, the call
 523%   waits until the desired number of waiters is reached.
 524
 525resize_pool(Queue, Size) :-
 526    findall(W, queue_worker(Queue, W), Workers),
 527    length(Workers, Now),
 528    (   Now < Size
 529    ->  queue_options(Queue, Options),
 530        atom_concat(Queue, '_', AliasBase),
 531        I0 is Now+1,
 532        create_workers(I0, Size, Queue, AliasBase, Options)
 533    ;   Now == Size
 534    ->  true
 535    ;   Now > Size
 536    ->  Excess is Now - Size,
 537        thread_self(Me),
 538        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
 539        forall(between(1, Excess, _), thread_get_message(quitted(_)))
 540    ).
 541
 542
 543%!  http_worker(+Options)
 544%
 545%   Run HTTP worker main loop. Workers   simply  wait until they are
 546%   passed an accepted socket to process  a client.
 547%
 548%   If the message quit(Sender) is read   from the queue, the worker
 549%   stops.
 550
 551http_worker(Options) :-
 552    thread_at_exit(done_worker),
 553    option(queue(Queue), Options),
 554    option(max_idle_time(MaxIdle), Options, infinite),
 555    repeat,
 556      garbage_collect,
 557      trim_stacks,
 558      debug(http(worker), 'Waiting for a job ...', []),
 559      (   MaxIdle == infinite
 560      ->  thread_get_message(Queue, Message)
 561      ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
 562      ->  true
 563      ;   Message = quit(idle)
 564      ),
 565      debug(http(worker), 'Got job ~p', [Message]),
 566      (   Message = quit(Sender)
 567      ->  !,
 568          thread_self(Self),
 569          thread_detach(Self),
 570          (   Sender == idle
 571          ->  true
 572          ;   thread_send_message(Sender, quitted(Self))
 573          )
 574      ;   open_client(Message, Queue, Goal, In, Out,
 575                      Options, ClientOptions),
 576          (   catch(http_process(Goal, In, Out, ClientOptions),
 577                    Error, true)
 578          ->  true
 579          ;   Error = goal_failed(http_process/4)
 580          ),
 581          (   var(Error)
 582          ->  fail
 583          ;   current_message_level(Error, Level),
 584              print_message(Level, Error),
 585              memberchk(peer(Peer), ClientOptions),
 586              close_connection(Peer, In, Out),
 587              fail
 588          )
 589      ).
 590
 591
 592%!  open_client(+Message, +Queue, -Goal, -In, -Out,
 593%!              +Options, -ClientOptions) is semidet.
 594%
 595%   Opens the connection to the client in a worker from the message
 596%   sent to the queue by accept_server/2.
 597
 598open_client(requeue(In, Out, Goal, ClOpts),
 599            _, Goal, In, Out, Opts, ClOpts) :-
 600    !,
 601    memberchk(peer(Peer), ClOpts),
 602    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
 603    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
 604open_client(Message, Queue, Goal, In, Out, Opts,
 605            [ pool(client(Queue, Goal, In, Out)),
 606              timeout(Timeout)
 607            | Options
 608            ]) :-
 609    catch(open_client(Message, Goal, In, Out, Options, Opts),
 610          E, report_error(E)),
 611    option(timeout(Timeout), Opts, 60),
 612    (   debugging(http(connection))
 613    ->  memberchk(peer(Peer), Options),
 614        debug(http(connection), 'Opened connection from ~p', [Peer])
 615    ;   true
 616    ).
 617
 618
 619%!  open_client(+Message, +Goal, -In, -Out,
 620%!              -ClientOptions, +Options) is det.
 621
 622open_client(Message, Goal, In, Out, ClientOptions, Options) :-
 623    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
 624    !.
 625open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
 626            [ peer(Peer),
 627              protocol(http)
 628            ], _) :-
 629    tcp_open_socket(Socket, In, Out).
 630
 631report_error(E) :-
 632    print_message(error, E),
 633    fail.
 634
 635
 636%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
 637%
 638%   Wait for the client for at most  TimeOut seconds. Succeed if the
 639%   client starts a new request within   this  time. Otherwise close
 640%   the connection and fail.
 641
 642check_keep_alive_connection(In, TMO, Peer, In, Out) :-
 643    stream_property(In, timeout(Old)),
 644    set_stream(In, timeout(TMO)),
 645    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
 646    catch(peek_code(In, Code), E, true),
 647    (   var(E),                     % no exception
 648        Code \== -1                 % no end-of-file
 649    ->  set_stream(In, timeout(Old)),
 650        debug(http(keep_alive), '\tre-using keep-alive connection', [])
 651    ;   (   Code == -1
 652        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
 653        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
 654        ),
 655        close_connection(Peer, In, Out),
 656        fail
 657    ).
 658
 659
 660%!  done_worker
 661%
 662%   Called when worker is terminated  due   to  http_workers/2  or a
 663%   (debugging) exception. In  the   latter  case, recreate_worker/2
 664%   creates a new worker.
 665
 666done_worker :-
 667    thread_self(Self),
 668    thread_property(Self, status(Status)),
 669    retract(queue_worker(Queue, Self)),
 670    (   catch(recreate_worker(Status, Queue), _, fail)
 671    ->  thread_detach(Self),
 672        print_message(informational,
 673                      httpd_restarted_worker(Self))
 674    ;   done_status_message_level(Status, Level),
 675        print_message(Level,
 676                      httpd_stopped_worker(Self, Status))
 677    ).
 678
 679done_status_message_level(true, silent) :- !.
 680done_status_message_level(exception('$aborted'), silent) :- !.
 681done_status_message_level(_, informational).
 682
 683
 684%!  recreate_worker(+Status, +Queue) is semidet.
 685%
 686%   Deal with the possibility that  threads are, during development,
 687%   killed with abort/0. We  recreate  the   worker  to  avoid  that
 688%   eventually we run out of workers.  If   we  are aborted due to a
 689%   halt/0 call, thread_create/3 will raise a permission error.
 690
 691recreate_worker(exception(Error), Queue) :-
 692    recreate_on_error(Error),
 693    queue_options(Queue, Options),
 694    atom_concat(Queue, '_', AliasBase),
 695    create_workers(1, 1, Queue, AliasBase, Options).
 696
 697recreate_on_error('$aborted').
 698recreate_on_error(time_limit_exceeded).
 699
 700%       thread_httpd:message_level(+Exception, -Level)
 701%
 702%       Determine the message stream used for  exceptions that may occur
 703%       during server_loop/5. Being multifile, clauses   can be added by
 704%       the   application   to   refine   error   handling.   See   also
 705%       message_hook/3 for further programming error handling.
 706
 707:- multifile
 708    message_level/2.
 709
 710message_level(error(io_error(read, _), _),      silent).
 711message_level(error(timeout_error(read, _), _), informational).
 712message_level(keep_alive_timeout,               silent).
 713
 714current_message_level(Term, Level) :-
 715    (   message_level(Term, Level)
 716    ->  true
 717    ;   Level = error
 718    ).
 719
 720
 721%!  http_requeue(+Header)
 722%
 723%   Re-queue a connection to  the  worker   pool.  This  deals  with
 724%   processing additional requests on keep-alive connections.
 725
 726http_requeue(Header) :-
 727    requeue_header(Header, ClientOptions),
 728    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
 729    memberchk(peer(Peer), ClientOptions),
 730    http_enough_workers(Queue, keep_alive, Peer),
 731    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
 732    !.
 733http_requeue(Header) :-
 734    debug(http(error), 'Re-queue failed: ~p', [Header]),
 735    fail.
 736
 737requeue_header([], []).
 738requeue_header([H|T0], [H|T]) :-
 739    requeue_keep(H),
 740    !,
 741    requeue_header(T0, T).
 742requeue_header([_|T0], T) :-
 743    requeue_header(T0, T).
 744
 745requeue_keep(pool(_)).
 746requeue_keep(peer(_)).
 747requeue_keep(protocol(_)).
 748
 749
 750%!  http_process(Message, Queue, +Options)
 751%
 752%   Handle a single client message on the given stream.
 753
 754http_process(Goal, In, Out, Options) :-
 755    debug(http(server), 'Running server goal ~p on ~p -> ~p',
 756          [Goal, In, Out]),
 757    option(timeout(TMO), Options, 60),
 758    set_stream(In, timeout(TMO)),
 759    set_stream(Out, timeout(TMO)),
 760    http_wrapper(Goal, In, Out, Connection,
 761                 [ request(Request)
 762                 | Options
 763                 ]),
 764    next(Connection, Request).
 765
 766next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
 767    !,
 768    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
 769    (   catch(call(SwitchGoal, In, Out), E,
 770              (   print_message(error, E),
 771                  fail))
 772    ->  true
 773    ;   http_close_connection(Request)
 774    ).
 775next(spawned(ThreadId), _) :-
 776    !,
 777    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
 778next(Connection, Request) :-
 779    downcase_atom(Connection, 'keep-alive'),
 780    http_requeue(Request),
 781    !.
 782next(_, Request) :-
 783    http_close_connection(Request).
 784
 785
 786%!  http_close_connection(+Request)
 787%
 788%   Close connection associated to Request.  See also http_requeue/1.
 789
 790http_close_connection(Request) :-
 791    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
 792    memberchk(peer(Peer), Request),
 793    close_connection(Peer, In, Out).
 794
 795%!  close_connection(+Peer, +In, +Out)
 796%
 797%   Closes the connection from the server to the client.  Errors are
 798%   currently silently ignored.
 799
 800close_connection(Peer, In, Out) :-
 801    debug(http(connection), 'Closing connection from ~p', [Peer]),
 802    catch(close(In, [force(true)]), _, true),
 803    catch(close(Out, [force(true)]), _, true).
 804
 805%!  http_spawn(:Goal, +Options) is det.
 806%
 807%   Continue this connection on a  new   thread.  A handler may call
 808%   http_spawn/2 to start a new thread that continues processing the
 809%   current request using Goal. The original   thread returns to the
 810%   worker pool for processing new requests.   Options are passed to
 811%   thread_create/3, except for:
 812%
 813%       * pool(+Pool)
 814%       Interfaces to library(thread_pool), starting the thread
 815%       on the given pool.
 816%
 817%   If a pool does not exist, this predicate calls the multifile
 818%   hook http:create_pool/1 to create it. If this predicate succeeds
 819%   the operation is retried.
 820
 821http_spawn(Goal, Options) :-
 822    select_option(pool(Pool), Options, ThreadOptions),
 823    !,
 824    current_output(CGI),
 825    catch(thread_create_in_pool(Pool,
 826                                wrap_spawned(CGI, Goal), Id,
 827                                [ detached(true)
 828                                | ThreadOptions
 829                                ]),
 830          Error,
 831          true),
 832    (   var(Error)
 833    ->  http_spawned(Id)
 834    ;   Error = error(resource_error(threads_in_pool(_)), _)
 835    ->  throw(http_reply(busy))
 836    ;   Error = error(existence_error(thread_pool, Pool), _),
 837        create_pool(Pool)
 838    ->  http_spawn(Goal, Options)
 839    ;   throw(Error)
 840    ).
 841http_spawn(Goal, Options) :-
 842    current_output(CGI),
 843    thread_create(wrap_spawned(CGI, Goal), Id,
 844                  [ detached(true)
 845                  | Options
 846                  ]),
 847    http_spawned(Id).
 848
 849wrap_spawned(CGI, Goal) :-
 850    set_output(CGI),
 851    http_wrap_spawned(Goal, Request, Connection),
 852    next(Connection, Request).
 853
 854%!  create_pool(+Pool)
 855%
 856%   Lazy  creation  of  worker-pools  for   the  HTTP  server.  This
 857%   predicate calls the hook http:create_pool/1.   If the hook fails
 858%   it creates a default pool of size   10. This should suffice most
 859%   typical usecases. Note that we  get   a  permission error if the
 860%   pool is already created.  We can ignore this.
 861
 862create_pool(Pool) :-
 863    E = error(permission_error(create, thread_pool, Pool), _),
 864    catch(http:create_pool(Pool), E, true).
 865create_pool(Pool) :-
 866    print_message(informational, httpd(created_pool(Pool))),
 867    thread_pool_create(Pool, 10, []).
 868
 869
 870
 871                 /*******************************
 872                 *            MESSAGES          *
 873                 *******************************/
 874
 875:- multifile
 876    prolog:message/3.
 877
 878prolog:message(httpd_started_server(Port)) -->
 879    [ 'Started server at '-[] ],
 880    http_root(Port).
 881prolog:message(httpd_stopped_worker(Self, Status)) -->
 882    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
 883prolog:message(httpd_restarted_worker(Self)) -->
 884    [ 'Replaced aborted worker ~p'-[Self] ].
 885prolog:message(httpd(created_pool(Pool))) -->
 886    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
 887      'Create this pool at startup-time or define the hook ', nl,
 888      'http:create_pool/1 to avoid this message and create a ', nl,
 889      'pool that fits the usage-profile.'
 890    ].
 891
 892http_root(Host:Port) -->
 893    !,
 894    http_scheme(Port),
 895    { http_absolute_location(root(.), URI, []) },
 896    [ '~w:~w~w'-[Host, Port, URI] ].
 897http_root(Port) -->
 898    http_scheme(Port),
 899    { http_absolute_location(root(.), URI, []) },
 900    [ 'localhost:~w~w'-[Port, URI] ].
 901
 902http_scheme(Port) -->
 903    { http_server_property(Port, scheme(Scheme)) },
 904    [ '~w://'-[Scheme] ].