35
36:- module(rdf_persistency,
37 [ rdf_attach_db/2, 38 rdf_detach_db/0, 39 rdf_current_db/1, 40 rdf_persistency/2, 41 rdf_flush_journals/1, 42 rdf_persistency_property/1, 43 rdf_journal_file/2, 44 rdf_snapshot_file/2, 45 rdf_db_to_file/2 46 ]).
47:- use_module(library(semweb/rdf_db)).
48:- use_module(library(filesex)).
49:- use_module(library(lists)).
50:- use_module(library(uri)).
51:- use_module(library(debug)).
52:- use_module(library(option)).
53:- use_module(library(error)).
54:- use_module(library(thread)).
55:- use_module(library(apply)).
56
57
89
90:- volatile
91 rdf_directory/1,
92 rdf_lock/2,
93 rdf_option/1,
94 source_journal_fd/2,
95 file_base_db/2.
96:- dynamic
97 rdf_directory/1, 98 rdf_lock/2, 99 rdf_option/1, 100 source_journal_fd/2, 101 file_base_db/2. 102
103:- meta_predicate
104 no_agc(0).
105
106:- predicate_options(rdf_attach_db/2, 2,
107 [ access(oneof([read_write,read_only])),
108 concurrency(positive_integer),
109 max_open_journals(positive_integer),
110 silent(oneof([true,false,brief])),
111 log_nested_transactions(boolean)
112 ]).
113
161
%!  rdf_attach_db(+DirSpec, +Options) is det.
%
%   Attach the persistent store located at DirSpec.  The clauses are
%   tried in order:
%
%     1. Options contains access(read_only): DirSpec must resolve to
%        a readable directory, which is attached read-only.
%     2. Options contains access(read_write): attach read-write and
%        let any error propagate.
%     3. No access option and DirSpec resolves to an existing
%        directory: attach read-write when the directory is writable.
%        If the database is locked, or the directory is not writable,
%        print warnings and continue in read-only mode.  Any other
%        error is re-thrown.
%     4. Otherwise try to attach read-write and fall back to
%        read-only (with warnings) on any error.

rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   % Must be a proper error(Formal, Context) term; the message
        % system does not know how to translate error/1.
        print_message(warning,
                      error(permission_error(write, directory, Directory),
                            _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).
206
207
%!  rdf_attach_db_rw(+DirSpec, +Options) is det.
%
%   Attach the database for reading and writing.
%
%     - If DirSpec resolves to a writable directory, attach it
%       (a no-op when that directory is already attached).
%     - Otherwise enumerate all candidate locations for DirSpec and
%       take the first that is an existing writable directory or
%       that can be created, then retry through rdf_attach_db/2.
%     - Otherwise raise a permission error on the existing directory.

rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec, Dir,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    attach_writable_directory(Dir, Options).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec, Candidate, [solutions(all)]),
    (   exists_directory(Candidate)
    ->  access_file(Candidate, write)
    ;   catch(make_directory(Candidate), _, fail)
    ),
    !,
    rdf_attach_db(Candidate, Options).
rdf_attach_db_rw(DirSpec, _) :-
    % Only reached if no writable location was found: permission error
    absolute_file_name(DirSpec, Existing,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Existing).

%   attach_writable_directory(+Dir, +Options)
%
%   Do nothing if Dir is the currently attached directory.  Otherwise
%   detach the current database, lock Dir, record it and the options,
%   load its content with the monitor stopped and (re)start the
%   monitor.

attach_writable_directory(Dir, _Options) :-
    rdf_directory(Dir),
    !.
attach_writable_directory(Dir, Options) :-
    rdf_detach_db,
    mkdir(Dir),
    lock_db(Dir),
    assert(rdf_directory(Dir)),
    assert_options(Options),
    stop_monitor,
    no_agc(load_db),
    at_halt(rdf_detach_db),
    start_monitor.
246
250
%!  rdf_attach_db_ro(+Directory, +Options) is det.
%
%   Attach Directory without locking it and without (re)starting the
%   monitor: the current database is detached, Directory and Options
%   are recorded and the content is loaded.

rdf_attach_db_ro(Dir, Opts) :-
    rdf_detach_db,
    assert(rdf_directory(Dir)),
    assert_options(Opts),
    stop_monitor,
    no_agc(load_db).
257
258
%   assert_options(+Options)
%
%   Store each option known to option_type/2 as an rdf_option/1 fact
%   after running its type check.  Unknown options are silently
%   ignored.

assert_options(Options) :-
    maplist(assert_option, Options).

assert_option(Option) :-
    (   option_type(Option, TypeCheck)
    ->  call(TypeCheck),
        assert(rdf_option(Option))
    ;   true                    % not an option we understand
    ).
267
268option_type(concurrency(X), must_be(positive_integer, X)).
269option_type(max_open_journals(X), must_be(positive_integer, X)).
270option_type(directory_levels(X), must_be(positive_integer, X)).
271option_type(silent(X), must_be(oneof([true,false,brief]), X)).
272option_type(log_nested_transactions(X), must_be(boolean, X)).
273option_type(access(X), must_be(oneof([read_write,
274 read_only]), X)).
275
276
287
%!  rdf_persistency_property(?Property) is nondet.
%
%   True when Property is a stored option of the attached database
%   or directory(Dir) for the attached directory.  With an unbound
%   Property all properties are enumerated; otherwise at most one
%   solution is returned.

rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)
    ;   once(rdf_persistency_property_(Property))
    ).

rdf_persistency_property_(Option) :-
    rdf_option(Option).
