35
36:- module(thread_httpd,
37 [ http_current_server/2, 38 http_server_property/2, 39 http_server/2, 40 http_workers/2, 41 http_add_worker/2, 42 http_current_worker/2, 43 http_stop_server/2, 44 http_spawn/2, 45
46 http_requeue/1, 47 http_close_connection/1, 48 http_enough_workers/3 49 ]).
50:- use_module(library(debug)).
51:- use_module(library(error)).
52:- use_module(library(option)).
53:- use_module(library(socket)).
54:- use_module(library(thread_pool)).
55:- use_module(library(gensym)).
56:- use_module(http_wrapper).
57:- use_module(http_path).
58
59
% Declare the options accepted in the option-list argument (argument 2)
% of the public predicates.  pass_to/2 entries state that the remaining
% options are handed on to the named predicate and argument position.

:- predicate_options(http_server/2, 2,
                     [ port(any),
                       tcp_socket(any),
                       workers(positive_integer),
                       timeout(number),
                       keep_alive_timeout(number),
                       ssl(list(any)),
                       pass_to(system:thread_create/3, 3)
                     ]).

:- predicate_options(http_spawn/2, 2,
                     [ pool(atom),
                       pass_to(system:thread_create/3, 3),
                       pass_to(thread_pool:thread_create_in_pool/4, 4)
                     ]).

:- predicate_options(http_add_worker/2, 2,
                     [ timeout(number),
                       keep_alive_timeout(number),
                       max_idle_time(number),
                       pass_to(system:thread_create/3, 3)
                     ]).
80
102
% Meta-argument declarations for the predicates that take a goal.
:- meta_predicate http_server(1, :).
:- meta_predicate http_current_server(1, ?).
:- meta_predicate http_spawn(0, +).

% Run-time administration of servers and their worker pools.
%
%   current_server(Port, Goal, AcceptThread, Queue, Scheme, StartTime)
%   queue_worker(Queue, WorkerThreadId)
%   queue_options(Queue, Options)
:- dynamic current_server/6.
:- dynamic queue_worker/2.
:- dynamic queue_options/2.

% Hooks that other files may define to replace or extend the socket
% handling and the worker scheduling of this module.
:- multifile make_socket_hook/3.
:- multifile accept_hook/2.
:- multifile close_hook/1.
:- multifile open_client_hook/6.
:- multifile http:create_pool/1.
:- multifile http:schedule_workers/1.
120
174
%!  http_server(:Goal, :Options) is det.
%
%   Start an HTTP server that handles requests by calling Goal.  The
%   option port(Port) is mandatory: the listening socket is set up
%   first, then the worker threads are created, then the accept thread
%   is started and finally an informational message is printed.
%
%   @error existence_error(option, port) if Options holds no port(_)
%          option.

http_server(Goal, QualifiedOptions) :-
    (   QualifiedOptions = Module:UserOptions,
        option(port(Port), UserOptions)
    ->  make_socket(Port, Module:UserOptions, ServerOptions),
        create_workers(ServerOptions),
        create_server(Goal, Port, ServerOptions),
        print_message(informational,
                      httpd_started_server(Port))
    ;   existence_error(option, port)
    ).
185
186
194
%!  make_socket(+Port, :OptionsIn, -OptionsOut) is det.
%
%   Prepare the listening socket for a server on Port and extend the
%   option list with what the rest of the server needs:
%
%     1. If make_socket_hook/3 succeeds, its result is used as is.
%     2. If the caller already supplied tcp_socket(_), only the
%        queue(Queue) option is added.
%     3. Otherwise a new TCP socket is created, bound to Port and put
%        in listening mode; both queue(Queue) and tcp_socket(Socket)
%        are added.
%
%   The queue name is derived from Port using make_addr_atom/3.
%
%   If configuring, binding or listening raises an exception (for
%   example because the port is already in use), the socket created
%   here is closed before the exception is passed on, so a failed
%   start does not leak the socket.

make_socket(Port, Options0, Options) :-
    make_socket_hook(Port, Options0, Options),
    !.
