View source with raw comments or as raw
   1/*  Part of SWI-Prolog
   2
   3    Author:        Jeffrey Rosenwald
   4    E-mail:        jeffrose@acm.org
   5    WWW:           http://www.swi-prolog.org
   6    Copyright (c)  2012-2013, Jeffrey Rosenwald
   7    All rights reserved.
   8
   9    Redistribution and use in source and binary forms, with or without
  10    modification, are permitted provided that the following conditions
  11    are met:
  12
  13    1. Redistributions of source code must retain the above copyright
  14       notice, this list of conditions and the following disclaimer.
  15
  16    2. Redistributions in binary form must reproduce the above copyright
  17       notice, this list of conditions and the following disclaimer in
  18       the documentation and/or other materials provided with the
  19       distribution.
  20
  21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
  27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
  31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  32    POSSIBILITY OF SUCH DAMAGE.
  33*/
  34
  35:- module(udp_broadcast,
  36             [
  37             udp_host_to_address/2,    % ? Host, ? Address
  38             udp_broadcast_initialize/2,   % +IPAddress, +Subnet
  39             udp_broadcast_service/2   % ? Domain, ? Address
  40             ]).
  41
  42/** <module> A UDP Broadcast Bridge
  43
  44SWI-Prolog's broadcast library provides a  means   that  may  be used to
  45facilitate publish and subscribe communication regimes between anonymous
  46members of a community of interest.  The   members  of the community are
  47however, necessarily limited to a  single   instance  of Prolog. The UDP
  48broadcast library removes that restriction.   With  this library loaded,
  49any member on your local IP subnetwork that also has this library loaded
  50may hear and respond to your broadcasts.
  51
This module has three public predicates. When the module is
initialized, it starts two listener threads that listen for broadcasts
from others, received as UDP datagrams.
  55
  56Unlike TIPC broadcast, UDP broadcast has only one scope, =udp_subnet=. A
  57broadcast/1 or broadcast_request/1 that is not  directed to the listener
  58above, behaves as usual and is confined   to the instance of Prolog that
  59originated it. But when so directed, the   broadcast will be sent to all
  60participating systems, including  itself,  by   way  of  UDP's multicast
  61addressing facility. A UDP broadcast  or   broadcast  request  takes the
  62typical form: =|broadcast(udp_subnet(+Term, +Timeout))|=. To prevent the
  63potential for feedback loops, the scope   qualifier is stripped from the
message before transmission. The timeout is optional. It specifies the
amount of time to wait for replies to arrive in response to a
  66broadcast_request. The default period is 0.250   seconds. The timeout is
  67ignored for broadcasts.
  68
  69An example of three separate processes cooperating on the same Node:
  70
  71==
  72Process A:
  73
  74   ?- listen(number(X), between(1, 5, X)).
  75   true.
  76
  77   ?-
  78
  79Process B:
  80
  81   ?- listen(number(X), between(7, 9, X)).
  82   true.
  83
  84   ?-
  85
  86Process C:
  87
  88   ?- findall(X, broadcast_request(udp_subnet(number(X))), Xs).
  89   Xs = [1, 2, 3, 4, 5, 7, 8, 9].
  90
  91   ?-
  92==
  93
  94It is also  possible  to  carry  on   a  private  dialog  with  a single
  95responder. To do this, you supply a   compound of the form, Term:PortId,
  96to a UDP scoped broadcast/1 or  broadcast_request/1, where PortId is the
  97ip-address and port-id of  the  intended   listener.  If  you  supply an
  98unbound variable, PortId, to broadcast_request, it  will be unified with
  99the address of the listener  that  responds   to  Term.  You  may send a
 100directed broadcast to a specific member by simply providing this address
 101in a similarly structured compound  to   a  UDP  scoped broadcast/1. The
 102message is sent via unicast to that member   only by way of the member's
 103broadcast listener. It is received by  the   listener  just as any other
 104broadcast would be. The listener does not know the difference.
 105
 106For example, in order to discover who responded with a particular value:
 107
 108==
 109Host B Process 1:
 110
 111   ?- listen(number(X), between(1, 5, X)).
 112   true.
 113
 114   ?-
 115
 116Host A Process 1:
 117
 118
 119   ?- listen(number(X), between(7, 9, X)).
 120   true.
 121
 122   ?-
 123
 124Host A Process 2:
 125
 126   ?- listen(number(X), between(1, 5, X)).
 127   true.
 128
 129   ?- bagof(X, broadcast_request(udp_subnet(number(X):From,1)), Xs).
 130   From = ip(192, 168, 1, 103):34855,
 131   Xs = [7, 8, 9] ;
 132   From = ip(192, 168, 1, 103):56331,
 133   Xs = [1, 2, 3, 4, 5] ;
 134   From = ip(192, 168, 1, 104):3217,
 135   Xs = [1, 2, 3, 4, 5].
 136
 137==
 138
 139## Caveats {#udp-broadcase-caveats}
 140
 141While the implementation is mostly transparent, there are some important
 142and subtle differences that must be taken into consideration:
 143
 144    * UDP broadcast requires an initialization step in order to
 145    launch the broadcast listener daemon. See udp_broadcast_initialize/2.
 146
 147    * Prolog's broadcast_request/1 is nondet. It sends the request,
 148    then evaluates the replies synchronously, backtracking as needed
 149    until a satisfactory reply is received. The remaining potential
 150    replies are not evaluated. This is not so when UDP is involved.
 151
 152    * A UDP broadcast/1 is completely asynchronous.
 153
 154    * A  UDP broadcast_request/1 is partially synchronous. A
 155    broadcast_request/1 is sent, then the sender balks for a period of
 156    time (default: 250 ms) while the replies are collected. Any reply
    that is received after this period is silently discarded. An
    optional second argument is provided so that a sender may specify
 159    more (or less) time for replies.
 160
 161    * Replies are presented to the user as a choice point on arrival,
 162    until the broadcast request timer finally expires. This
 163    allows traffic to propagate through the system faster and provides
 164    the requestor with the opportunity to terminate a broadcast request
 165    early if desired, by simply cutting choice points.
 166
 167    * Please beware that broadcast request transactions remain active
 168    and resources consumed until broadcast_request finally fails on
 169    backtracking, an uncaught exception occurs, or until choice points
 170    are cut. Failure to properly manage this will likely result in
 171    chronic exhaustion of UDP sockets.
 172
 173    * If a listener is connected to a generator that always succeeds
 174    (e.g. a random number generator), then the broadcast request will
 175    never terminate and trouble is bound to ensue.
 176
 177    * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
 178    If a listener performs a broadcast_request/1 with UDP scope
 179    recursively, then disaster looms certain. This caveat does not apply
 180    to a UDP scoped broadcast/1, which can safely be performed from a
 181    listener context.
 182
 183    * UDP broadcast's capacity is not infinite. While it can tolerate
 184    substantial bursts of activity, it is designed for short bursts of
 185    small messages. Unlike TIPC, UDP is unreliable and has no QOS
 186    protections. Congestion is likely to cause trouble in the form of
 187    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
 188    or duplicate datagrams. Caveat emptor.
 189
 190    * A UDP broadcast_request/1 term that is grounded is considered to
    be a broadcast only. No replies are collected unless there is at
    least one unbound variable to unify.
 193
 194    * A UDP broadcast/1 always succeeds, even if there are no
 195    listeners.
 196
 197    * A UDP broadcast_request/1 that receives no replies will fail.
 198
 199    * Replies may be coming from many different places in the network
 200    (or none at all). No ordering of replies is implied.
 201
 202    * Prolog terms are sent to others after first converting them to
 203    atoms using term_to_atom/2. Passing real numbers this way may
 204    result in a substantial truncation of precision.
 205
 206    * The broadcast model is based on anonymity and a presumption of
 207    trust--a perfect recipe for compromise. UDP is an Internet protocol.
 208    A UDP broadcast listener exposes a public port (20005), which is
 209    static and shared by all listeners, and a private port, which is
 210    semi-static and unique to the listener instance. Both can be seen
 211    from off-cluster nodes and networks. Usage of this module exposes
 212    the node and consequently, the cluster to significant security
 213    risks. So have a care when designing your application. You must talk
 214    only to those who share and contribute to your concerns using a
 215    carefully prescribed protocol.
 216
 217    * UDP broadcast categorically and silently ignores all message
 218    traffic originating from or terminating on nodes that are not
 219    members of the local subnet. This security measure only keeps honest
 220    people honest!
 221
 222@author    Jeffrey Rosenwald (JeffRose@acm.org)
 223@license   BSD-2
 224@see       tipc.pl
 225*/
 226
 227:- use_module(library(socket)).
 228:- use_module(library(broadcast)).
 229:- use_module(library(time)).
 230
 231:- require([ thread_self/1
 232           , forall/2
 233           , term_to_atom/2
 234           , thread_send_message/2
 235           , catch/3
 236           , setup_call_cleanup/3
 237           , thread_create/3
 238           ]).
 239
 240% %     ~>(:P, :Q) is nondet.
 241% %     eventually_implies(P, Q) is nondet.
 242%    asserts temporal Liveness (something good happens, eventually) and
 243%    Safety (nothing bad ever happens) properties. Analogous to the
 244%    "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
 245%    lazy implication described informally as:
 246%
 247%    * Liveness: For all possible outcomes, P -> Q, eventually.
 248%    * Safety: For all possible outcomes, (\+P ; Q), is invariant.
 249%
 250%  Described practically:
 251%
 252%    P ~> Q, declares that if P is true, then Q must be true, now or at
 253%    some point in the future.
 254%
 255
 256:- meta_predicate ~>(0,0).
 257:- op(950, xfy, ~>).
 258
 259~>(P, Q) :-
 260    setup_call_cleanup(P,
 261                       (true; fail),
 262                       (   Q -> true;
 263                       throw(error(goal_failed(Q), context(~>, _))))
 264                      ).
 265
 266:- meta_predicate safely(0).
 267
 268safely(Predicate) :-
 269    catch(Predicate, Err,
 270          (Err == '$aborted' -> (!, fail);
 271          print_message(error, Err), fail)).
 272
 273:- meta_predicate make_thread(0, +).
 274
 275% You can't thread_signal a thread that isn't running.
 276
 277join_thread(Id) :-
 278    catch(thread_signal(Id, abort),
 279          error(existence_error(thread, Id), _Context),
 280          true),
 281
 282    thread_join(Id, exception('$aborted')).
 283
 284make_thread(Goal, Options) :-
 285    thread_create(safely(Goal), Id, [ detached(false) | Options ])
 286      ~> join_thread(Id).
 287
 288udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
 289    IPAddress = ip(A1, A2, A3, A4),
 290    Subnet = ip(S1, S2, S3, S4),
 291    BroadcastAddress = ip(B1, B2, B3, B4),
 292
 293    B1 is A1 \/ (S1 xor 255),
 294    B2 is A2 \/ (S2 xor 255),
 295    B3 is A3 \/ (S3 xor 255),
 296    B4 is A4 \/ (S4 xor 255).
 297
 298%!  udp_broadcast_service(?Domain, ?Address) is nondet.
 299%   provides the UDP broadcast address for a given Domain. At present,
 300%   only one domain is supported, =|udp_subnet|=.
 301%
 302
 303%  The following are defined at initialization:
 304:- dynamic
 305    udp_subnet_member/1,      % +IpAddress:Port
 306    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port
 307
 308:- volatile
 309    udp_subnet_member/1,      % +IpAddress:Port
 310    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port
 311%
 312%  Here's a UDP bridge to Prolog's broadcast library
 313%
 314%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
 315%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
 316%  The qualifier has the effect of  selecting the appropriate multi-cast
 317%  address for the transmission. Thus,  the   sender  of the message has
 318%  control over the scope of his/her traffic on a per-message basis.
 319%
 320%  All in-scope listeners receive the   broadcast and simply rebroadcast
 321%  the message locally. All broadcast replies, if any, are sent directly
 322%  to the sender via the port-id that   was received with the broadcast.
 323%
 324%  Each listener exposes two UDP ports,  a   shared  public port that is