rdf_persistency_property_(directory(Directory)) :-
    rdf_directory(Directory).
300
306
%   no_agc(:Goal)
%
%   Run Goal with the Prolog flag agc_margin set to 0, restoring the
%   previous flag value afterwards, also on failure or exception.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, SavedMargin),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        call(Goal),
        set_prolog_flag(agc_margin, SavedMargin)).
313
314
320
%!  rdf_detach_db is det.
%
%   Stop the monitor and close all journals.  If a directory is
%   attached, forget it, save the prefixes, drop the stored options
%   and journal handles and release the lock.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    detach_directory.

detach_directory :-
    retract(rdf_directory(Dir)),
    !,
    debug(halt, 'DB Directory: ~w', [Dir]),
    save_prefixes(Dir),
    retractall(rdf_option(_)),
    retractall(source_journal_fd(_,_)),
    unlock_db(Dir).
detach_directory.
333
334
338
%!  rdf_current_db(?Directory) is semidet.
%
%   True when Directory is the directory of the attached database.

rdf_current_db(Directory) :-
    once(rdf_directory(Attached)),
    Directory = Attached.
343
344
355
%!  rdf_flush_journals(+Options) is det.
%
%   Merge journals into snapshots.  Options:
%
%     - graph(Graph): only flush Graph.  Without this option all
%       graphs known to rdf_graph/1 are processed.
%     - min_size(KB): leave journals smaller than KB kilobytes alone.

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

%   rdf_flush_journal(+Graph, +Options)
%
%   Re-create the snapshot for Graph unless it has no journal file or
%   the journal is smaller than the min_size(KB) option.

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        % Measure the absolute path that was just checked for
        % existence, not the directory-relative base name.
        size_file(File, Size),
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).
372
373 376
382
%   load_db
%
%   Load the content of the attached directory: restore the prefixes,
%   locate all snapshot and journal files, unload the graphs found,
%   load snapshots and then journals (possibly concurrently) and
%   report the number of graphs and triples loaded together with the
%   CPU and wall time used.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    % Evaluate the difference so the message carries an integer
    % rather than the unevaluated term Triples1-Triples0.
    Triples is Triples1 - Triples0,
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
405
%   load_sources(+Type, +Sources, +Silent, +Jobs)
%
%   Load all Sources using at most Jobs concurrent workers.  Type is
%   only used in the progress message.  Nothing happens for an empty
%   list.

load_sources(Type, Sources, Silent, Jobs) :-
    (   Sources = []
    ->  true
    ;   length(Sources, Total),
        Workers is min(Total, Jobs),
        print_message(informational, rdf(restoring(Type, Total, Workers))),
        make_goals(Sources, Silent, 1, Total, Goals),
        concurrent(Workers, Goals, [])
    ).
413
414
416
%   make_goals(+DBs, +Silent, +Index, +Total, -Goals)
%
%   Goals is a list holding a load_source/4 goal for each element of
%   DBs, numbered consecutively starting at Index.

make_goals(DBs, Silent, Index, Total, Goals) :-
    foldl(make_goal(Silent, Total), DBs, Goals, Index, _).

make_goal(Silent, Total, DB, load_source(DB, Silent, Nth, Total), Nth, Next) :-
    Next is Nth + 1.
422
%   verbosity(-Silent)
%
%   Silent is the value of the stored silent(_) option if there is
%   one, `true` if the Prolog flag `verbose` is `silent` and `brief`
%   otherwise.

verbosity(Silent) :-
    (   rdf_option(silent(Silent))
    ->  true
    ;   current_prolog_flag(verbose, silent)
    ->  Silent = true
    ;   Silent = brief
    ).
431
432
436
%   concurrency(-Jobs)
%
%   Number of jobs used for loading: the stored concurrency(_) option
%   if present, else the positive value of the Prolog flag
%   `cpu_count`, else 1.

concurrency(Jobs) :-
    (   rdf_option(concurrency(Jobs))
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs),
        Jobs > 0
    ->  true
    ;   Jobs = 1
    ).
445
%   cpu_stat_key(+Jobs, -StatKey)
%
%   StatKey is the statistics/2 key used to measure CPU time:
%   `cputime` for a single job, `process_cputime` otherwise.

cpu_stat_key(Jobs, StatKey) :-
    (   Jobs = 1,
        StatKey = cputime
    ->  true
    ;   StatKey = process_cputime
    ).
448
449
458
%   find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize)
%
%   Scan Dir for database files.  Graphs is the sorted list of graph
%   names found; SnapBySize and JournalBySize are the sorted db/5
%   terms (whose first argument is the file size) for snapshots and
%   journals.  If the directory is restructured by
%   consider_reindex_db/3, the scan is repeated.

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Entries),
    phrase(scan_db_files(Entries, Dir, '.', 0), DBTerms),
    maplist(db_graph, DBTerms, GraphNames),
    sort(GraphNames, Graphs),
    (   consider_reindex_db(Dir, Graphs, DBTerms)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, DBTerms, SnapTerms, JournalTerms),
        sort(SnapTerms, SnapBySize),
        sort(JournalTerms, JournalBySize)
    ).
470
%   consider_reindex_db(+Dir, +Graphs, +Scanned)
%
%   Succeeds after restructuring Dir when the scanned files are not
%   all at the same directory depth, or when that depth is smaller
%   than floor(log256(number of graphs)).  When all files share a
%   sufficient depth, that depth is stored as the directory_levels
%   option and the predicate fails.

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, GraphCount),
    GraphCount > 0,
    Needed is floor(log(GraphCount)/log(256)),
    \+ record_sufficient_depth(Scanned, Needed),
    reindex_db(Dir, Needed).