make_socket(Port, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Port, _:Options0, Options) :-
    tcp_socket(Socket),
    catch(( tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, Port),
            tcp_listen(Socket, 5)
          ),
          Error,
          (   % Best-effort close; the original error is what matters.
              catch(tcp_close_socket(Socket), _, true),
              throw(Error)
          )),
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
215
220
%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
%
%   Build an atom of the form Scheme@Address.  Address is an atomic
%   value, a Host:Port pair or an ip(A,B,C,D) term, e.g.
%   `make_addr_atom(http, localhost:8080, 'http@localhost:8080')`.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), AddressParts),
    Pieces = [Scheme, '@' | AddressParts],
    atomic_list_concat(Pieces, Atom).

%!  address_parts(+Address)// is semidet.
%
%   Emit the list of atomic pieces that, concatenated, form the
%   textual representation of Address.

address_parts(Value) -->
    { atomic(Value) },
    !,
    [ Value ].
address_parts(Host:Port) -->
    !,
    address_parts(Host),
    [ ':' ],
    address_parts(Port).
address_parts(ip(A,B,C,D)) -->
    !,
    [ A, '.', B, '.', C, '.', D ].
235
240
%!  create_server(:Goal, +Address, +Options) is det.
%
%   Start the thread that accepts connections and record the server
%   in current_server/6.  The accept thread runs accept_server/2 and
%   gets an alias built from the URL scheme and the port.  Options must
%   contain queue(Queue).

create_server(Goal, Address, Options) :-
    get_time(StartedAt),
    address_port(Address, Port),
    scheme(Scheme, Options),
    memberchk(queue(Queue), Options),
    make_addr_atom(Scheme, Port, ThreadAlias),
    thread_create(accept_server(Goal, Options), _ThreadId,
                  [ alias(ThreadAlias)
                  ]),
    assert(current_server(Port, Goal, ThreadAlias, Queue, Scheme, StartedAt)).
251
%!  scheme(-Scheme, +Options) is semidet.
%
%   Determine the URL scheme of a server from its options.  An explicit
%   scheme(Scheme) option wins; otherwise the presence of ssl(_) or
%   ssl_instance(_) selects `https`; the default is `http`.

scheme(Scheme, Options) :-
    option(scheme(Scheme), Options),
    !.
scheme(Scheme, Options) :-
    option(ssl(_), Options),
    !,
    Scheme = https.
scheme(Scheme, Options) :-
    option(ssl_instance(_), Options),
    !,
    Scheme = https.
scheme(http, _Options).
262
%!  address_port(+Address, -Port) is det.
%
%   Port is the port part of Address: the right-hand side of a
%   Host:Port pair, or Address itself if it is not such a pair.

address_port(Address, Port) :-
    Address = _Interface:Port,
    !.
address_port(Port, Port).
265
266
272
%!  http_current_server(:Goal, ?Port) is nondet.
%
%   True when a server registered in current_server/6 handles Port
%   using Goal.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Alias, _Queue, _Scheme, _StartTime).
275
276
289
%!  http_server_property(?Port, ?Property) is nondet.
%
%   True when Property is a property of the server at Port.  Port may
%   be given as Host:Port with an integer port, in which case only the
%   port number is used.  Properties are those of server_property/2.

http_server_property(Address, Property) :-
    (   Address = _Host:Port,
        integer(Port)
    ->  true
    ;   Port = Address
    ),
    server_property(Property, Port).
296
%!  server_property(?Property, ?Port) is nondet.
%
%   Enumerate the properties recorded for the server at Port:
%
%     - goal(Goal): the handler goal
%     - scheme(Scheme): the URL scheme
%     - start_time(Time): time stamp taken when the server was created

server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _Alias, _Queue, _Scheme, _Start).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _Goal, _Alias, _Queue, Scheme, _Start).
server_property(start_time(Start), Port) :-
    current_server(Port, _Goal, _Alias, _Queue, _Scheme, Start).
303
304
311
%!  http_workers(+Port, ?Workers) is det.
%
%   Query or change the number of worker threads of the server at
%   Port.  If Workers is an integer the pool is resized to that many
%   workers; otherwise Workers is unified with the number of workers
%   currently registered for the server's queue.
%
%   A negative integer is rejected up front.  Passing it on to
%   resize_pool/2 would post more `quit` requests than there are
%   workers and then block forever waiting for acknowledgements.
%
%   @error instantiation_error if Port is not ground.
%   @error type_error(nonneg, Workers) if Workers is a negative
%          integer.
%   @error existence_error(http_server, Port) if no server is
%          registered at Port.