%  bound to a well-known port number and a private port that uniquely
%  identifies the listener. Broadcasts are received on the public port
 327%  and replies are  sent  on  the   private  port.  Directed  broadcasts
 328%  (unicasts) are received on the private port   and replies are sent on
 329%  the private port.
 330%
 331%  Interactions with TIPC Broadcast
 332%
 333%  As a security precaution, we do   not allow unsupervised transactions
 334%  directly between UDP and TIPC broadcast. These terms are black-listed
 335%  and ignored when received via UDP   listeners.  A UDP enabled service
 336%  that wishes to use TIPC resources on  the cluster must have a sponsor
 337%  on the TIPC cluster to filter   incoming  UDP broadcast traffic. This
 338%  can be as simple as loading  and initializing both tipc_broadcast and
 339%  udp_broadcast within the same TIPC broadcast service.
 340%
 341%  Because the UDP  and  TIPC  broadcast   listeners  are  operating  in
 342%  separate threads of execution, a  UDP   broadcast  sponsor can safely
 343%  perform a broadcast_request with  TIPC  scope   from  within  the UDP
 344%  broadcast listener context. This is one of the few scenarios where a
 345%  recursive broadcast_request with TIPC scope is safe.
 346%
 347
 348black_list(tipc_node(_)).
 349black_list(tipc_node(_,_)).
 350black_list(tipc_cluster(_)).
 351black_list(tipc_cluster(_,_)).
 352black_list(tipc_zone(_)).
 353black_list(tipc_zone(_,_)).
 354%
 355ld_dispatch(_S, '$udp_request'(Term), _From) :-
 356    black_list(Term), !, fail.
 357
 358ld_dispatch(_S, Term, _From) :-
 359    black_list(Term), !, fail.
 360
 361ld_dispatch(S, '$udp_request'(wru(Name)), From) :-
 362    !, gethostname(Name),
 363    term_to_atom(wru(Name), Atom),
 364    udp_send(S, Atom, From, []).
 365
 366ld_dispatch(S, '$udp_request'(Term), From) :-
 367    !, forall(broadcast_request(Term),
 368          (   term_to_atom(Term, Atom),
 369              udp_send(S, Atom, From, []))).
 370
 371ld_dispatch(_S, Term, _From) :-
 372    safely(broadcast(Term)).
 373
 374%  Thread 1 listens for directed traffic on the private port.
 375%
 376udp_listener_daemon1(S) :-
 377    repeat,
 378    safely(dispatch_traffic(S, S)).
 379
 380%  Thread 2 listens for broadcast traffic on the well-known public port
 381%  (S). All replies are originated from the private port (S1).
 382%
 383udp_listener_daemon2(Parent) :-
 384    udp_socket(S) ~> tcp_close_socket(S),
 385    udp_socket(S1) ~> tcp_close_socket(S1),
 386
 387    tcp_bind(S1, _PrivatePort),   % bind him to a private port now
 388
 389    make_thread(udp_listener_daemon1(S1),
 390                [ alias(udp_listener_daemon1)]),
 391
 392    tcp_setopt(S, reuseaddr),
 393
 394    udp_broadcast_service(udp_subnet, _Address:Port),
 395    tcp_bind(S, Port),      % bind to our public port
 396
 397    listen(udp_broadcast, Head, broadcast_listener(Head))
 398         ~> unlisten(udp_broadcast),
 399
 400    thread_send_message(Parent, udp_listener_daemon_ready),
 401
 402    repeat,
 403    safely(dispatch_traffic(S, S1)).
 404
 405dispatch_traffic(S, S1) :-
 406    udp_receive(S, Data, From,
 407                [as(atom), max_message_size(65535)]),
 408    udp_subnet_member(From),  % ignore all traffic that is foreign to my subnet
 409    term_to_atom(Term, Data),
 410    with_mutex(udp_broadcast, ld_dispatch(S1, Term, From)),
 411    !,
 412    dispatch_traffic(S, S1).
 413
 414start_udp_listener_daemon :-
 415    catch(thread_property(udp_listener_daemon2, status(running)),_, fail),
 416
 417    !.
 418
 419start_udp_listener_daemon :-
 420    thread_self(Self),
 421    thread_create(udp_listener_daemon2(Self), _,
 422           [alias(udp_listener_daemon2), detached(true)]),
 423    call_with_time_limit(6.0,
 424                         thread_get_message(udp_listener_daemon_ready)).
 425
 426:- multifile udp:host_to_address/2.
 427%
 428broadcast_listener(udp_host_to_address(Host, Addr)) :-
 429    udp:host_to_address(Host, Addr).
 430
 431broadcast_listener(udp_broadcast_service(Class, Addr)) :-
 432    udp_broadcast_service(Class, Addr).
 433
 434broadcast_listener(udp_subnet(X)) :-
 435    udp_broadcast(X, udp_subnet, 0.250).
 436
 437broadcast_listener(udp_subnet(X, Timeout)) :-
 438    udp_broadcast(X, udp_subnet, Timeout).
 439
 440%
 441%
 442udp_basic_broadcast(S, Port, Term, Address) :-
 443    udp_socket(S)
 444      ~> tcp_close_socket(S),
 445
 446    (   udp_broadcast_service(udp_subnet, Address)
 447           -> tcp_setopt(S, broadcast)
 448           ; true
 449    ),
 450
 451    tcp_bind(S, Port),  % find our own ephemeral Port
 452    term_to_atom(Term, Atom),
 453
 454    (   udp_subnet_member(Address)  % talk only to your local subnet
 455        -> safely(udp_send(S, Atom, Address, []))
 456        ;  true).
 457
 458% directed broadcast to a single listener
 459udp_broadcast(Term:To, _Scope, _Timeout) :-
 460    ground(Term), ground(To),
 461    !,
 462    udp_basic_broadcast(_S, _Port, Term, To),
 463
 464    !.
 465
 466% broadcast to all listeners
 467udp_broadcast(Term, Scope, _Timeout) :-
 468    ground(Term),
 469    !,
 470    udp_broadcast_service(Scope, Address),
 471    udp_basic_broadcast(_S, _Port, Term, Address),
 472
 473    !.
 474
 475% directed broadcast_request to a single listener
 476udp_broadcast(Term:Address, _Scope, Timeout) :-
 477    ground(Address),
 478    !,
 479    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
 480    udp_br_collect_replies(S, Port, Timeout, Term:Address).
 481
 482% broadcast_request to all listeners returning responder port-id
 483udp_broadcast(Term:From, Scope, Timeout) :-
 484    !, udp_broadcast_service(Scope, Address),
 485    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
 486    udp_br_collect_replies(S, Port, Timeout, Term:From).
 487
 488% broadcast_request to all listeners ignoring responder port-id
 489udp_broadcast(Term, Scope, Timeout) :-
 490    udp_broadcast(Term:_, Scope, Timeout).
 491
 492udp_br_send_timeout(Port) :-
 493    udp_socket(S)
 494      ~> tcp_close_socket(S),
 495    udp_send(S, '$udp_br_timeout', localhost:Port, []),
 496
 497    !.
 498
 499udp_br_collect_replies(S, Port, Timeout, Term:From) :-
 500    alarm(Timeout, udp_br_send_timeout(Port), Id, [remove(false)])
 501      ~> remove_alarm(Id),
 502
 503    tcp_setopt(S, dispatch(false)),
 504
 505    repeat,
 506    udp_receive(S, Atom, From1, [as(atom)]),
 507    (   (Atom \== '$udp_br_timeout')
 508        -> (From1 = From, safely(term_to_atom(Term, Atom)))
 509        ;  (!, fail)).
 510
 511%!  udp_host_to_address(?Service, ?Address) is nondet.
 512%
 513%   locates a UDP service by name. Service  is an atom or grounded term
 514%   representing the common name  of  the   service.  Address  is a UDP
 515%   address structure. A server may advertise   its  services by name by
 516%   including  the  fact,    udp:host_to_address(+Service,   +Address),
 517%   somewhere in its source. This predicate can  also be used to perform
 518%   reverse searches. That is it  will  also   resolve  an  Address to a
 519%   Service name.
 520%
 521
 522udp_host_to_address(Host, Address) :-
 523    broadcast_request(udp_subnet(udp_host_to_address(Host, Address))).
 524
 525%!  udp_initialize is semidet.
 526%   See udp:udp_initialize/0
 527%
 528%:- multifile udp:udp_stack_initialize/0.
 529
 530
 531%!  udp_broadcast_initialize(+IPAddress, +SubnetMask) is semidet.
 532%   causes any required runtime initialization to occur. At present,
 533%   proper operation of UDP broadcast depends on local information
 534%   that is not easily obtained mechanically. In order to determine
 535%   the appropriate UDP broadcast address, you must supply the
 536%   IPAddress and SubnetMask for the node that is running this module.
%   These data are supplied in the form of ip/4 terms. This is now
%   required to be included in an application's initialization directive.
 539%
 540
 541udp_broadcast_initialize(IPAddress, Subnet) :-
 542    retractall(udp_broadcast_service(_,_)),
 543    retractall(udp_subnet_member(_)),
 544
 545    udp_broadcast_address(IPAddress, Subnet, BroadcastAddr),
 546    assert(udp_broadcast_service(udp_subnet, BroadcastAddr:20005)),
 547    assert(udp_subnet_member(Address:_Port) :- udp_broadcast_address(Address, Subnet, BroadcastAddr)),
 548
 549    start_udp_listener_daemon.