%   True (recording the depth as a side effect) if all Scanned terms
%   share one depth that is at least Needed.

record_sufficient_depth(Scanned, Needed) :-
    maplist(depth_db(Current), Scanned),
    !,
    Needed =< Current,
    retractall(rdf_option(directory_levels(_))),
    assertz(rdf_option(directory_levels(Current))).
485
%   Accessors for the terms produced by scan_db_files//4, which have
%   the shape db(Size, Ext, Graph, AbsFile, Depth).

%   True if the extension (2nd argument) is `trp`.
db_is_snapshot(DBTerm) :-
    arg(2, DBTerm, trp).

%   Graph is the graph name (3rd argument).
db_graph(DBTerm, Graph) :-
    arg(3, DBTerm, Graph).

%   AbsFile is the file (4th argument).
db_file_name(DBTerm, AbsFile) :-
    arg(4, DBTerm, AbsFile).

%   Depth is the directory depth (5th argument).  Note the argument
%   order: depth first, so it can be used with maplist/2.
depth_db(Depth, DBTerm) :-
    arg(5, DBTerm, Depth).
497
502
%   scan_db_files(+Entries, +Dir, +Prefix, +Depth)//
%
%   Produce a db(Size, Ext, Graph, AbsFile, Depth) term for every
%   entry with a database extension, recursing into sub directories
%   that are not symbolic links.  Prefix is the path of the directory
%   being scanned relative to Dir.  All other entries are skipped.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Entry|Rest], Dir, Prefix, Depth) -->
    { nofollow(Entry) },
    !,
    scan_db_files(Rest, Dir, Prefix, Depth).
scan_db_files([Entry|Rest], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, Entry),
      db_extension(Ext),
      !,
      rdf_db_to_file(Graph, Base),
      directory_file_path(Prefix, Entry, RelFile),
      directory_file_path(Dir, RelFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, Graph, AbsFile, Depth) ],
    scan_db_files(Rest, Dir, Prefix, Depth).
scan_db_files([Entry|Rest], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, Entry, RelDir),
      directory_file_path(Dir, RelDir, AbsDir),
      exists_directory(AbsDir),
      \+ read_link(AbsDir, _, _),       % Do not follow links
      !,
      directory_files(AbsDir, Children),
      ChildDepth is Depth + 1
    },
    scan_db_files(Children, Dir, RelDir, ChildDepth),
    scan_db_files(Rest, Dir, Prefix, Depth).
scan_db_files([_|Rest], Dir, Prefix, Depth) -->
    scan_db_files(Rest, Dir, Prefix, Depth).
533
%   nofollow(?Entry)
%
%   Directory entries that must never be descended into.

nofollow('.').
nofollow('..').

%   db_extension(?Ext)
%
%   File extensions of database files: snapshots and journals.

db_extension(trp).
db_extension(jrn).
539
540:- public load_source/4. 541
%   load_source(+DB, +Silent, +Nth, +Total)
%
%   Load one snapshot or journal described by the db/5 term DB and
%   report the CPU time used and the number of triples added to the
%   graph.  Nth and Total are only used for the progress message.

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Before),
    statistics(cputime, CPU0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, CPU1),
    CPU is CPU1 - CPU0,
    graph_triple_count(Graph, After),
    Added is After - Before,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, CPU, Added, Nth, Total)))).
560
561
%   graph_triple_count(+Graph, -Count)
%
%   Count is the number of triples in Graph according to
%   rdf_statistics/1, or 0 if no statistics are available for Graph.

graph_triple_count(Graph, Count) :-
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ).
566
567
572
%   attach_graph(+Graph, +Options)
%
%   Reload Graph from its snapshot and journal: the in-memory triples
%   of Graph are removed, then the snapshot (if it exists) and the
%   journal (if it exists) are loaded.  Progress is reported unless
%   Options contains silent(true).

attach_graph(Graph, Options) :-
    % Silent is embedded in the message terms below.  It must be
    % bound: an unbound value unifies with restore(true, _) in
    % message//1 and suppresses all output.
    (   option(silent(true), Options)
    ->  Silent = true,
        Level = silent
    ;   Silent = false,
        Level = informational
    ),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).
601
%   message_level(+Silent, -Level)
%
%   Level is the print_message/2 level for a verbosity setting:
%   `silent` for `true`, `informational` for anything else.

message_level(Silent, Level) :-
    (   Silent = true,
        Level = silent
    ->  true
    ;   Level = informational
    ).
604
605
606 609
614
%   load_journal(+File, +DB)
%
%   Create graph DB and replay the journal File (read as UTF-8) into
%   it.  The file is closed regardless of how replaying ends.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        ( read(Stream, First),
          process_journal(First, Stream, DB)
        ),
        close(Stream)).

%   process_journal(+Term, +Stream, +DB)
%
%   Apply Term and all following terms on Stream to DB until
%   end_of_file.  A term that process_journal_term/2 does not accept
%   raises type_error(journal_term, Term).

