View source with raw comments or as raw
   1/*  Part of SWI-Prolog
   2
   3    Author:        Jan Wielemaker
   4    E-mail:        J.Wielemaker@vu.nl
   5    WWW:           http://www.swi-prolog.org
   6    Copyright (c)  2006-2015, University of Amsterdam
   7                              VU University Amsterdam
   8    All rights reserved.
   9
  10    Redistribution and use in source and binary forms, with or without
  11    modification, are permitted provided that the following conditions
  12    are met:
  13
  14    1. Redistributions of source code must retain the above copyright
  15       notice, this list of conditions and the following disclaimer.
  16
  17    2. Redistributions in binary form must reproduce the above copyright
  18       notice, this list of conditions and the following disclaimer in
  19       the documentation and/or other materials provided with the
  20       distribution.
  21
  22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
  28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
  32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33    POSSIBILITY OF SUCH DAMAGE.
  34*/
  35
  36:- module(rdf_persistency,
  37          [ rdf_attach_db/2,            % +Directory, +Options
  38            rdf_detach_db/0,            % +Detach current Graph
  39            rdf_current_db/1,           % -Directory
  40            rdf_persistency/2,          % +Graph, +Bool
  41            rdf_flush_journals/1,       % +Options
  42            rdf_persistency_property/1, % ?Property
  43            rdf_journal_file/2,         % ?Graph, ?JournalFile
  44            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
  45            rdf_db_to_file/2            % ?Graph, ?FileBase
  46          ]).
  47:- use_module(library(semweb/rdf_db)).
  48:- use_module(library(filesex)).
  49:- use_module(library(lists)).
  50:- use_module(library(uri)).
  51:- use_module(library(debug)).
  52:- use_module(library(option)).
  53:- use_module(library(error)).
  54:- use_module(library(thread)).
  55:- use_module(library(apply)).
  56
  57
  58/** <module> RDF persistency plugin
  59
This  module  provides  persistency   for    rdf_db.pl   based   on  the
rdf_monitor/2 predicate to  track  changes   to  the  repository.  Where
previous  versions  used  autosave  of  the  whole  database  using  the
quick-load format of rdf_db, this version is  based on a quick-load file
per source (4th argument of rdf/4), and journalling for edit operations.
  65
The result is safe, avoids frequent small   changes to large files which
make synchronisation and backup expensive and avoids long disruption of
the server doing the autosave. Only loading large files disrupts service
for some time.
  70
  71The persistent backup of the database is  realised in a directory, using
  72a lock file to avoid corruption due to concurrent access. Each source is
  73represented by two files, the latest snapshot   and a journal. The state
  74is restored by loading  the  snapshot   and  replaying  the journal. The
  75predicate rdf_flush_journals/1 can be used to create fresh snapshots and
  76delete the journals.
  77
  78@tbd If there is a complete `.new'   snapshot  and no journal, we should
  79     move the .new to the plain snapshot name as a means of recovery.
  80
  81@tbd Backup of each graph using one or two files is very costly if there
  82     are many graphs.  Although the currently used subdirectories avoid
  83     hitting OS limits early, this is still not ideal. Probably we
  84     should collect (small, older?) files and combine them into a single
  85     quick load file.  We could call this (similar to GIT) a `pack'.
  86
  87@see    rdf_edit.pl
  88*/
  89
  90:- volatile
  91    rdf_directory/1,
  92    rdf_lock/2,
  93    rdf_option/1,
  94    source_journal_fd/2,
  95    file_base_db/2.
  96:- dynamic
  97    rdf_directory/1,                % Absolute path
  98    rdf_lock/2,                     % Dir, Lock
  99    rdf_option/1,                   % Defined options
 100    source_journal_fd/2,            % DB, JournalFD
 101    file_base_db/2.                 % FileBase, DB
 102
 103:- meta_predicate
 104    no_agc(0).
 105
 106:- predicate_options(rdf_attach_db/2, 2,
 107                     [ access(oneof([read_write,read_only])),
 108                       concurrency(positive_integer),
 109                       max_open_journals(positive_integer),
 110                       silent(oneof([true,false,brief])),
 111                       log_nested_transactions(boolean)
 112                     ]).
 113
 114%!  rdf_attach_db(+Directory, +Options) is det.
 115%
 116%   Start persistent operations using Directory   as  place to store
 117%   files.   There are several cases:
 118%
 119%           * Empty DB, existing directory
 120%           Load the DB from the existing directory
 121%
 122%           * Full DB, empty directory
 123%           Create snapshots for all sources in directory
 124%
 125%   Options:
 126%
 127%           * access(+AccessMode)
 128%           One of =auto= (default), =read_write= or
 129%           =read_only=. Read-only access implies that the RDF
 130%           store is not locked. It is read at startup and all
 131%           modifications to the data are temporary. The default
 132%           =auto= mode is =read_write= if the directory is
 133%           writeable and the lock can be acquired.  Otherwise
 134%           it reverts to =read_only=.
 135%
 136%           * concurrency(+Jobs)
 137%           Number of threads to use for loading the initial
 138%           database.  If not provided it is the number of CPUs
 139%           as optained from the flag =cpu_count=.
 140%
 141%           * max_open_journals(+Count)
 142%           Maximum number of journals kept open.  If not provided,
 143%           the default is 10.  See limit_fd_pool/0.
 144%
 145%           * directory_levels(+Count)
 146%           Number of levels of intermediate directories for storing
 147%           the graph files.  Default is 2.
 148%
 149%           * silent(+BoolOrBrief)
 150%           If =true= (default =false=), do not print informational
 151%           messages.  Finally, if =brief= it will show minimal
 152%           feedback.
 153%
 154%           * log_nested_transactions(+Boolean)
 155%           If =true=, nested _log_ transactions are added to the
 156%           journal information.  By default (=false=), no log-term
 157%           is added for nested transactions.\\
 158%
 159%   @error existence_error(source_sink, Directory)
 160%   @error permission_error(write, directory, Directory)
 161
 162rdf_attach_db(DirSpec, Options) :-
 163    option(access(read_only), Options),
 164    !,
 165    absolute_file_name(DirSpec,
 166                       Directory,
 167                       [ access(read),
 168                         file_type(directory)
 169                       ]),
 170    rdf_attach_db_ro(Directory, Options).
 171rdf_attach_db(DirSpec, Options) :-
 172    option(access(read_write), Options),
 173    !,
 174    rdf_attach_db_rw(DirSpec, Options).
 175rdf_attach_db(DirSpec, Options) :-
 176    absolute_file_name(DirSpec,
 177                       Directory,
 178                       [ access(exist),
 179                         file_type(directory),
 180                         file_errors(fail)
 181                       ]),
 182    !,
 183    (   access_file(Directory, write)
 184    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
 185        (   var(E)
 186        ->  true
 187        ;   E = error(permission_error(lock, rdf_db, _), _)
 188        ->  print_message(warning, E),
 189            print_message(warning, rdf(read_only)),
 190            rdf_attach_db(DirSpec, [access(read_only)|Options])
 191        ;   throw(E)
 192        )
 193    ;   print_message(warning,
 194                      error(permission_error(write, directory, Directory))),
 195        print_message(warning, rdf(read_only)),
 196        rdf_attach_db_ro(Directory, Options)
 197    ).
 198rdf_attach_db(DirSpec, Options) :-
 199    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
 200    (   var(E)
 201    ->  true
 202    ;   print_message(warning, E),
 203        print_message(warning, rdf(read_only)),
 204        rdf_attach_db(DirSpec, [access(read_only)|Options])
 205    ).
 206
 207
 208rdf_attach_db_rw(DirSpec, Options) :-
 209    absolute_file_name(DirSpec,
 210                       Directory,
 211                       [ access(write),
 212                         file_type(directory),
 213                         file_errors(fail)
 214                       ]),
 215    !,
 216    (   rdf_directory(Directory)
 217    ->  true                        % update settings?
 218    ;   rdf_detach_db,
 219        mkdir(Directory),
 220        lock_db(Directory),
 221        assert(rdf_directory(Directory)),
 222        assert_options(Options),
 223        stop_monitor,               % make sure not to register load
 224        no_agc(load_db),
 225        at_halt(rdf_detach_db),
 226        start_monitor
 227    ).
 228rdf_attach_db_rw(DirSpec, Options) :-
 229    absolute_file_name(DirSpec,
 230                       Directory,
 231                       [ solutions(all)
 232                       ]),
 233    (   exists_directory(Directory)
 234    ->  access_file(Directory, write)
 235    ;   catch(make_directory(Directory), _, fail)
 236    ),
 237    !,
 238    rdf_attach_db(Directory, Options).
 239rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
 240    absolute_file_name(DirSpec,     % permission error
 241                       Directory,
 242                       [ access(exist),
 243                         file_type(directory)
 244                       ]),
 245    permission_error(write, directory, Directory).
 246
 247%!  rdf_attach_db_ro(+Directory, +Options)
 248%
 249%   Open an RDF database in read-only mode.
 250
 251rdf_attach_db_ro(Directory, Options) :-
 252    rdf_detach_db,
 253    assert(rdf_directory(Directory)),
 254    assert_options(Options),
 255    stop_monitor,           % make sure not to register load
 256    no_agc(load_db).
 257
 258
 259assert_options([]).
 260assert_options([H|T]) :-
 261    (   option_type(H, Check)
 262    ->  Check,
 263        assert(rdf_option(H))
 264    ;   true                        % ignore options we do not understand
 265    ),
 266    assert_options(T).
 267
 268option_type(concurrency(X),             must_be(positive_integer, X)).
 269option_type(max_open_journals(X),       must_be(positive_integer, X)).
 270option_type(directory_levels(X),        must_be(positive_integer, X)).
 271option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
 272option_type(log_nested_transactions(X), must_be(boolean, X)).
 273option_type(access(X),                  must_be(oneof([read_write,
 274                                                       read_only]), X)).
 275
 276
 277%!  rdf_persistency_property(?Property) is nondet.
 278%
 279%   True if Property  is  a  property   of  the  current  persistent
 280%   database. Currently makes to options   passed to rdf_attach_db/2
 281%   available.  Notable  rdf_persistency_property(access(read_only))
 282%   is true if the database  is   mounted  in  read-only mode. Other
 283%   properties:
 284%
 285%     - directory(Dir)
 286%     Directory in which the database resides.
 287
 288rdf_persistency_property(Property) :-
 289    var(Property),
 290    !,
 291    rdf_persistency_property_(Property).
 292rdf_persistency_property(Property) :-
 293    rdf_persistency_property_(Property),
 294    !.
 295
 296rdf_persistency_property_(Property) :-
 297    rdf_option(Property).
 298rdf_persistency_property_(directory(Dir)) :-
 299    rdf_directory(Dir).
 300
 301%!  no_agc(:Goal)
 302%
 303%   Run Goal with atom garbage collection   disabled. Loading an RDF
 304%   database creates large amounts  of  atoms   we  *know*  are  not
 305%   garbage.
 306
 307no_agc(Goal) :-
 308    current_prolog_flag(agc_margin, Old),
 309    setup_call_cleanup(
 310        set_prolog_flag(agc_margin, 0),
 311        Goal,
 312        set_prolog_flag(agc_margin, Old)).
 313
 314
 315%!  rdf_detach_db is det.
 316%
 317%   Detach from the  current  database.   Succeeds  silently  if  no
 318%   database is attached. Normally called at  the end of the program
 319%   through at_halt/1.
 320
 321rdf_detach_db :-
 322    debug(halt, 'Detaching RDF database', []),
 323    stop_monitor,
 324    close_journals,
 325    (   retract(rdf_directory(Dir))
 326    ->  debug(halt, 'DB Directory: ~w', [Dir]),
 327        save_prefixes(Dir),
 328        retractall(rdf_option(_)),
 329        retractall(source_journal_fd(_,_)),
 330        unlock_db(Dir)
 331    ;   true
 332    ).
 333
 334
 335%!  rdf_current_db(?Dir)
 336%
 337%   True if Dir is the current RDF persistent database.
 338
 339rdf_current_db(Directory) :-
 340    rdf_directory(Dir),
 341    !,
 342    Dir = Directory.
 343
 344
 345%!  rdf_flush_journals(+Options)
 346%
 347%   Flush dirty journals.  Options:
 348%
 349%           * min_size(+KB)
 350%           Only flush if journal is over KB in size.
 351%           * graph(+Graph)
 352%           Only flush the journal of Graph
 353%
 354%   @tbd Provide a default for min_size?
 355
 356rdf_flush_journals(Options) :-
 357    option(graph(Graph), Options, _),
 358    forall(rdf_graph(Graph),
 359           rdf_flush_journal(Graph, Options)).
 360
 361rdf_flush_journal(Graph, Options) :-
 362    db_files(Graph, _SnapshotFile, JournalFile),
 363    db_file(JournalFile, File),
 364    (   \+ exists_file(File)
 365    ->  true
 366    ;   memberchk(min_size(KB), Options),
 367        size_file(JournalFile, Size),
 368        Size / 1024 < KB
 369    ->  true
 370    ;   create_db(Graph)
 371    ).
 372
 373                 /*******************************
 374                 *             LOAD             *
 375                 *******************************/
 376
 377%!  load_db is det.
 378%
 379%   Reload database from the directory specified by rdf_directory/1.
 380%   First we find all names graphs using find_dbs/1 and then we load
 381%   them.
 382
 383load_db :-
 384    rdf_directory(Dir),
 385    concurrency(Jobs),
 386    cpu_stat_key(Jobs, StatKey),
 387    get_time(Wall0),
 388    statistics(StatKey, T0),
 389    load_prefixes(Dir),
 390    verbosity(Silent),
 391    find_dbs(Dir, Graphs, SnapShots, Journals),
 392    length(Graphs, GraphCount),
 393    maplist(rdf_unload_graph, Graphs),
 394    rdf_statistics(triples(Triples0)),
 395    load_sources(snapshots, SnapShots, Silent, Jobs),
 396    load_sources(journals, Journals, Silent, Jobs),
 397    rdf_statistics(triples(Triples1)),
 398    statistics(StatKey, T1),
 399    get_time(Wall1),
 400    T is T1 - T0,
 401    Wall is Wall1 - Wall0,
 402    Triples = Triples1 - Triples0,
 403    message_level(Silent, Level),
 404    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
 405
 406load_sources(_, [], _, _) :- !.
 407load_sources(Type, Sources, Silent, Jobs) :-
 408    length(Sources, Count),
 409    RunJobs is min(Count, Jobs),
 410    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
 411    make_goals(Sources, Silent, 1, Count, Goals),
 412    concurrent(RunJobs, Goals, []).
 413
 414
 415%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
 416
 417make_goals([], _, _, _, []).
 418make_goals([DB|T0], Silent, I,  Total,
 419           [load_source(DB, Silent, I, Total)|T]) :-
 420    I2 is I + 1,
 421    make_goals(T0, Silent, I2, Total, T).
 422
 423verbosity(Silent) :-
 424    rdf_option(silent(Silent)),
 425    !.
 426verbosity(Silent) :-
 427    current_prolog_flag(verbose, silent),
 428    !,
 429    Silent = true.
 430verbosity(brief).
 431
 432
 433%!  concurrency(-Jobs)
 434%
 435%   Number of jobs to run concurrently.
 436
 437concurrency(Jobs) :-
 438    rdf_option(concurrency(Jobs)),
 439    !.
 440concurrency(Jobs) :-
 441    current_prolog_flag(cpu_count, Jobs),
 442    Jobs > 0,
 443    !.
 444concurrency(1).
 445
 446cpu_stat_key(1, cputime) :- !.
 447cpu_stat_key(_, process_cputime).
 448
 449
 450%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
 451%
 452%   Scan the persistent database and return a list of snapshots and
 453%   journals, both sorted by file-size.  Each term is of the form
 454%
 455%     ==
 456%     db(Size, Ext, DB, DBFile, Depth)
 457%     ==
 458
 459find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
 460    directory_files(Dir, Files),
 461    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
 462    maplist(db_graph, Scanned, UnsortedGraphs),
 463    sort(UnsortedGraphs, Graphs),
 464    (   consider_reindex_db(Dir, Graphs, Scanned)
 465    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
 466    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
 467        sort(Snapshots, SnapBySize),
 468        sort(Journals, JournalBySize)
 469    ).
 470
 471consider_reindex_db(Dir, Graphs, Scanned) :-
 472    length(Graphs, Count),
 473    Count > 0,
 474    DepthNeeded is floor(log(Count)/log(256)),
 475    (   maplist(depth_db(DepthNow), Scanned)
 476    ->  (   DepthNeeded > DepthNow
 477        ->  true
 478        ;   retractall(rdf_option(directory_levels(_))),
 479            assertz(rdf_option(directory_levels(DepthNow))),
 480            fail
 481        )
 482    ;   true
 483    ),
 484    reindex_db(Dir, DepthNeeded).
 485
 486db_is_snapshot(Term) :-
 487    arg(2, Term, trp).
 488
 489db_graph(Term, DB) :-
 490    arg(3, Term, DB).
 491
 492db_file_name(Term, File) :-
 493    arg(4, Term, File).
 494
 495depth_db(Depth, DB) :-
 496    arg(5, DB, Depth).
 497
 498%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
 499%
 500%   Produces a list of db(DB,  Size,   File)  for all recognised RDF
 501%   database files.  File is relative to the database directory Dir.
 502
 503scan_db_files([], _, _, _) -->
 504    [].
 505scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
 506    { nofollow(Nofollow) },
 507    !,
 508    scan_db_files(T, Dir, Prefix, Depth).
 509scan_db_files([File|T], Dir, Prefix, Depth) -->
 510    { file_name_extension(Base, Ext, File),
 511      db_extension(Ext),
 512      !,
 513      rdf_db_to_file(DB, Base),
 514      directory_file_path(Prefix, File, DBFile),
 515      directory_file_path(Dir, DBFile, AbsFile),
 516      size_file(AbsFile, Size)
 517    },
 518    [ db(Size, Ext, DB, AbsFile, Depth) ],
 519    scan_db_files(T, Dir, Prefix, Depth).
 520scan_db_files([D|T], Dir, Prefix, Depth) -->
 521    { directory_file_path(Prefix, D, SubD),
 522      directory_file_path(Dir, SubD, AbsD),
 523      exists_directory(AbsD),
 524      \+ read_link(AbsD, _, _),    % Do not follow links
 525      !,
 526      directory_files(AbsD, SubFiles),
 527      SubDepth is Depth + 1
 528    },
 529    scan_db_files(SubFiles, Dir, SubD, SubDepth),
 530    scan_db_files(T, Dir, Prefix, Depth).
 531scan_db_files([_|T], Dir, Prefix, Depth) -->
 532    scan_db_files(T, Dir, Prefix, Depth).
 533
 534nofollow(.).
 535nofollow(..).
 536
 537db_extension(trp).
 538db_extension(jrn).
 539
 540:- public load_source/4.                % called through make_goals/5
 541
 542load_source(DB, Silent, Nth, Total) :-
 543    db_file_name(DB, File),
 544    db_graph(DB, Graph),
 545    message_level(Silent, Level),
 546    graph_triple_count(Graph, Count0),
 547    statistics(cputime, T0),
 548    (   db_is_snapshot(DB)
 549    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
 550        rdf_load_db(File)
 551    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
 552        load_journal(File, Graph)
 553    ),
 554    statistics(cputime, T1),
 555    T is T1 - T0,
 556    graph_triple_count(Graph, Count1),
 557    Count is Count1 - Count0,
 558    print_message(Level, rdf(restore(Silent,
 559                                     done(Graph, T, Count, Nth, Total)))).
 560
 561
 562graph_triple_count(Graph, Count) :-
 563    rdf_statistics(triples_by_graph(Graph, Count)),
 564    !.
 565graph_triple_count(_, 0).
 566
 567
 568%!  attach_graph(+Graph, +Options) is det.
 569%
 570%   Load triples and reload  journal   from  the  indicated snapshot
 571%   file.
 572
 573attach_graph(Graph, Options) :-
 574    (   option(silent(true), Options)
 575    ->  Level = silent
 576    ;   Level = informational
 577    ),
 578    db_files(Graph, SnapshotFile, JournalFile),
 579    rdf_retractall(_,_,_,Graph),
 580    statistics(cputime, T0),
 581    print_message(Level, rdf(restore(Silent, Graph))),
 582    db_file(SnapshotFile, AbsSnapShot),
 583    (   exists_file(AbsSnapShot)
 584    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
 585        rdf_load_db(AbsSnapShot)
 586    ;   true
 587    ),
 588    (   exists_db(JournalFile)
 589    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
 590        load_journal(JournalFile, Graph)
 591    ;   true
 592    ),
 593    statistics(cputime, T1),
 594    T is T1 - T0,
 595    (   rdf_statistics(triples_by_graph(Graph, Count))
 596    ->  true
 597    ;   Count = 0
 598    ),
 599    print_message(Level, rdf(restore(Silent,
 600                                     done(Graph, T, Count)))).
 601
 602message_level(true, silent) :- !.
 603message_level(_, informational).
 604
 605
 606                 /*******************************
 607                 *         LOAD JOURNAL         *
 608                 *******************************/
 609
 610%!  load_journal(+File:atom, +DB:atom) is det.
 611%
 612%   Process transactions from the RDF journal File, adding the given
 613%   named graph.
 614
 615load_journal(File, DB) :-
 616    rdf_create_graph(DB),
 617    setup_call_cleanup(
 618        open(File, read, In, [encoding(utf8)]),
 619        ( read(In, T0),
 620          process_journal(T0, In, DB)
 621        ),
 622        close(In)).
 623
 624process_journal(end_of_file, _, _) :- !.
 625process_journal(Term, In, DB) :-
 626    (   process_journal_term(Term, DB)
 627    ->  true
 628    ;   throw(error(type_error(journal_term, Term), _))
 629    ),
 630    read(In, T2),
 631    process_journal(T2, In, DB).
 632
 633process_journal_term(assert(S,P,O), DB) :-
 634    rdf_assert(S,P,O,DB).
 635process_journal_term(assert(S,P,O,Line), DB) :-
 636    rdf_assert(S,P,O,DB:Line).
 637process_journal_term(retract(S,P,O), DB) :-
 638    rdf_retractall(S,P,O,DB).
 639process_journal_term(retract(S,P,O,Line), DB) :-
 640    rdf_retractall(S,P,O,DB:Line).
 641process_journal_term(update(S,P,O,Action), DB) :-
 642    (   rdf_update(S,P,O,DB, Action)
 643    ->  true
 644    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
 645    ).
 646process_journal_term(start(_), _).      % journal open/close
 647process_journal_term(end(_), _).
 648process_journal_term(begin(_), _).      % logged transaction (compatibility)
 649process_journal_term(end, _).
 650process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
 651process_journal_term(end(_,_,_), _).
 652
 653
 654                 /*******************************
 655                 *         CREATE JOURNAL       *
 656                 *******************************/
 657
 658:- dynamic
 659    blocked_db/2,                   % DB, Reason
 660    transaction_message/3,          % Nesting, Time, Message
 661    transaction_db/3.               % Nesting, DB, Id
 662
 663%!  rdf_persistency(+DB, Bool)
 664%
 665%   Specify whether a database is persistent.  Switching to =false=
 666%   kills the persistent state.  Switching to =true= creates it.
 667
 668rdf_persistency(DB, Bool) :-
 669    must_be(atom, DB),
 670    must_be(boolean, Bool),
 671    fail.
 672rdf_persistency(DB, false) :-
 673    !,
 674    (   blocked_db(DB, persistency)
 675    ->  true
 676    ;   assert(blocked_db(DB, persistency)),
 677        delete_db(DB)
 678    ).
 679rdf_persistency(DB, true) :-
 680    (   retract(blocked_db(DB, persistency))
 681    ->  create_db(DB)
 682    ;   true
 683    ).
 684
 685%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
 686%
 687%   Extend rdf_graph_property/2 with new properties.
 688
 689:- multifile
 690    rdf_db:property_of_graph/2.
 691
 692rdf_db:property_of_graph(persistent(State), Graph) :-
 693    (   blocked_db(Graph, persistency)
 694    ->  State = false
 695    ;   State = true
 696    ).
 697
 698
 699%!  start_monitor is det.
 700%!  stop_monitor is det.
 701%
 702%   Start/stop monitoring the RDF database   for  changes and update
 703%   the journal.
 704
 705start_monitor :-
 706    rdf_monitor(monitor,
 707                [ -assert(load)
 708                ]).
 709stop_monitor :-
 710    rdf_monitor(monitor,
 711                [ -all
 712                ]).
 713
 714%!  monitor(+Term) is semidet.
 715%
 716%   Handle an rdf_monitor/2 callback to  deal with persistency. Note
 717%   that the monitor calls that come   from rdf_db.pl that deal with
 718%   database changes are serialized.  They   do  come from different
 719%   threads though.
 720
 721monitor(Msg) :-
 722    debug(monitor, 'Monitor: ~p~n', [Msg]),
 723    fail.
 724monitor(assert(S,P,O,DB:Line)) :-
 725    !,
 726    \+ blocked_db(DB, _),
 727    journal_fd(DB, Fd),
 728    open_transaction(DB, Fd),
 729    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
 730    sync_journal(DB, Fd).
 731monitor(assert(S,P,O,DB)) :-
 732    \+ blocked_db(DB, _),
 733    journal_fd(DB, Fd),
 734    open_transaction(DB, Fd),
 735    format(Fd, '~q.~n', [assert(S,P,O)]),
 736    sync_journal(DB, Fd).
 737monitor(retract(S,P,O,DB:Line)) :-
 738    !,
 739    \+ blocked_db(DB, _),
 740    journal_fd(DB, Fd),
 741    open_transaction(DB, Fd),
 742    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
 743    sync_journal(DB, Fd).
 744monitor(retract(S,P,O,DB)) :-
 745    \+ blocked_db(DB, _),
 746    journal_fd(DB, Fd),
 747    open_transaction(DB, Fd),
 748    format(Fd, '~q.~n', [retract(S,P,O)]),
 749    sync_journal(DB, Fd).
 750monitor(update(S,P,O,DB:Line,Action)) :-
 751    !,
 752    \+ blocked_db(DB, _),
 753    (   Action = graph(NewDB)
 754    ->  monitor(assert(S,P,O,NewDB)),
 755        monitor(retract(S,P,O,DB:Line))
 756    ;   journal_fd(DB, Fd),
 757        format(Fd, '~q.~n', [update(S,P,O,Action)]),
 758        sync_journal(DB, Fd)
 759    ).
 760monitor(update(S,P,O,DB,Action)) :-
 761    \+ blocked_db(DB, _),
 762    (   Action = graph(NewDB)
 763    ->  monitor(assert(S,P,O,NewDB)),
 764        monitor(retract(S,P,O,DB))
 765    ;   journal_fd(DB, Fd),
 766        open_transaction(DB, Fd),
 767        format(Fd, '~q.~n', [update(S,P,O,Action)]),
 768        sync_journal(DB, Fd)
 769    ).
 770monitor(load(BE, _DumpFileURI)) :-
 771    (   BE = end(Graphs)
 772    ->  sync_loaded_graphs(Graphs)
 773    ;   true
 774    ).
 775monitor(create_graph(Graph)) :-
 776    \+ blocked_db(Graph, _),
 777    journal_fd(Graph, Fd),
 778    open_transaction(Graph, Fd),
 779    sync_journal(Graph, Fd).
 780monitor(reset) :-
 781    forall(rdf_graph(Graph), delete_db(Graph)).
 782                                        % TBD: Remove empty directories?
 783
 784monitor(transaction(BE, Id)) :-
 785    monitor_transaction(Id, BE).
 786
 787monitor_transaction(load_journal(DB), begin(_)) :-
 788    !,
 789    assert(blocked_db(DB, journal)).
 790monitor_transaction(load_journal(DB), end(_)) :-
 791    !,
 792    retractall(blocked_db(DB, journal)).
 793
 794monitor_transaction(parse(URI), begin(_)) :-
 795    !,
 796    (   blocked_db(URI, persistency)
 797    ->  true
 798    ;   assert(blocked_db(URI, parse))
 799    ).
 800monitor_transaction(parse(URI), end(_)) :-
 801    !,
 802    (   retract(blocked_db(URI, parse))
 803    ->  create_db(URI)
 804    ;   true
 805    ).
 806monitor_transaction(unload(DB), begin(_)) :-
 807    !,
 808    (   blocked_db(DB, persistency)
 809    ->  true
 810    ;   assert(blocked_db(DB, unload))
 811    ).
 812monitor_transaction(unload(DB), end(_)) :-
 813    !,
 814    (   retract(blocked_db(DB, unload))
 815    ->  delete_db(DB)
 816    ;   true
 817    ).
 818monitor_transaction(log(Msg), begin(N)) :-
 819    !,
 820    check_nested(N),
 821    get_time(Time),
 822    asserta(transaction_message(N, Time, Msg)).
 823monitor_transaction(log(_), end(N)) :-
 824    check_nested(N),
 825    retract(transaction_message(N, _, _)),
 826    !,
 827    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
 828    end_transactions(DBs, N).
 829monitor_transaction(log(Msg, DB), begin(N)) :-
 830    !,
 831    check_nested(N),
 832    get_time(Time),
 833    asserta(transaction_message(N, Time, Msg)),
 834    journal_fd(DB, Fd),
 835    open_transaction(DB, Fd).
 836monitor_transaction(log(Msg, _DB), end(N)) :-
 837    monitor_transaction(log(Msg), end(N)).
 838
 839
 840%!  check_nested(+Level) is semidet.
 841%
 842%   True if we must log this transaction.   This  is always the case
 843%   for toplevel transactions. Nested transactions   are only logged
 844%   if log_nested_transactions(true) is defined.
 845
 846check_nested(0) :- !.
 847check_nested(_) :-
 848    rdf_option(log_nested_transactions(true)).
 849
 850
 851%!  open_transaction(+DB, +Fd) is det.
 852%
 853%   Add a begin(Id, Level, Time,  Message)   term  if  a transaction
 854%   involves DB. Id is an incremental   integer, where each database
 855%   has its own counter. Level is the nesting level, Time a floating
 856%   point timestamp and Message te message   provided as argument to
 857%   the log message.
 858
 859open_transaction(DB, Fd) :-
 860    transaction_message(N, Time, Msg),
 861    !,
 862    (   transaction_db(N, DB, _)
 863    ->  true
 864    ;   next_transaction_id(DB, Id),
 865        assert(transaction_db(N, DB, Id)),
 866        RoundedTime is round(Time*100)/100,
 867        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
 868    ).
 869open_transaction(_,_).
 870
 871
 872%!  next_transaction_id(+DB, -Id) is det.
 873%
 874%   Id is the number to user for  the next logged transaction on DB.
 875%   Transactions in each  named  graph   are  numbered  in sequence.
 876%   Searching the Id of the last transaction is performed by the 2nd
 877%   clause starting 1Kb from the end   and doubling this offset each
 878%   failure.
 879
 880:- dynamic
 881    current_transaction_id/2.
 882
 883next_transaction_id(DB, Id) :-
 884    retract(current_transaction_id(DB, Last)),
 885    !,
 886    Id is Last + 1,
 887    assert(current_transaction_id(DB, Id)).
 888next_transaction_id(DB, Id) :-
 889    db_files(DB, _, Journal),
 890    exists_file(Journal),
 891    !,
 892    size_file(Journal, Size),
 893    open_db(Journal, read, In, []),
 894    call_cleanup(iterative_expand(In, Size, Last), close(In)),
 895    Id is Last + 1,
 896    assert(current_transaction_id(DB, Id)).
 897next_transaction_id(DB, 1) :-
 898    assert(current_transaction_id(DB, 1)).
 899
 900iterative_expand(_, 0, 0) :- !.
 901iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
 902    Max is floor(log(Size)/log(2)),
 903    between(10, Max, Step),
 904    Offset is -(1<<Step),
 905    seek(In, Offset, eof, _),
 906    skip(In, 10),                   % records are line-based
 907    read(In, T0),
 908    last_transaction_id(T0, In, 0, Last),
 909    Last > 0,
 910    !.
 911iterative_expand(In, _, Last) :-        % Scan the whole file
 912    seek(In, 0, bof, _),
 913    read(In, T0),
 914    last_transaction_id(T0, In, 0, Last).
 915
 916last_transaction_id(end_of_file, _, Last, Last) :- !.
 917last_transaction_id(end(Id, _, _), In, _, Last) :-
 918    read(In, T1),
 919    last_transaction_id(T1, In, Id, Last).
 920last_transaction_id(_, In, Id, Last) :-
 921    read(In, T1),
 922    last_transaction_id(T1, In, Id, Last).
 923
 924
 925%!  end_transactions(+DBs:list(atom:id)) is det.
 926%
 927%   End a transaction that affected the  given list of databases. We
 928%   write the list of other affected databases as an argument to the
 929%   end-term to facilitate fast finding of the related transactions.
 930%
 931%   In each database, the transaction is   ended with a term end(Id,
 932%   Nesting, Others), where  Id  and   Nesting  are  the transaction
 933%   identifier and nesting (see open_transaction/2)  and Others is a
 934%   list of DB:Id,  indicating  other   databases  affected  by  the
 935%   transaction.
 936
 937end_transactions(DBs, N) :-
 938    end_transactions(DBs, DBs, N).
 939
 940end_transactions([], _, _).
 941end_transactions([DB:Id|T], DBs, N) :-
 942    journal_fd(DB, Fd),
 943    once(select(DB:Id, DBs, Others)),
 944    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
 945    sync_journal(DB, Fd),
 946    end_transactions(T, DBs, N).
 947
 948
 949%!  sync_loaded_graphs(+Graphs)
 950%
 951%   Called after a binary triple has been loaded that added triples
 952%   to the given graphs.
 953
 954sync_loaded_graphs(Graphs) :-
 955    maplist(create_db, Graphs).
 956
 957
 958                 /*******************************
 959                 *         JOURNAL FILES        *
 960                 *******************************/
 961
 962%!  journal_fd(+DB, -Stream) is det.
 963%
 964%   Get an open stream to a journal. If the journal is not open, old
 965%   journals are closed to satisfy   the =max_open_journals= option.
 966%   Then the journal is opened in   =append= mode. Journal files are
 967%   always encoded as UTF-8 for  portability   as  well as to ensure
 968%   full coverage of Unicode.
 969
 970journal_fd(DB, Fd) :-
 971    source_journal_fd(DB, Fd),
 972    !.
 973journal_fd(DB, Fd) :-
 974    with_mutex(rdf_journal_file,
 975               journal_fd_(DB, Out)),
 976    Fd = Out.
 977
 978journal_fd_(DB, Fd) :-
 979    source_journal_fd(DB, Fd),
 980    !.
 981journal_fd_(DB, Fd) :-
 982    limit_fd_pool,
 983    db_files(DB, _Snapshot, Journal),
 984    open_db(Journal, append, Fd,
 985            [ close_on_abort(false)
 986            ]),
 987    time_stamp(Now),
 988    format(Fd, '~q.~n', [start([time(Now)])]),
 989    assert(source_journal_fd(DB, Fd)).              % new one at the end
 990
 991%!  limit_fd_pool is det.
 992%
 993%   Limit the number of  open   journals  to max_open_journals (10).
 994%   Note that calls  from  rdf_monitor/2   are  issued  in different
 995%   threads, but as they are part of write operations they are fully
 996%   synchronised.
 997
 998limit_fd_pool :-
 999    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