http_workers(Port, Workers) :-
    must_be(ground, Port),
    current_server(Port, _, _, Queue, _, _),
    !,
    (   integer(Workers)
    ->  must_be(nonneg, Workers),
        resize_pool(Queue, Workers)
    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
        length(WorkerIDs, Workers)
    ).
http_workers(Port, _) :-
    existence_error(http_server, Port).
323
324
334
%!  http_add_worker(+Port, +Options) is det.
%
%   Add one worker thread to the server at Port.  Options are merged
%   with the options stored for the server's queue (Options take
%   precedence in merge_options/3) and the result is used to create
%   the worker.
%
%   @error existence_error(http_server, Port) if no server is
%          registered at Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    (   current_server(Port, _Goal, _Alias, Queue, _Scheme, _Start)
    ->  queue_options(Queue, StoredOptions),
        merge_options(Options, StoredOptions, WorkerOptions),
        atom_concat(Queue, '_', AliasPrefix),
        create_workers(1, 1, Queue, AliasPrefix, WorkerOptions)
    ;   existence_error(http_server, Port)
    ).
345
346
353
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True when ThreadID is registered as a worker for the queue of the
%   server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _Goal, _Alias, Queue, _Scheme, _Start),
    queue_worker(Queue, ThreadID).
357
358
363
%!  accept_server(:Goal, +Options)
%
%   Body of the accept thread.  Runs the accept loop until it is
%   terminated by the exception `http_stop`, then removes the
%   current_server/6 entry whose third argument is this thread and
%   closes the server socket.

accept_server(Goal, Options) :-
    catch(accept_server2(Goal, Options), http_stop, true),
    thread_self(Me),
    retract(current_server(_Port, _Goal, Me, _Queue, _Scheme, _StartTime)),
    close_server_socket(Options).
369
%!  accept_server2(:Goal, +Options)
%
%   Failure-driven accept loop.  Each iteration runs accept_server3/2
%   once.  The loop never succeeds: it only ends when an exception
%   selected by accept_rethrow_error/1 is re-thrown.  Any other
%   exception, as well as failure of accept_server3/2, is reported as
%   an error message and the loop continues.

accept_server2(Goal, Options) :-
    repeat,
    (   catch(accept_server3(Goal, Options), Error, true)
    ->  accept_outcome(Error)
    ;   print_message(error,
                      goal_failed(accept_server3(Goal, Options)))
    ),
    fail.

%   accept_outcome(?Error)
%
%   Act on the result of one accept attempt.  Error is unbound if the
%   attempt raised no exception.

accept_outcome(Error) :-
    var(Error),
    !.
accept_outcome(Error) :-
    accept_rethrow_error(Error),
    !,
    throw(Error).
accept_outcome(Error) :-
    print_message(error, Error).
384
%!  accept_server3(:Goal, +Options) is semidet.
%
%   Accept a single connection.  If accept_hook/2 succeeds it is
%   considered to have handled the accept.  Otherwise a connection is
%   accepted on the tcp_socket(Socket) option, http_enough_workers/3
%   is consulted and a tcp_client(Client, Goal, Peer) message is posted
%   on the queue from the queue(Queue) option.

accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(queue(Queue), Options),
    memberchk(tcp_socket(ServerSocket), Options),
    tcp_accept(ServerSocket, ClientSocket, Peer),
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    http_enough_workers(Queue, accept, Peer),
    Job = tcp_client(ClientSocket, Goal, Peer),
    thread_send_message(Queue, Job).
395
%!  accept_rethrow_error(?Exception) is nondet.
%
%   Exceptions that must end the accept loop instead of being printed
%   and ignored.

accept_rethrow_error(http_stop).
accept_rethrow_error('$aborted').
398
399
403
%!  close_server_socket(+Options) is semidet.
%
%   Close the listening socket of a server.  close_hook/1 is tried
%   first; if it does not succeed, the socket from the
%   tcp_socket(Socket) option is closed.  Fails if neither applies.

close_server_socket(Options) :-
    (   close_hook(Options)
    ->  true
    ;   memberchk(tcp_socket(ServerSocket), Options),
        tcp_close_socket(ServerSocket)
    ).