process_journal(end_of_file, _, _) :- !.
process_journal(Term, Stream, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(Stream, Next),
    process_journal(Next, Stream, DB).
632
%   process_journal_term(+Term, +DB)
%
%   Replay a single journal term on graph DB.  Assert, retract and
%   update terms are applied to the RDF store; a failing update only
%   prints a warning.  The start/end and begin/end bookkeeping terms
%   are accepted and ignored.

process_journal_term(assert(Subj,Pred,Obj), DB) :-
    rdf_assert(Subj,Pred,Obj,DB).
process_journal_term(assert(Subj,Pred,Obj,Line), DB) :-
    rdf_assert(Subj,Pred,Obj,DB:Line).
process_journal_term(retract(Subj,Pred,Obj), DB) :-
    rdf_retractall(Subj,Pred,Obj,DB).
process_journal_term(retract(Subj,Pred,Obj,Line), DB) :-
    rdf_retractall(Subj,Pred,Obj,DB:Line).
process_journal_term(update(Subj,Pred,Obj,Action), DB) :-
    (   rdf_update(Subj,Pred,Obj,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(Subj,Pred,Obj,Action)))
    ).
process_journal_term(start(_), _).              % journal opened
process_journal_term(end(_), _).                % journal closed
process_journal_term(begin(_), _).
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _).        % logged transaction
process_journal_term(end(_,_,_), _).
652
653
654 657
658:- dynamic
659 blocked_db/2, 660 transaction_message/3, 661 transaction_db/3. 662
667
%!  rdf_persistency(+DB, +Bool) is det.
%
%   Switch persistency of graph DB.  `false` marks DB as blocked and
%   deletes its files (no-op if already blocked); `true` removes the
%   block and, if there was one, saves the graph.
%
%   @error type or instantiation errors if DB is not an atom or Bool
%          is not a boolean.

rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    (   Bool == false
    ->  (   blocked_db(DB, persistency)
        ->  true
        ;   assert(blocked_db(DB, persistency)),
            delete_db(DB)
        )
    ;   (   retract(blocked_db(DB, persistency))
        ->  create_db(DB)
        ;   true
        )
    ).
684
688
689:- multifile
690 rdf_db:property_of_graph/2.
691
%   Hook clause: report persistent(State) for Graph, where State is
%   `false` if persistency is blocked for Graph and `true` otherwise.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   \+ blocked_db(Graph, persistency)
    ->  State = true
    ;   State = false
    ).
697
698
704
%   start_monitor is det.
%   stop_monitor is det.
%
%   Register monitor/1 with rdf_monitor/2 using the event masks
%   [-assert(load)] and [-all] respectively.

start_monitor :-
    Mask = [-assert(load)],
    rdf_monitor(monitor, Mask).

stop_monitor :-
    Mask = [-all],
    rdf_monitor(monitor, Mask).
713
720
%   monitor(+Event)
%
%   Callback registered through rdf_monitor/2.  Changes to graphs
%   that are not blocked (see blocked_db/2) are appended to the
%   journal of the graph concerned.  Each write first calls
%   open_transaction/2, so that a pending logged transaction gets its
%   begin-term in the journal, and ends with sync_journal/2.
%
%   Events handled: assert/4, retract/4, update/5 (an update with
%   action graph(NewDB) is journalled as an assert in NewDB plus a
%   retract in the old graph), load/2, create_graph/1, reset and
%   transaction/2.

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        % Same protocol as the update clause below: make sure the
        % transaction header is written before the update term.
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).
786
%   monitor_transaction(+Id, +BeginOrEnd)
%
%   Handle the begin(Level) and end(Level) events of a transaction
%   with identifier Id.
%
%     - load_journal(DB): block journalling for DB while replaying.
%     - parse(URI): block during parsing; save the graph at the end.
%     - unload(DB): block during unloading; delete the files at the
%       end.
%     - log(Msg), log(Msg, DB): record a transaction message at
%       begin; at end close the journal transactions opened under it.
%       Both are subject to check_nested/1.

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Message), begin(Level)) :-
    !,
    check_nested(Level),
    get_time(Started),
    asserta(transaction_message(Level, Started, Message)).
monitor_transaction(log(_), end(Level)) :-
    check_nested(Level),
    retract(transaction_message(Level, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(Level, DB, Id)), Opened),
    end_transactions(Opened, Level).
monitor_transaction(log(Message, DB), begin(Level)) :-
    !,
    check_nested(Level),
    get_time(Started),
    asserta(transaction_message(Level, Started, Message)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Message, _DB), end(Level)) :-
    monitor_transaction(log(Message), end(Level)).
838
839
845
%   check_nested(+Level)
%
%   True if a transaction at nesting Level must be logged: always at
%   level 0, otherwise only if the option
%   log_nested_transactions(true) is stored.

check_nested(Level) :-
    (   Level = 0
    ->  true
    ;   rdf_option(log_nested_transactions(true))
    ).
849
850
858
%   open_transaction(+DB, +Fd)
%
%   If a transaction message is pending and DB has not yet been
%   registered for that transaction, allocate a transaction id,
%   register DB and write begin(Id, Level, Time, Msg) to Fd.  Time is
%   rounded to 1/100 second.  Succeeds without action otherwise.

open_transaction(DB, Fd) :-
    transaction_message(Level, Started, Msg),
    !,
    (   \+ transaction_db(Level, DB, _)
    ->  next_transaction_id(DB, Id),
        assert(transaction_db(Level, DB, Id)),
        Rounded is round(Started*100)/100,
        format(Fd, '~q.~n', [begin(Id, Level, Rounded, Msg)])
    ;   true
    ).