1000    !,
1001    (   rdf_option(max_open_journals(Max))
1002    ->  true
1003    ;   Max = 10
1004    ),
1005    Close is N - Max,
1006    forall(between(1, Close, _),
1007           close_oldest_journal).
1008limit_fd_pool.
1009
% close_oldest_journal
%
% Close the journal registered by the first clause of
% source_journal_fd/2. As new journals are asserted at the end, this
% is the one that has been open longest. Succeeds silently if no
% journal is open.

close_oldest_journal :-
    (   source_journal_fd(Oldest, _)
    ->  debug(rdf_persistency, 'Closing old journal for ~q', [Oldest]),
        close_journal(Oldest)
    ;   true
    ).
1016
1017
%!  sync_journal(+DB, +Fd)
%
%   Flush the journal stream Fd of database DB, unless DB is
%   registered in transaction_db/3. While DB is involved in a
%   transaction there is no point flushing until the end of the
%   transaction.

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
1029
%!  close_journal(+DB) is det.
%
%   Close the journal associated with DB if it is open. The work is
%   done by close_journal_/1 while holding the mutex
%   =rdf_journal_file=.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

% close_journal_(+DB)
%
% Unregister the journal stream of DB, append an end([time(Stamp)])
% term to it and close the stream, ignoring errors raised by the
% close itself (force(true)). Succeeds if DB has no open journal.