411
412
419
%!  http_stop_server(+Port, +Options) is det.
%
%   Stop the server at Port.  Port may be given as Host:Port with a
%   ground Host.  The steps are: reduce the pool to zero workers,
%   forget the stored queue options, signal `http_stop` to the accept
%   thread, make a connection to localhost:Port (errors ignored;
%   presumably to wake an accept thread that is blocked waiting for a
%   client), join the accept thread and destroy the message queue.
%   Options is currently not used.

http_stop_server(Host:Port, Options) :-
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),
    current_server(Port, _Goal, AcceptThread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    thread_signal(AcceptThread, throw(http_stop)),
    catch(connect(localhost:Port), _, true),
    thread_join(AcceptThread, _ExitStatus),
    message_queue_destroy(Queue).
432
%!  connect(+Address) is semidet.
%
%   Open a TCP connection to Address and close the socket again,
%   whether or not connecting succeeded.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Sock),
        tcp_connect(Sock, Address),
        tcp_close_socket(Sock)).
438
444
%!  http_enough_workers(+Queue, +Why, +Peer) is det.
%
%   Check whether Queue has enough workers, given the number of
%   messages waiting in it and the reason Why a new job is about to be
%   queued.  If enough/2 is not satisfied, the hook
%   http:schedule_workers/1 is called with a dict holding `port`,
%   `reason`, `peer` and `waiting`.  Exceptions from the hook are
%   printed as error messages and failure of the hook (or a missing
%   server entry for Queue) is ignored.

http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Waiting)),
    (   enough(Waiting, Why)
    ->  true
    ;   ignore(( current_server(Port, _, _, Queue, _, _),
                 catch(http:schedule_workers(_{port:Port,
                                               reason:Why,
                                               peer:Peer,
                                               waiting:Waiting}),
                       Error,
                       print_message(error, Error))
               ))
    ).
459
%!  enough(+Waiting, +Why) is semidet.
%
%   True if Waiting queued messages is acceptable for a job queued for
%   reason Why: an empty queue is always fine, and one waiting message
%   is tolerated when re-queueing a keep-alive connection.

enough(0, _Why).
enough(1, keep_alive).
463
487
488
489 492
497
%!  create_workers(+Options) is det.
%
%   Create the initial pool of workers for a new server.  The pool
%   size comes from workers(N) (default 5) and the message queue from
%   queue(Queue).  Errors from creating the queue are ignored.  The
%   options are stored in queue_options/2 for workers created later.

create_workers(Options) :-
    option(queue(Queue), Options),
    option(workers(Count), Options, 5),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, Count, Queue, AliasPrefix, Options),
    assert(queue_options(Queue, Options)).
505
%!  create_workers(+I, +N, +Queue, +AliasBase, +Options) is det.
%
%   Create worker threads numbered I..N for Queue.  Each worker runs
%   http_worker/1, gets an alias generated from AliasBase with
%   gensym/2 and is registered in queue_worker/2.  Options is passed
%   both to the worker goal and, after the alias, to thread_create/3.

create_workers(I, N, Queue, AliasBase, Options) :-
    (   I > N
    ->  true
    ;   gensym(AliasBase, WorkerAlias),
        thread_create(http_worker(Options), WorkerId,
                      [ alias(WorkerAlias)
                      | Options
                      ]),
        assertz(queue_worker(Queue, WorkerId)),
        Next is I + 1,
        create_workers(Next, N, Queue, AliasBase, Options)
    ).
518
519
524
%!  resize_pool(+Queue, +Size) is det.
%
%   Bring the number of workers registered for Queue to Size.  Extra
%   workers are created using the stored queue options.  To shrink the
%   pool, one quit(Me) message is posted per surplus worker, after
%   which the calling thread waits for the same number of quitted(_)
%   replies on its own message queue.

resize_pool(Queue, Size) :-
    findall(Worker, queue_worker(Queue, Worker), Current),
    length(Current, Now),
    (   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasPrefix),
        First is Now+1,
        create_workers(First, Size, Queue, AliasPrefix, Options)
    ;   Now == Size
    ->  true
    ;   Now > Size
    ->  Surplus is Now - Size,
        thread_self(Me),
        forall(between(1, Surplus, _),
               thread_send_message(Queue, quit(Me))),
        forall(between(1, Surplus, _),
               thread_get_message(quitted(_)))
    ).