open_transaction(_,_).
870
871
879
880:- dynamic
881 current_transaction_id/2.
882
%   next_transaction_id(+DB, -Id)
%
%   Id is the next transaction identifier for DB.  If an id was
%   handed out before in this process it is incremented.  Otherwise,
%   if DB has a journal file, Id is one more than the id of the last
%   end/3 term found in it.  Otherwise numbering starts at 1.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    % db_files/3 yields a base name relative to the database
    % directory; existence and size must be tested on the absolute
    % path, which is also what open_db/4 opens.
    db_file(Journal, File),
    exists_file(File),
    !,
    size_file(File, Size),
    setup_call_cleanup(
        open_db(Journal, read, In, []),
        once(iterative_expand(In, Size, Last)),
        close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).
899
%   iterative_expand(+In, +Size, -Last)
%
%   Find the id of the last end/3 term in the journal stream In of
%   Size bytes.  Reads the final 2^Exp bytes for Exp = 10, 11, ...
%   (skipping to the first line break) until a positive id is found.
%   If that never happens, the whole file is read from the start.
%   Last is 0 for an empty file.

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-
    MaxExp is floor(log(Size)/log(2)),
    between(10, MaxExp, Exp),
    FromEnd is -(1<<Exp),
    seek(In, FromEnd, eof, _),
    skip(In, 10),                       % resync at start of a line
    read(In, First),
    last_transaction_id(First, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-
    seek(In, 0, bof, _),                % too small or nothing found
    read(In, First),
    last_transaction_id(First, In, 0, Last).
915
%   last_transaction_id(+Term, +In, +Id0, -Last) is det.
%
%   Read terms from In, starting with the already read Term, until
%   end_of_file.  Last is the first argument of the last end/3 term
%   seen, or Id0 if there is none.

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    % Commit: without the cut the catch-all clause below stays open
    % and backtracking would re-read the stream with a stale id.
    !,
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
923
924
936
%   end_transactions(+DBs, +Level)
%
%   DBs is a list of DB:Id pairs registered for the transaction at
%   Level.  Write end(Id, Level, Others) to the journal of each DB,
%   where Others are the remaining pairs of the same transaction,
%   and sync that journal.

end_transactions(DBs, Level) :-
    end_transactions(DBs, DBs, Level).

end_transactions([], _, _).
end_transactions([DB:Id|Rest], All, Level) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, All, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, Level, Others]),
    sync_journal(DB, Fd),
    end_transactions(Rest, All, Level).
947
948
953
%   sync_loaded_graphs(+Graphs)
%
%   Save each of the just-loaded Graphs using create_db/1.

sync_loaded_graphs([]).
sync_loaded_graphs([Graph|Graphs]) :-
    create_db(Graph),
    sync_loaded_graphs(Graphs).
956
957
958 961
969
%   journal_fd(+DB, -Fd)
%
%   Fd is the open journal stream for DB.  If none is open, one is
%   opened in append mode while holding the mutex rdf_journal_file;
%   the stream table is re-checked under the mutex.  A newly opened
%   journal gets a start([time(Now)]) term.

journal_fd(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   with_mutex(rdf_journal_file,
                   journal_fd_(DB, Opened)),
        Fd = Opened
    ).

journal_fd_(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   limit_fd_pool,
        db_files(DB, _Snapshot, Journal),
        open_db(Journal, append, Fd,
                [ close_on_abort(false)
                ]),
        time_stamp(Now),
        format(Fd, '~q.~n', [start([time(Now)])]),
        assert(source_journal_fd(DB, Fd))
    ).
997
%   limit_fd_pool
%
%   Close journals until the number of open journals no longer
%   exceeds the max_open_journals option (10 if that option is not
%   stored).

limit_fd_pool :-
    (   predicate_property(source_journal_fd(_, _), number_of_clauses(Open))
    ->  (   rdf_option(max_open_journals(Limit))
        ->  true
        ;   Limit = 10
        ),
        Excess is Open - Limit,
        forall(between(1, Excess, _),
               close_oldest_journal)
    ;   true
    ).

%   close_oldest_journal
%
%   Close the journal of the first source_journal_fd/2 fact, if any.

close_oldest_journal :-
    (   source_journal_fd(DB, _Fd)
    ->  debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
        close_journal(DB)
    ;   true
    ).
1016
1017
1023
%   sync_journal(+DB, +Fd)
%
%   Flush Fd unless DB is registered in a logged transaction, in
%   which case nothing is done here.

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
1029
1033
%   close_journal(+DB)
%
%   Close the journal of DB, if open, while holding the mutex
%   rdf_journal_file.  An end([time(Now)]) term is written first.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    retract(source_journal_fd(DB, Fd)),
    !,
    time_stamp(Now),
    format(Fd, '~q.~n', [end([time(Now)])]),
    close(Fd, [force(true)]).
close_journal_(_).

%   close_journals
%
%   Close all open journals.  Errors are printed and do not stop the
%   remaining journals from being closed.

close_journals :-
    forall(source_journal_fd(DB, _),
           catch(close_journal(DB), Error,
                 print_message(error, Error))).
1054
1059
%   create_db(+Graph)
%
%   Save Graph as a snapshot.  An empty graph has its files deleted
%   instead.  The snapshot is first written to a `.new` file; on
%   success the journal is deleted and the new file is renamed over
%   the snapshot.  If saving fails, a warning is printed and the
%   `.new` file is removed.