close_journal_(DB) :-
    retract(source_journal_fd(DB, Out)),
    !,
    time_stamp(Closed),
    format(Out, '~q.~n', [end([time(Closed)])]),
    close(Out, [force(true)]).
close_journal_(_).
1045
%!  close_journals
%
%   Close all open journals. An exception raised while closing one
%   journal is printed as an error message and does not prevent the
%   remaining journals from being closed.

close_journals :-
    forall(source_journal_fd(Graph, _),
           catch(close_journal(Graph),
                 Error,
                 print_message(error, Error))).
1054
%!  create_db(+Graph)
%
%   Bring the files of Graph in sync with the triples in memory. If
%   Graph holds no triples, its files are removed using delete_db/1.
%   Otherwise the journal is closed and the graph is saved to
%   `<snapshot>.new`. If saving succeeds, an existing journal file is
%   deleted and the new file is renamed to the snapshot file. If
%   saving raises an exception, it is printed as a warning and the
%   partial `.new` file is removed (errors while removing it are
%   ignored).

create_db(Graph) :-
    (   \+ rdf(_, _, _, Graph)
    ->  debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
        delete_db(Graph)
    ;   debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
        close_journal(Graph),
        db_abs_files(Graph, SnapshotFile, JournalFile),
        atom_concat(SnapshotFile, '.new', TmpFile),
        (   catch(( create_directory_levels(SnapshotFile),
                    rdf_save_db(TmpFile, Graph)
                  ),
                  Error,
                  ( print_message(warning, Error),
                    fail
                  ))
        ->  (   exists_file(JournalFile)
            ->  delete_file(JournalFile)
            ;   true
            ),
            rename_file(TmpFile, SnapshotFile),
            debug(rdf_persistency, 'Saved Graph ~w', [Graph])
        ;   catch(delete_file(TmpFile), _, true)
        )
    ).