541
542
550
%!  http_worker(+Options)
%
%   Body of a worker thread.  Registers done_worker/0 as exit hook and
%   then loops: collect garbage, wait for a job on the queue from
%   queue(Queue), and process it.
%
%   If max_idle_time(Time) is given (default `infinite`) and no job
%   arrives within Time, the worker behaves as if it had received
%   quit(idle).  On quit(Sender) the worker leaves the loop, detaches
%   itself and, unless Sender is `idle`, replies quitted(Self) to
%   Sender.
%
%   Any other message is opened as a client connection and handled by
%   http_process/4.  Exceptions and failure of http_process/4 are
%   printed at the level given by current_message_level/2 and the
%   connection is closed.  Every job ends in failure so that control
%   returns to the `repeat`.

http_worker(Options) :-
    thread_at_exit(done_worker),
    option(queue(JobQueue), Options),
    option(max_idle_time(IdleLimit), Options, infinite),
    repeat,
      garbage_collect,
      trim_stacks,
      debug(http(worker), 'Waiting for a job ...', []),
      (   IdleLimit == infinite
      ->  thread_get_message(JobQueue, Job)
      ;   thread_get_message(JobQueue, Job, [timeout(IdleLimit)])
      ->  true
      ;   Job = quit(idle)              % timed out waiting for work
      ),
      debug(http(worker), 'Got job ~p', [Job]),
      (   Job = quit(Requester)
      ->  !,                            % leave the repeat loop
          thread_self(Me),
          thread_detach(Me),
          (   Requester \== idle
          ->  thread_send_message(Requester, quitted(Me))
          ;   true
          )
      ;   open_client(Job, JobQueue, Goal, In, Out,
                      Options, ConnOptions),
          (   catch(http_process(Goal, In, Out, ConnOptions),
                    Problem, true)
          ->  true
          ;   Problem = goal_failed(http_process/4)
          ),
          (   var(Problem)
          ->  fail                      % done; wait for the next job
          ;   current_message_level(Problem, Severity),
              print_message(Severity, Problem),
              memberchk(peer(Remote), ConnOptions),
              close_connection(Remote, In, Out),
              fail
          )
      ).
590
591
597
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions) is semidet.
%
%   Turn a job taken from the queue into a handler goal, a pair of
%   streams and the client options.
%
%   A requeue(In, Out, Goal, ClOpts) message carries an already open
%   connection; it is only checked for new input within
%   keep_alive_timeout (default 2).  Any other message is opened with
%   open_client/6; errors doing so are printed and cause failure.  The
%   resulting options are prefixed with pool(client(Queue, Goal, In,
%   Out)) and timeout(Timeout), the latter defaulting to 60.

open_client(requeue(In, Out, Goal, ClientOptions),
            _Queue, Goal, In, Out, ServerOptions, ClientOptions) :-
    !,
    memberchk(peer(Remote), ClientOptions),
    option(keep_alive_timeout(KeepAlive), ServerOptions, 2),
    check_keep_alive_connection(In, KeepAlive, Remote, In, Out).
open_client(Message, Queue, Goal, In, Out, ServerOptions,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | ConnOptions
            ]) :-
    catch(open_client(Message, Goal, In, Out, ConnOptions, ServerOptions),
          Error, report_error(Error)),
    option(timeout(Timeout), ServerOptions, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Remote), ConnOptions),
        debug(http(connection), 'Opened connection from ~p', [Remote])
    ;   true
    ).
617
618
621
%!  open_client(+Message, -Goal, -In, -Out,
%!              -ClientOptions, +Options) is semidet.
%
%   Open the connection described by Message.  open_client_hook/6 is
%   tried first.  The built-in case handles
%   tcp_client(Socket, Goal, Peer): the socket is opened as a stream
%   pair and the client options are peer(Peer) and protocol(http).

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(ClientSocket, Goal, Remote), Goal, In, Out,
            [ peer(Remote),
              protocol(http)
            ], _Options) :-
    tcp_open_socket(ClientSocket, In, Out).
630
%!  report_error(+Error) is failure.
%
%   Print Error as an error message and fail.

report_error(Error) :-
    print_message(error, Error),
    fail.
634
635
641
%!  check_keep_alive_connection(+In0, +TimeOut, +Peer, -In, +Out)
%!      is semidet.
%
%   Wait up to TimeOut for new input on a keep-alive connection by
%   peeking at the input stream.  If input arrives, the stream's
%   previous timeout is restored and the call succeeds.  If the peek
%   raised an exception or hit end-of-file, the connection is closed
%   and the call fails.