create_db(Graph) :-
    (   \+ rdf(_,_,_,Graph)
    ->  debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
        delete_db(Graph)
    ;   debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
        close_journal(Graph),
        db_abs_files(Graph, Snapshot, Journal),
        atom_concat(Snapshot, '.new', TmpSnapshot),
        (   catch(( create_directory_levels(Snapshot),
                    rdf_save_db(TmpSnapshot, Graph)
                  ), SaveError,
                  ( print_message(warning, SaveError),
                    fail
                  ))
        ->  (   exists_file(Journal)
            ->  delete_file(Journal)
            ;   true
            ),
            rename_file(TmpSnapshot, Snapshot),
            debug(rdf_persistency, 'Saved Graph ~w', [Graph])
        ;   catch(delete_file(TmpSnapshot), _, true)
        )
    ).
1084
1085
1089
%   delete_db(+DB)
%
%   Close the journal of DB and delete its journal and snapshot
%   files, if they exist, while holding the mutex rdf_journal_file.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, Snapshot, Journal)
    ->  delete_if_exists(Journal),
        delete_if_exists(Snapshot)
    ;   true
    ).

delete_if_exists(File) :-
    (   exists_file(File)
    ->  delete_file(File)
    ;   true
    ).
1107
1108 1111
1115
%   lock_db(+Dir)
%
%   Lock the database in Dir by opening its lock file with a
%   non-blocking write lock.  The file gets a comment line and a
%   locked/1 term with the time, process id (0 if the `pid` flag is
%   not available) and host name.  The lock is recorded in rdf_lock/2
%   and released at halt.  A permission error on opening is mapped
%   by locked_error/2.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Lock, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0
    ),
    time_stamp(Now),
    gethostname(Host),
    Info = locked([ time(Now),
                    pid(PID),
                    host(Host)
                  ]),
    format(Lock, '/* RDF Database is in use */~n~n', []),
    format(Lock, '~q.~n', [Info]),
    flush_output(Lock),
    set_end_of_stream(Lock),
    assert(rdf_lock(Dir, lock(Lock, LockFile))),
    at_halt(unlock_db(Dir)).
1137
%   locked_error(+Access, +Dir)
%
%   Throw permission_error(lock, rdf_db, Dir).  For Access `lock`
%   the context is rdf_locked(Args) if the lock file holds exactly
%   one locked(Args) term, and a generic "in use" context otherwise.
%   For Access `open` the context says the lock file cannot be
%   opened.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    Formal = permission_error(lock, rdf_db, Dir),
    throw(error(Formal, Context)).
locked_error(open, Dir) :-
    Formal = permission_error(lock, rdf_db, Dir),
    throw(error(Formal,
                context(_, 'Lock file cannot be opened'))).
1149
1152
%   unlock_db(+Dir)
%
%   Release the lock recorded for Dir, if any.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Stream, LockFile)))
    ->  unlock_db(Stream, LockFile)
    ;   true
    ).

%   unlock_db(+Stream, +LockFile)
%
%   Close the lock stream and delete the lock file.

unlock_db(Stream, LockFile) :-
    close(Stream),
    delete_file(LockFile).
1162
1163 1166
%   lockfile(+Dir, -LockFile)
%
%   LockFile is the file `lock` inside Dir.

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, '/', lock], LockFile).

%   directory_levels(-Levels)
%
%   Levels is the stored directory_levels option, or 2 if none is
%   stored.

directory_levels(Levels) :-
    (   rdf_option(directory_levels(Levels))
    ->  true
    ;   Levels = 2
    ).

%   db_file(+Base, -File)
%
%   File is the path of Base inside the attached directory, using
%   the current number of directory levels.

db_file(Base, File) :-
    rdf_directory(Root),
    directory_levels(Levels),
    db_file(Root, Base, Levels, File).

%   db_file(+Dir, +Base, +Levels, -File)
%
%   File is Dir, followed by Levels hash-derived directory segments
%   (see dir_levels/4), followed by Base, joined with `/`.

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], '/', File).

%   open_db(+Base, +Mode, -Stream, +Options)
%
%   Open the database file for Base as UTF-8 in Mode, creating the
%   intermediate directories first.

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, Path),
    create_directory_levels(Path),
    open(Path, Mode, Stream, [encoding(utf8)|Options]).

%   create_directory_levels(+File)
%
%   Ensure the directory holding File exists.  Nothing is done if
%   the stored option is directory_levels(0).

create_directory_levels(File) :-
    (   rdf_option(directory_levels(0))
    ->  true
    ;   file_directory_name(File, Parent),
        make_directory_path(Parent)
    ).

%   exists_db(+Base)
%
%   True if the database file for Base exists.

exists_db(Base) :-
    db_file(Base, Path),
    exists_file(Path).
1199
1204
%   dir_levels(+File, +Levels, ?Segments, ?Tail)
%
%   Segments-Tail is a difference list of Levels directory names.
%   The names are consecutive two-character slices of the hash that
%   rdf_atom_md5/3 computes for File.  With Levels = 0 the list is
%   empty.

dir_levels(_, 0, Segments, Segments) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

%   create_dir_levels(+Count, +Offset, +Hash, ?Segments, ?Tail)
%
%   Take Count two-character slices from Hash, starting at Offset.

create_dir_levels(0, _, _, Segments, Segments) :- !.
create_dir_levels(Count, Offset, Hash, [Slice|More], Tail) :-
    sub_atom(Hash, Offset, 2, _, Slice),
    NextOffset is Offset+2,
    Remaining is Count-1,
    create_dir_levels(Remaining, NextOffset, Hash, More, Tail).
1216
1217
1225
%   db_files(?DB, ?Snapshot, ?Journal)
%
%   Relate a graph name to the base names of its snapshot (`.trp`)
%   and journal (`.jrn`) files.  Works from whichever of the three
%   arguments is bound, tried in that order; fails if none is.