1084
1085
%!  delete_db(+DB)
%
%   Remove snapshot and journal file for DB. The operation runs while
%   holding the mutex =rdf_journal_file=.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

% delete_db_(+DB)
%
% Close the journal of DB and delete first its journal file and then
% its snapshot file, each only if it exists. Succeeds without doing
% anything more if closing the journal or computing the file names
% fails.

delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, SnapshotFile, JournalFile)
    ->  forall(( member(File, [JournalFile, SnapshotFile]),
                 exists_file(File)
               ),
               delete_file(File))
    ;   true
    ).
1107
1108                 /*******************************
1109                 *             LOCKING          *
1110                 *******************************/
1111
%!  lock_db(+Dir)
%
%   Lock the database directory Dir. The lock file (see lockfile/2)
%   is opened in =update= mode with a non-blocking write lock. A
%   permission error raised by open/4 is translated by locked_error/2.
%   On success a term locked([time(T), pid(P), host(H)]) is written to
%   the file, the file is truncated after this term, the lock is
%   recorded in rdf_lock/2 and unlock_db/1 is registered to run at
%   halt.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Lock, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, Pid)
    ->  true
    ;   Pid = 0                         % TBD: Fix in Prolog
    ),
    time_stamp(Since),
    gethostname(HostName),
    Owner = locked([ time(Since),
                     pid(Pid),
                     host(HostName)
                   ]),
    format(Lock, '/* RDF Database is in use */~n~n', []),
    format(Lock, '~q.~n', [Owner]),
    flush_output(Lock),
    set_end_of_stream(Lock),            % drop remains of older content
    assert(rdf_lock(Dir, lock(Lock, LockFile))),
    at_halt(unlock_db(Dir)).