check_keep_alive_connection(In, TimeOut, Peer, In, Out) :-
    stream_property(In, timeout(Previous)),
    set_stream(In, timeout(TimeOut)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), Error, true),
    (   var(Error),                     % peek raised no exception
        Code \== -1                     % and did not see end-of-file
    ->  set_stream(In, timeout(Previous)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Code == -1
        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).
658
659
665
%!  done_worker
%
%   Exit hook of a worker thread.  Removes the worker from
%   queue_worker/2 and tries to start a replacement through
%   recreate_worker/2, treating errors there as failure.  If a
%   replacement was started, the thread detaches itself and reports
%   the restart; otherwise it reports that it stopped, at the level
%   chosen by done_status_message_level/2.

done_worker :-
    thread_self(Me),
    thread_property(Me, status(ExitStatus)),
    retract(queue_worker(Queue, Me)),
    (   catch(recreate_worker(ExitStatus, Queue), _, fail)
    ->  thread_detach(Me),
        print_message(informational,
                      httpd_restarted_worker(Me))
    ;   done_status_message_level(ExitStatus, Level),
        print_message(Level,
                      httpd_stopped_worker(Me, ExitStatus))
    ).
678
%!  done_status_message_level(+Status, -Level) is det.
%
%   Message level for reporting that a worker stopped with exit
%   Status.  Normal termination and abort are silent; everything else
%   is informational.

done_status_message_level(true, silent) :-
    !.
done_status_message_level(exception('$aborted'), silent) :-
    !.
done_status_message_level(_Status, informational).
682
683
690
%!  recreate_worker(+Status, +Queue) is semidet.
%
%   Start a replacement worker for Queue if the worker's exit Status
%   is an exception listed in recreate_on_error/1.  The replacement is
%   created with the options stored for Queue.

recreate_worker(exception(Error), Queue) :-
    recreate_on_error(Error),
    queue_options(Queue, StoredOptions),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, StoredOptions).

%!  recreate_on_error(?Error) is nondet.
%
%   Exceptions after which a worker is replaced.

recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).
699
706
%!  message_level(+Exception, -Level) is semidet.
%
%   Multifile table that maps an exception raised while handling a
%   request to the level at which it is printed.

:- multifile message_level/2.

message_level(error(io_error(read, _), _),      silent).
message_level(error(timeout_error(read, _), _), informational).
message_level(keep_alive_timeout,               silent).

%!  current_message_level(+Term, -Level) is det.
%
%   Level is the level from message_level/2 for Term, or `error` if
%   Term is not listed there.

current_message_level(Term, Level) :-
    message_level(Term, Level),
    !.
current_message_level(_Term, error).
719
720
725
%!  http_requeue(+Header) is semidet.
%
%   Put the connection described by Header back on the worker queue
%   as a requeue(In, Out, Goal, ClientOptions) job.  ClientOptions is
%   obtained from Header through requeue_header/2 and must contain
%   pool(client(Queue, Goal, In, Out)) and peer(Peer).  If any step
%   fails, a debug message on topic http(error) is emitted and the
%   call fails.

http_requeue(Header) :-
    (   requeue_header(Header, ClientOptions),
        memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
        memberchk(peer(Remote), ClientOptions),
        http_enough_workers(Queue, keep_alive, Remote),
        thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions))
    ->  true
    ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
        fail
    ).
736
%!  requeue_header(+Header, -ClientOptions) is det.
%
%   ClientOptions is the sub-list of Header, in the original order,
%   holding the fields that requeue_keep/1 accepts.  All other fields
%   are dropped.
%
%   The base clause for the empty list is required: without it the
%   recursion has nothing to terminate on and no call can succeed.

requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

%!  requeue_keep(?Field) is nondet.
%
%   Header fields that describe the connection rather than a single
%   request and are therefore kept when a connection is re-queued.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
748
749
753
%!  http_process(:Goal, +In, +Out, +Options) is semidet.
%
%   Handle one request on the In/Out stream pair.  Both streams get
%   the timeout from timeout(Seconds) (default 60).  The request is
%   processed by http_wrapper/5, passing request(Request) in front of
%   Options, and what happens to the connection afterwards is decided
%   by next/2.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(Seconds), Options, 60),
    set_stream(In, timeout(Seconds)),
    set_stream(Out, timeout(Seconds)),
    WrapperOptions = [ request(Request)
                     | Options
                     ],
    http_wrapper(Goal, In, Out, Connection, WrapperOptions),
    next(Connection, Request).