db_files(DB, Snapshot, Journal) :-
    nonvar(DB),
    !,
    rdf_db_to_file(DB, Stem),
    atom_concat(Stem, '.trp', Snapshot),
    atom_concat(Stem, '.jrn', Journal).
db_files(DB, Snapshot, Journal) :-
    nonvar(Snapshot),
    !,
    atom_concat(Stem, '.trp', Snapshot),
    atom_concat(Stem, '.jrn', Journal),
    rdf_db_to_file(DB, Stem).
db_files(DB, Snapshot, Journal) :-
    nonvar(Journal),
    !,
    atom_concat(Stem, '.jrn', Journal),
    atom_concat(Stem, '.trp', Snapshot),
    rdf_db_to_file(DB, Stem).

%   db_abs_files(?DB, -Snapshot, -Journal)
%
%   As db_files/3, but Snapshot and Journal are mapped through
%   db_file/2 to paths inside the attached directory.

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, SnapshotBase, JournalBase),
    db_file(SnapshotBase, Snapshot),
    db_file(JournalBase, Journal).
1249
1250
1255
%!  rdf_journal_file(?Graph, -Journal) is nondet.
%
%   True when Journal is the existing journal file of Graph.  An
%   unbound Graph is enumerated using rdf_graph/1.

rdf_journal_file(Graph, Journal) :-
    enumerate_graph(Graph),
    db_abs_files(Graph, _, Journal),
    exists_file(Journal).

%!  rdf_snapshot_file(?Graph, -Snapshot) is nondet.
%
%   True when Snapshot is the existing snapshot file of Graph.  An
%   unbound Graph is enumerated using rdf_graph/1.

rdf_snapshot_file(Graph, Snapshot) :-
    enumerate_graph(Graph),
    db_abs_files(Graph, Snapshot, _),
    exists_file(Snapshot).

%   Leave a bound Graph alone; enumerate an unbound one.

enumerate_graph(Graph) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ).
1277
1278
1287
%!  rdf_db_to_file(?DB, ?FileBase) is semidet.
%
%   Relate a graph name to the base name used for its files.  Known
%   pairs are answered from the file_base_db/2 cache; new pairs are
%   computed by url_to_filename/2 and added to the cache.

rdf_db_to_file(DB, FileBase) :-
    (   file_base_db(FileBase, DB)
    ->  true
    ;   url_to_filename(DB, FileBase),
        assert(file_base_db(FileBase, DB))
    ).
1294
1305
%   url_to_filename(+URL, -FileName) is det.
%   url_to_filename(-URL, +FileName) is det.
%
%   Translate between a graph URL and a file name.  An atomic URL is
%   encoded by url_encode//1; otherwise uri_encoded/3 is used, which
%   handles the reverse direction.

url_to_filename(URL, FileName) :-
    atomic(URL),
    !,
    atom_codes(URL, Plain),
    phrase(url_encode(Encoded), Plain),
    atom_codes(FileName, Encoded).
url_to_filename(URL, FileName) :-
    uri_encoded(path, URL, FileName).

%   url_encode(-Encoded)//
%
%   Encoded is the encoding of the parsed codes: a space becomes
%   `+`, ASCII alphanumerics and the characters accepted by
%   no_enc_extra//1 are copied, a line break (`\r\n` or `\n`)
%   becomes `%0D%0A` and any other code C becomes `%XX`, built from
%   the two low-order hexadecimal digits of C.

url_encode([0'+|Rest]) -->
    " ",
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    alphanum(Code),
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    no_enc_extra(Code),
    !,
    url_encode(Rest).
url_encode(Encoded) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", CRLF),
      append(CRLF, Rest, Encoded)
    },
    url_encode(Rest).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,Hi,Lo|Rest]) -->
    [Code],
    { HiValue is (Code>>4 /\ 0xf),
      LoValue is (Code /\ 0xf),
      code_type(Hi, xdigit(HiValue)),
      code_type(Lo, xdigit(LoValue))
    },
    url_encode(Rest).

%   True at the end of the input.
eos([], []).

%   Accept one ASCII alphanumeric code.
alphanum(Code) -->
    [Code],
    { Code < 128,                       % US-ASCII
      code_type(Code, alnum)
    }.
1355
%   no_enc_extra(?Code)//
%
%   Non-alphanumeric codes that url_encode//1 copies unencoded.  The
%   clause head is required: url_encode//1 calls no_enc_extra//1.

no_enc_extra(0'_) --> "_".
1357
1358
1359 1362
1366
%   reindex_db(+Dir, +Levels)
%
%   Move all database files below Dir to the location they have with
%   Levels directory levels, then remove directories left empty.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Entries),
    reindex_files(Entries, Dir, '.', 0, Levels),
    remove_empty_directories(Entries, Dir).

%   reindex_files(+Entries, +Dir, +Prefix, +CurrentLevel, +Levels)
%
%   Walk Entries of the directory Prefix (relative to Dir).  Database
%   files that are not at depth Levels are renamed to their new
%   path; sub directories that are not links are processed
%   recursively; everything else is skipped.

reindex_files([], _, _, _, _).
reindex_files([Entry|Rest], Dir, Prefix, CurrentLevel, Levels) :-
    nofollow(Entry),
    !,
    reindex_files(Rest, Dir, Prefix, CurrentLevel, Levels).