1137
% locked_error(+Access, +Dir)
%
% Throw permission_error(lock, rdf_db, Dir) after opening the lock
% file of Dir failed. Access is the action from the permission error
% raised by open/4. For =lock=, the context is rdf_locked(Args) if the
% lock file holds exactly one term locked(Args), and a generic
% 'Database is in use' message otherwise. For =open=, the context
% says that the lock file cannot be opened. There is no clause for
% other values of Access.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Content, []), _, fail),
        Content = [locked(Owner)]
    ->  Context = rdf_locked(Owner)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
1149
%!  unlock_db(+Dir) is det.
%!  unlock_db(+Stream, +File) is det.
%
%   Release the lock on the database directory Dir. unlock_db/1
%   removes the rdf_lock/2 registration for Dir and succeeds silently
%   if Dir is not locked. unlock_db/2 closes the lock Stream and then
%   deletes the lock File.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Lock, LockFile)))
    ->  unlock_db(Lock, LockFile)
    ;   true
    ).

unlock_db(Lock, LockFile) :-
    close(Lock),
    delete_file(LockFile).
1162
1163                 /*******************************
1164                 *           FILENAMES          *
1165                 *******************************/
1166
% lockfile(+Dir, -LockFile)
%
% LockFile is the file named `lock` directly below Dir.

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, lock], /, LockFile).
1169
% directory_levels(-Levels)
%
% Levels is the value of the directory_levels option (see
% rdf_option/1), or 2 if that option is not set.