765
%!  next(+Connection, +Request) is det.
%
%   Decide what to do with the connection after a request has been
%   handled:
%
%     - switch_protocol(Goal, _): call Goal with the connection's
%       streams; if that fails or raises (the error is printed), close
%       the connection.
%     - spawned(ThreadId): another thread took over; nothing to do.
%     - A value that is `keep-alive` regardless of case: re-queue the
%       connection; if that fails, close it.
%     - Anything else: close the connection.

next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), Error,
              ( print_message(error, Error),
                fail
              ))
    ->  true
    ;   http_close_connection(Request)
    ).
next(spawned(Thread), _Request) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [Thread]).
next(Connection, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    http_requeue(Request),
    !.
next(_Connection, Request) :-
    http_close_connection(Request).
784
785
789
%!  http_close_connection(+Request) is semidet.
%
%   Close the connection a request arrived on.  Request must contain
%   peer(Peer) and pool(client(_, _, In, Out)).

http_close_connection(Request) :-
    memberchk(peer(Remote), Request),
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    close_connection(Remote, In, Out).
794
799
%!  close_connection(+Peer, +In, +Out) is det.
%
%   Forcefully close both streams of a connection, ignoring any
%   exception raised while closing.  Peer is only used for the debug
%   message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    ForceClose = [force(true)],
    catch(close(In, ForceClose), _, true),
    catch(close(Out, ForceClose), _, true).
804
820
%!  http_spawn(:Goal, +Options) is det.
%
%   Continue handling the current request by running Goal in a new,
%   detached thread that writes to the current output stream.
%
%   With pool(Pool) the thread is created in that thread pool.  If the
%   pool has no free threads, `http_reply(busy)` is thrown.  If the
%   pool does not exist, create_pool/1 is called and the spawn is
%   retried.  Other errors are passed on.  Without pool(_) a plain
%   thread is created with Options as thread options.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Thread,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Error)
    ->  http_spawned(Thread)
    ;   Error = error(resource_error(threads_in_pool(_)), _)
    ->  throw(http_reply(busy))
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)       % pool exists now; try again
    ;   throw(Error)
    ).
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), Thread,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(Thread).
848
%!  wrap_spawned(+CGI, :Goal)
%
%   Goal of a spawned handler thread: make CGI the current output, run
%   Goal through http_wrap_spawned/3 and then deal with the connection
%   using next/2.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, Request).
853
861
%!  create_pool(+Pool)
%
%   Create the thread pool Pool on demand.  The hook http:create_pool/1
%   is tried first; a permission error for creating this pool raised
%   by the hook is treated as success.  As a fallback a pool of size
%   10 is created after printing an informational message.

create_pool(Pool) :-
    AlreadyExists = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), AlreadyExists, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).
868
869
870
871 874
% Message texts for the message terms printed by this module.

:- multifile prolog:message/3.

prolog:message(httpd_started_server(Address)) -->
    [ 'Started server at '-[] ],
    http_root(Address).
prolog:message(httpd_stopped_worker(Worker, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Worker, Status] ].
prolog:message(httpd_restarted_worker(Worker)) -->
    [ 'Replaced aborted worker ~p'-[Worker] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].
891
%!  http_root(+Address)//
%
%   Emit the root URL of the server at Address as message fragments:
%   the scheme, then the host (or `localhost` when Address is a plain
%   port), the port and the absolute location of root(.).

http_root(Host:Port) -->
    !,
    http_scheme(Port),
    { http_absolute_location(root(.), RootPath, []) },
    [ '~w:~w~w'-[Host, Port, RootPath] ].
http_root(Port) -->
    http_scheme(Port),
    { http_absolute_location(root(.), RootPath, []) },
    [ 'localhost:~w~w'-[Port, RootPath] ].
901
%!  http_scheme(+Port)//
%
%   Emit `Scheme://` for the server registered at Port.

http_scheme(Port) -->
    { http_server_property(Port, scheme(UrlScheme)) },
    [ '~w://'-[UrlScheme] ].