reindex_files([Entry|Rest], Dir, Prefix, CurrentLevel, Levels) :-
    CurrentLevel \== Levels,
    file_name_extension(_Base, Ext, Entry),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, Entry, RelPath),
    directory_file_path(Dir, RelPath, From),
    db_file(Dir, Entry, Levels, To),
    debug(rdf_persistency, 'Rename ~q --> ~q', [From, To]),
    file_directory_name(To, ToDir),
    make_directory_path(ToDir),
    rename_file(From, To),
    reindex_files(Rest, Dir, Prefix, CurrentLevel, Levels).
reindex_files([Entry|Rest], Dir, Prefix, CurrentLevel, Levels) :-
    directory_file_path(Prefix, Entry, RelDir),
    directory_file_path(Dir, RelDir, AbsDir),
    exists_directory(AbsDir),
    \+ read_link(AbsDir, _, _),         % Do not follow links
    !,
    directory_files(AbsDir, Children),
    ChildLevel is CurrentLevel + 1,
    reindex_files(Children, Dir, RelDir, ChildLevel, Levels),
    reindex_files(Rest, Dir, Prefix, CurrentLevel, Levels).
reindex_files([_|Rest], Dir, Prefix, CurrentLevel, Levels) :-
    reindex_files(Rest, Dir, Prefix, CurrentLevel, Levels).

%   remove_empty_directories(+Entries, +Dir)
%
%   For each entry of Dir that is a real (non-link) directory:
%   delete it if it has no content, otherwise apply the same
%   procedure to its content.

remove_empty_directories([], _).
remove_empty_directories([Entry|Rest], Dir) :-
    \+ nofollow(Entry),
    directory_file_path(Dir, Entry, SubDir),
    exists_directory(SubDir),
    \+ read_link(SubDir, _, _),
    !,
    directory_files(SubDir, AllContent),
    exclude(nofollow, AllContent, Content),
    (   Content == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [SubDir]),
        delete_directory(SubDir)
    ;   remove_empty_directories(Content, SubDir)
    ),
    remove_empty_directories(Rest, Dir).
remove_empty_directories([_|Rest], Dir) :-
    remove_empty_directories(Rest, Dir).
1421
1422
1423 1426
%   save_prefixes(+Dir)
%
%   Write the currently defined RDF prefixes to `prefixes.db` in
%   Dir, encoded as UTF-8.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, '/', 'prefixes.db'], File),
    setup_call_cleanup(
        open(File, write, Stream, [encoding(utf8)]),
        write_prefixes(Stream),
        close(Stream)).

%   write_prefixes(+Stream)
%
%   Write a header comment followed by one prefix(Alias, URI) term
%   for each solution of rdf_current_ns/2.

write_prefixes(Stream) :-
    format(Stream, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Stream, 'prefix(~q, ~q).~n', [Alias, URI])).
1437
1445
%   load_prefixes(+Dir)
%
%   Register the prefixes stored in `prefixes.db` in Dir, if that
%   file exists.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, '/', 'prefixes.db'], File),
    (   exists_file(File)
    ->  setup_call_cleanup(
            open(File, read, Stream, [encoding(utf8)]),
            read_prefixes(Stream),
            close(Stream))
    ;   true
    ).

%   read_prefixes(+Stream)
%
%   Read prefix(Alias, URI) terms from Stream until end_of_file and
%   register each using rdf_register_ns/3.  Registration errors are
%   printed as warnings.
%
%   @error domain_error(prefix_term, Term) for any other term.
%   @error type_error if Alias or URI is not an atom.

read_prefixes(Stream) :-
    read_term(Stream, First, []),
    read_prefixes(First, Stream).

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), Error,
          print_message(warning, Error)),
    read_term(Stream, Next, []),
    read_prefixes(Next, Stream).
read_prefixes(Other, _) :-
    domain_error(prefix_term, Other).
1470
1471
1472 1475
1479
%   mkdir(+Directory)
%
%   Create Directory unless it already exists.

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).

%   time_stamp(-Int)
%
%   Int is the current time stamp rounded to an integer.

time_stamp(Int) :-
    get_time(Stamp),
    Int is round(Stamp).
1493
1494
1495 1498
1499:- multifile
1500 prolog:message/3,
1501 prolog:message_context/3.
1502
1503prolog:message(rdf(Term)) -->
1504 message(Term).
1505
%   message(+Term)//
%
%   Translate the argument of an rdf(Term) message.  For
%   restore(Silent, Action) messages, Silent = `true` produces no
%   output and Silent = `brief` is handled by brief_message//1.
%   The message system uses the first solution, so the clauses for
%   specific actions must precede the generic "Restoring Graph"
%   clause, which matches any action.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
% Actions emitted by attach_graph/2
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% Actions emitted by load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
message(restore(_, done(_, Time, Count, _Nth, _Total))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% Generic: the action is the graph being restored
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].
1546
%   silent_message(+Action)//
%
%   Produce no output for any Action.

silent_message(_) -->
    [].

%   brief_message(+Action)//
%
%   Only done/5 actions are reported: a single progress line that is
%   overwritten in place, showing the base name of the graph and the
%   number of graphs processed.

brief_message(done(Graph, _, _, Nth, Total)) -->
    { file_base_name(Graph, Name) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Name, Nth, Total],
      flush
    ].
brief_message(_) -->
    [].
1556
1557
%   Hook clause: describe the context rdf_locked(Args) created by
%   locked_error/2, using the time(_) and pid(_) members of Args.

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Stamp), Args),
      memberchk(pid(Pid), Args),
      format_time(string(When), '%+', Stamp)
    },
    [ nl,
      'locked at ~s by process id ~w'-[When,Pid]
    ].