directory_levels(Levels) :-
    (   rdf_option(directory_levels(Levels))
    ->  true
    ;   Levels = 2
    ).
1174
% db_file(+Base, -File)
%
% File is the path for the store-local file name Base, using the
% current store directory (rdf_directory/1) and the configured number
% of directory levels.

db_file(Base, File) :-
    rdf_directory(Root),
    directory_levels(Depth),
    db_file(Root, Base, Depth, File).

% db_file(+Dir, +Base, +Levels, -File)
%
% File is Dir, followed by Levels intermediate directory names
% computed by dir_levels/4, followed by Base, all joined with `/`.

db_file(Root, Base, Depth, File) :-
    dir_levels(Base, Depth, Below, [Base]),     % Below ends in Base
    atomic_list_concat([Root|Below], /, File).
1183
% open_db(+Base, +Mode, -Stream, +Options)
%
% Open the store-local file Base in Mode using UTF-8 encoding.
% Options are passed on to open/4. The intermediate directories for
% the file are created first (see create_directory_levels/1),
% regardless of Mode.

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, Path),
    create_directory_levels(Path),
    OpenOptions = [encoding(utf8)|Options],
    open(Path, Mode, Stream, OpenOptions).
1188
% create_directory_levels(+File)
%
% Make sure the directory that is to hold File exists, creating
% missing parent directories as needed. Does nothing if the
% directory_levels option is explicitly set to 0.

create_directory_levels(File) :-
    (   rdf_option(directory_levels(0))
    ->  true
    ;   file_directory_name(File, Parent),
        make_directory_path(Parent)
    ).
1195
% exists_db(+Base)
%
% True if the store-local file name Base maps (see db_file/2) to an
% existing file.

exists_db(Base) :-
    db_file(Base, Path),
    exists_file(Path).
1199
%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
%
%   Segments is a list of Levels intermediate directory names for
%   File, ending in Tail (a difference list). The names are successive
%   two-character slices of the hash that rdf_atom_md5/3 computes for
%   File; presumably these are hexadecimal digits. If Levels is 0,
%   Segments is simply Tail and no hash is computed.

dir_levels(_, 0, Tail, Tail) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Digest),
    create_dir_levels(Levels, 0, Digest, Segments, Tail).
1209
% create_dir_levels(+Count, +Start, +Hash, -Segments, ?Tail)
%
% Segments holds Count atoms of two characters each, taken from Hash
% at the 0-based offsets Start, Start+2, ..., followed by Tail.

create_dir_levels(0, _, _, Tail, Tail) :- !.
create_dir_levels(Count, Start, Hash, [Name|Names], Tail) :-
    sub_atom(Hash, Start, 2, _, Name),
    Next is Start+2,
    Left is Count-1,
    create_dir_levels(Left, Next, Hash, Names, Tail).
1216
1217
%!  db_files(+DB, -Snapshot, -Journal).
%!  db_files(-DB, +Snapshot, -Journal).
%!  db_files(-DB, -Snapshot, +Journal).
%
%   True if named graph DB is represented by the files Snapshot and
%   Journal. The filenames are local to the directory representing
%   the store. Both names share a base name (see rdf_db_to_file/2);
%   Snapshot has the extension `.trp` and Journal has `.jrn`. The
%   first instantiated argument, tested in the order DB, Snapshot,
%   Journal, determines the direction of the translation. Fails if all
%   three arguments are unbound.

db_files(DB, Snapshot, Journal) :-
    (   nonvar(DB)
    ->  rdf_db_to_file(DB, Base),
        atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal)
    ;   nonvar(Snapshot)
    ->  atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal),
        rdf_db_to_file(DB, Base)
    ;   nonvar(Journal)
    ->  atom_concat(Base, '.jrn', Journal),
        atom_concat(Base, '.trp', Snapshot),
        rdf_db_to_file(DB, Base)
    ).
1244
% db_abs_files(?DB, ?Snapshot, ?Journal)
%
% As db_files/3, but Snapshot and Journal are the store-local names
% mapped through db_file/2, i.e., they include the store directory
% and the intermediate directory levels.

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, LocalSnapshot, LocalJournal),
    db_file(LocalSnapshot, Snapshot),
    db_file(LocalJournal, Journal).
1249
1250
%!  rdf_journal_file(+Graph, -File) is semidet.
%!  rdf_journal_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing journal file for Graph.
%   If Graph is unbound, it is enumerated using rdf_graph/1.

rdf_journal_file(Graph, JournalFile) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, _, JournalFile),
    exists_file(JournalFile).
1263
1264
%!  rdf_snapshot_file(+Graph, -File) is semidet.
%!  rdf_snapshot_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing snapshot file for Graph.
%   If Graph is unbound, it is enumerated using rdf_graph/1.

rdf_snapshot_file(Graph, SnapshotFile) :-
    (   var(Graph)
    ->  rdf_graph(Graph)            % also pick the empty graphs
    ;   true
    ),
    db_abs_files(Graph, SnapshotFile, _),
    exists_file(SnapshotFile).
1277
1278
%!  rdf_db_to_file(+DB, -File) is det.
%!  rdf_db_to_file(-DB, +File) is det.
%
%   Translate between the database name (often a file or URL) and the
%   name we store in the directory. Known pairs are kept in
%   file_base_db/2; a pair that is not yet known is computed using
%   url_to_filename/2 and added. We keep this cache for two reasons:
%   speed, but much more important is that the mapping of raw -->
%   encoded provided by www_form_encode/2 is not guaranteed to be
%   unique by the W3C standards.

rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).
1294
%!  url_to_filename(+URL, -FileName) is det.
%!  url_to_filename(-URL, +FileName) is det.
%
%   Turn a valid URL into a filename. Earlier versions used
%   www_form_encode/2, but this can produce characters that are not
%   valid in filenames. We use the same style of encoding, but with
%   our own rules for allowed characters (see url_encode//1). The only
%   requirement is that we avoid any filename special character in
%   use. The current encoding uses US-ASCII alnum characters, _ and %.
%
%   If URL is not atomic, it is computed from FileName using
%   uri_encoded/3 for the =path= component.

url_to_filename(URL, FileName) :-
    (   atomic(URL)
    ->  atom_codes(URL, Plain),
        phrase(url_encode(Encoded), Plain),
        atom_codes(FileName, Encoded)
    ;   uri_encoded(path, URL, FileName)
    ).
1314
%!  url_encode(-Encoded:codes)// is det.
%
%   Parse a list of character codes and produce the encoded codes:
%
%     - A space becomes `+`.
%     - US-ASCII alphanumerical characters and `_` are copied.
%     - A newline, written as "\r\n" or "\n", becomes `%0D%0A`.
%     - Any other code up to 0xFF becomes `%XX`, where XX is the value
%       of the code in hexadecimal.
%     - A code above 0xFF becomes the `%XX` sequences of its UTF-8
%       octets.
%
%   The last rule avoids collisions: writing only the low byte of the
%   code would map different characters, e.g. U+0169 and U+0269, to
%   the same `%69`, and thus different graphs to the same file. Codes
%   up to 0xFF keep their single `%XX` form so that names generated
%   before remain the same.

url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
url_encode(Enc) -->
    [C],
    { url_code_octets(C, Octets),
      url_percent_octets(Octets, Enc, T)
    },
    url_encode(T).

% url_code_octets(+Code, -Octets)
%
% Octets is [Code] for codes up to 0xFF (legacy single-byte form) and
% the UTF-8 octet sequence of Code otherwise.

url_code_octets(C, [C]) :-
    C =< 0xff,
    !.
url_code_octets(C, [B1,B2]) :-
    C =< 0x7ff,
    !,
    B1 is 0xc0 \/ (C >> 6),
    B2 is 0x80 \/ (C /\ 0x3f).
url_code_octets(C, [B1,B2,B3]) :-
    C =< 0xffff,
    !,
    B1 is 0xe0 \/ (C >> 12),
    B2 is 0x80 \/ ((C >> 6) /\ 0x3f),
    B3 is 0x80 \/ (C /\ 0x3f).
url_code_octets(C, [B1,B2,B3,B4]) :-
    B1 is 0xf0 \/ (C >> 18),
    B2 is 0x80 \/ ((C >> 12) /\ 0x3f),
    B3 is 0x80 \/ ((C >> 6) /\ 0x3f),
    B4 is 0x80 \/ (C /\ 0x3f).

% url_percent_octets(+Octets, -Codes, ?Tail)
%
% Codes-Tail is the difference list holding `%XX` for each octet.
% once/1 commits to the first hexadecimal digit that code_type/2
% enumerates for a weight, so no choice points are left behind.

url_percent_octets([], Tail, Tail).
url_percent_octets([B|Bs], [0'%,D1,D2|T0], Tail) :-
    Hi is (B >> 4) /\ 0xf,
    Lo is B /\ 0xf,
    once(code_type(D1, xdigit(Hi))),
    once(code_type(D2, xdigit(Lo))),
    url_percent_octets(Bs, T0, Tail).

% eos//0: true at the end of the input.

eos([], []).

% alphanum(-C)//: C is the next code, a US-ASCII letter or digit.

alphanum(C) -->
    [C],
    { C < 128,                      % US-ASCII
      code_type(C, alnum)
    }.

% no_enc_extra(-C)//: other codes that are copied unencoded.

no_enc_extra(0'_) --> "_".
1357
1358
1359                 /*******************************
1360                 *             REINDEX          *
1361                 *******************************/
1362
%!  reindex_db(+Dir, +Levels)
%
%   Reindex the database in Dir such that its files are stored below
%   Levels intermediate directories. First reindex_files/5 moves the
%   database files to their new location, then
%   remove_empty_directories/2 is run on the original entries of Dir.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Entries),
    reindex_files(Entries, Dir, '.', 0, Levels),
    remove_empty_directories(Entries, Dir).
1371
% reindex_files(+Entries, +Root, +Prefix, +Depth, +Levels)
%
% Walk the directory tree below Root. Entries are the entries of the
% directory Root/Prefix, which is Depth levels below Root. Levels is
% the desired number of intermediate directories. For each entry:
%
%   1. Entries accepted by nofollow/1 are skipped.
%   2. A file with a database extension (db_extension/1) that is not
%      at the desired depth is renamed to the location computed by
%      db_file/4, creating the target directory if needed.
%   3. A directory that is not a symbolic link is processed
%      recursively at Depth+1.
%   4. Anything else is skipped.

reindex_files([], _, _, _, _).
reindex_files([Entry|Entries], Root, Prefix, Depth, Levels) :-
    nofollow(Entry),
    !,
    reindex_files(Entries, Root, Prefix, Depth, Levels).
reindex_files([Entry|Entries], Root, Prefix, Depth, Levels) :-
    Depth \== Levels,
    file_name_extension(_, Ext, Entry),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, Entry, Relative),
    directory_file_path(Root, Relative, From),
    db_file(Root, Entry, Levels, To),
    debug(rdf_persistency, 'Rename ~q --> ~q', [From, To]),
    file_directory_name(To, ToDir),
    make_directory_path(ToDir),
    rename_file(From, To),
    reindex_files(Entries, Root, Prefix, Depth, Levels).
reindex_files([Entry|Entries], Root, Prefix, Depth, Levels) :-
    directory_file_path(Prefix, Entry, SubPrefix),
    directory_file_path(Root, SubPrefix, SubDir),
    exists_directory(SubDir),
    \+ read_link(SubDir, _, _),         % Do not follow links
    !,
    directory_files(SubDir, SubEntries),
    Deeper is Depth + 1,
    reindex_files(SubEntries, Root, SubPrefix, Deeper, Levels),
    reindex_files(Entries, Root, Prefix, Depth, Levels).
reindex_files([_|Entries], Root, Prefix, Depth, Levels) :-
    reindex_files(Entries, Root, Prefix, Depth, Levels).
1402
1403
% remove_empty_directories(+Entries, +Dir)
%
% Entries are names of entries in the directory Dir. Every entry that
% is a real directory (not accepted by nofollow/1 and not a symbolic
% link) is cleaned bottom-up: first its sub-directories are
% processed, then the directory itself is deleted if nothing is left
% in it. Other entries are skipped.
%
% The content is listed again after the recursive step, so that a
% directory that only contained empty directories is removed as
% well instead of being left behind empty.
%
% NOTE(review): nofollow/1 is defined outside this section;
% presumably it accepts the special entries `.` and `..`.

remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    remove_empty_directories(RealContent, Path),    % children first
    directory_files(Path, Content1),
    exclude(nofollow, Content1, Remaining),
    (   Remaining == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   true
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).
1421
1422
1423                 /*******************************
1424                 *            PREFIXES          *
1425                 *******************************/
1426
% save_prefixes(+Dir)
%
% Write the currently defined RDF prefixes to the UTF-8 encoded file
% Dir/prefixes.db. The file is closed even if writing fails.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, 'prefixes.db'], /, File),
    setup_call_cleanup(
        open(File, write, Out, [encoding(utf8)]),
        write_prefixes(Out),
        close(Out)).

% write_prefixes(+Out)
%
% Write a comment line followed by one prefix(Alias, URI) term for
% each solution of rdf_current_ns/2 to the stream Out.

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Prefix, Expansion),
           format(Out, 'prefix(~q, ~q).~n', [Prefix, Expansion])).
1437
%!  load_prefixes(+RDFDBDir) is det.
%
%   If the file RDFDBDir/prefixes.db exists, load the prefixes. The
%   prefixes are registered using rdf_register_ns/3. Possible errors
%   because the prefix definitions have changed are printed as
%   warnings, retaining the old definition. Note that changing
%   prefixes generally requires reloading all RDF from the source.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, 'prefixes.db'], /, File),
    (   exists_file(File)
    ->  setup_call_cleanup(
            open(File, read, In, [encoding(utf8)]),
            read_prefixes(In),
            close(In))
    ;   true
    ).

% read_prefixes(+Stream)
%
% Read and process all terms from Stream.

read_prefixes(In) :-
    read_term(In, First, []),
    read_prefixes(First, In).

% read_prefixes(+Term, +Stream)
%
% Term has just been read from Stream. A term prefix(Alias, URI) is
% registered, after which processing continues with the next term.
% Reading stops at end_of_file. Any other term raises a domain error
% for =prefix_term=. Alias and URI must be atoms.

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Prefix, Expansion), In) :-
    !,
    must_be(atom, Prefix),
    must_be(atom, Expansion),
    catch(rdf_register_ns(Prefix, Expansion, []),
          Error,
          print_message(warning, Error)),
    read_term(In, Next, []),
    read_prefixes(Next, In).
read_prefixes(Other, _) :-
    domain_error(prefix_term, Other).
1470
1471
1472                 /*******************************
1473                 *              UTIL            *
1474                 *******************************/
1475
%!  mkdir(+Directory)
%
%   Create Directory unless it already exists.

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).
1485
%!  time_stamp(-Integer)
%
%   Integer is the current time as returned by get_time/1, rounded to
%   the nearest integer.

time_stamp(Stamp) :-
    get_time(Float),
    Stamp is round(Float).
1493
1494
1495                 /*******************************
1496                 *            MESSAGES          *
1497                 *******************************/
1498
:- multifile
    prolog:message/3,
    prolog:message_context/3.

% Messages of the form rdf(Term) are translated by message//1.

prolog:message(rdf(Term)) -->
    message(Term).

% message(+Term)//
%
% Translate the messages of this library. Messages of the form
% restore(Verbosity, Action) are silent if Verbosity is =true= and
% are handled by brief_message//1 if Verbosity is =brief=. For any
% other verbosity, the clauses for specific actions must come before
% the generic clause that reports the start of restoring a graph:
% message translation uses the first matching clause, and the
% generic clause matches every action.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
% start of restoring a graph; must follow the specific actions
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

% silent_message(+Action)//: print nothing.

silent_message(_Action) --> [].

% brief_message(+Action)//
%
% Progress report on a single, rewritten line: after restoring a
% graph, print the base name of the graph and the number of graphs
% restored so far out of the total. Other actions print nothing.

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].
1556
1557
% Context for the permission error raised if the database is locked:
% report when and by which process the lock was created. Args is the
% list stored in the lock file; it must hold time(Stamp) and pid(Pid).

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Stamp), Args),
      memberchk(pid(Owner), Args),
      format_time(string(When), '%+', Stamp)
    },
    [ nl,
      'locked at ~s by process id ~w'-[When,Owner]
    ].