View source with formatted comments or as raw
    1/*  Part of SWISH
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2015-2020, VU University Amsterdam
    7                              CWI Amsterdam
    8                              SWI-Prolog Solutions b.v.
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(gitty_driver_files,
   38          [ gitty_open/2,               % +Store, +Options
   39            gitty_close/1,              % +Store
   40            gitty_file/4,               % +Store, ?Name, ?Ext, ?Hash
   41
   42            gitty_update_head/5,        % +Store, +Name, +OldCommit, +NewCommit
   43					% +DataHash
   44            delete_head/2,              % +Store, +Name
   45            set_head/3,                 % +Store, +Name, +Hash
   46            store_object/4,             % +Store, +Hash, +Header, +Data
   47            delete_object/2,            % +Store, +Hash
   48
   49            gitty_hash/2,               % +Store, ?Hash
   50            load_plain_commit/3,        % +Store, +Hash, -Meta
   51            load_object/5,              % +Store, +Hash, -Data, -Type, -Size
   52            gitty_object_file/3,        % +Store, +Hash, -File
   53
   54            repack_objects/2,           % +Store, +Options
   55            pack_objects/6,             % +Store, +Objs, +Packs, +PackDir,
   56                                        % -File, +Opts
   57            unpack_packs/1,             % +Store
   58            unpack_pack/2,              % +Store, +PackFile
   59
   60            attach_pack/2,              % +Store, +PackFile
   61            gitty_fsck/1,               % +Store
   62            fsck_pack/1,                % +PackFile
   63            load_object_from_pack/4,    % +Hash, -Data, -Type, -Size
   64
   65            gitty_rescan/1              % Store
   66          ]).   67:- use_module(library(apply)).   68:- use_module(library(zlib)).   69:- use_module(library(filesex)).   70:- use_module(library(lists)).   71:- use_module(library(apply)).   72:- use_module(library(error)).   73:- use_module(library(debug)).   74:- use_module(library(zlib)).   75:- use_module(library(hash_stream)).   76:- use_module(library(option)).   77:- use_module(library(dcg/basics)).   78:- use_module(library(redis)).   79:- use_module(library(redis_streams)).   80:- use_module(gitty, [is_gitty_hash/1]).   81
   82:- use_module(swish_redis).   83
   84/** <module> Gitty plain files driver
   85
   86This version of the driver uses plain files  to store the gitty data. It
   87consists of a nested directory  structure   with  files  named after the
   88hash. Objects and hash computation are the same as for `git`. The _heads_
   89(files) are computed on startup by scanning all objects. There is a file
   90=ref/head= that is updated if a head is updated. Other clients can watch
   91this file and update their notion  of   the  head. This implies that the
   92store can handle multiple clients that can  access a shared file system,
   93optionally shared using NFS from different machines.
   94
   95The store is simple and robust. The  main disadvantages are long startup
   96times as the store holds more objects and relatively high disk usage due
   97to rounding the small objects to disk allocation units.
   98
   99@bug    Shared access does not work on Windows.
  100*/
  101
  102:- dynamic
  103    head/4,                             % Store, Name, Ext, Hash
  104    store/2,                            % Store, Updated
  105    commit/3,                           % Store, Hash, Meta
  106    heads_input_stream_cache/2,         % Store, Stream
  107    pack_object/6,                      % Hash, Type, Size, Offset, Store,PackFile
  108    attached_packs/1,                   % Store
  109    attached_pack/2,                    % Store, PackFile
  110    redis_db/4.                         % Store, DB, RO, Prefix
  111
  112:- volatile
  113    head/4,
  114    store/2,
  115    commit/3,
  116    heads_input_stream_cache/2,
  117    pack_object/6,
  118    attached_packs/1,
  119    attached_pack/2.  120
  121:- multifile
  122    gitty:check_object/4.  123
  124% enable/disable syncing remote servers running on  the same file store.
  125% This facility requires shared access to files and thus doesn't work on
  126% Windows.
  127
  128:- if(current_prolog_flag(windows, true)).  129remote_sync(false).
  130:- else.  131remote_sync(true).
  132:- endif.  133
  134%!  gitty_open(+Store, +Options) is det.
  135%
  136%   Driver  specific  initialization.  Handles  setting    up   a  Redis
  137%   connection when requested.  This processes:
  138%
  139%     - redis(+DB)
  140%       Name of the redis DB to connect to.  See redis_server/3.
  141%     - redis_ro(+DB)
  142%       Read-only redis DB.
  143%     - redis_prefix(+Prefix)
  144%       Prefix for all keys.  This can be used to host multiple
  145%       SWISH servers on the same redis cluster.  Default is `swish`.
  146
  147gitty_open(Store, Options) :-
  148    option(redis(DB), Options),
  149    !,
  150    option(redis_ro(RO), Options, DB),
  151    option(redis_prefix(Prefix), Options, swish),
  152    asserta(redis_db(Store, DB, RO, Prefix)),
  153    thread_create(gitty_scan(Store), _, [detached(true)]).
  154gitty_open(_, _).
  155
  156
  157%!  gitty_close(+Store) is det.
  158%
  159%   Close resources associated with a store.
  160
  161gitty_close(Store) :-
  162    (   retract(heads_input_stream_cache(Store, In))
  163    ->  close(In)
  164    ;   true
  165    ),
  166    retractall(head(Store,_,_,_)),
  167    retractall(store(Store,_)),
  168    retractall(pack_object(_,_,_,_,Store,_)).
  169
  170
  171%!  gitty_file(+Store, ?File, ?Ext, ?Head) is nondet.
  172%
  173%   True when File entry in the  gitty   store  and Head is the HEAD
  174%   revision.
  175
  176gitty_file(Store, Head, Ext, Hash) :-
  177    redis_db(Store, _, _, _),
  178    !,
  179    gitty_scan(Store),
  180    redis_file(Store, Head, Ext, Hash).
  181gitty_file(Store, Head, Ext, Hash) :-
  182    gitty_scan(Store),
  183    head(Store, Head, Ext, Hash).
  184
  185%!  load_plain_commit(+Store, +Hash, -Meta:dict) is semidet.
  186%
  187%   Load the commit data as a  dict.   Loaded  commits are cached in
  188%   commit/3.  Note  that  only  adding  a  fact  to  the  cache  is
  189%   synchronized. This means that during  a   race  situation we may
  190%   load the same object  multiple  times   from  disk,  but this is
  191%   harmless while a lock  around   the  whole  predicate serializes
  192%   loading different objects, which is not needed.
  193
  194load_plain_commit(Store, Hash, Meta) :-
  195    must_be(atom, Store),
  196    must_be(atom, Hash),
  197    commit(Store, Hash, Meta),
  198    !.
  199load_plain_commit(Store, Hash, Meta) :-
  200    load_object(Store, Hash, String, _, _),
  201    term_string(Meta0, String, []),
  202    with_mutex(gitty_commit_cache,
  203               assert_cached_commit(Store, Hash, Meta0)),
  204    Meta = Meta0.
  205
  206assert_cached_commit(Store, Hash, Meta) :-
  207    commit(Store, Hash, Meta0),
  208    !,
  209    assertion(Meta0 =@= Meta).
  210assert_cached_commit(Store, Hash, Meta) :-
  211    assertz(commit(Store, Hash, Meta)).
  212
  213%!  store_object(+Store, +Hash, +Header:string, +Data:string) is det.
  214%
  215%   Store the actual object. The store  must associate Hash with the
  216%   concatenation of Hdr and Data.
  217
  218store_object(Store, Hash, _Hdr, _Data) :-
  219    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  220    !.
  221store_object(Store, Hash, Hdr, Data) :-
  222    gitty_object_file(Store, Hash, Path),
  223    with_mutex(gitty_file, exists_or_create(Path, Out0)),
  224    (   var(Out0)
  225    ->  true
  226    ;   setup_call_cleanup(
  227            zopen(Out0, Out, [format(gzip)]),
  228            format(Out, '~s~s', [Hdr, Data]),
  229            close(Out))
  230    ).
  231
  232exists_or_create(Path, _Out) :-
  233    exists_file(Path),
  234    !.
  235exists_or_create(Path, Out) :-
  236    file_directory_name(Path, Dir),
  237    make_directory_path(Dir),
  238    open(Path, write, Out, [encoding(utf8), lock(write)]).
  239
  240:- if(\+current_predicate(ensure_directory/1)).  241% in Library as of SWI-Prolog 9.1.20
  242ensure_directory(Dir) :-
  243    exists_directory(Dir),
  244    !.
  245ensure_directory(Dir) :-
  246    make_directory(Dir).
  247:- endif.  248
  249%!  store_object_raw(+Store, +Hash, +Bytes:string, -New) is det.
  250%
  251%   Store an object  from  raw  bytes.   This  is  used  for replicating
  252%   objects.
  253
  254store_object_raw(Store, Hash, _Bytes, false) :-
  255    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  256    !.
  257store_object_raw(Store, Hash, Bytes, New) :-
  258    gitty_object_file(Store, Hash, Path),
  259    with_mutex(gitty_file, exists_or_create(Path, Out)),
  260    (   var(Out)
  261    ->  New = false
  262    ;   call_cleanup(
  263            ( set_stream(Out, type(binary)),
  264              write(Out, Bytes)
  265            ),
  266            close(Out)),
  267        New = true
  268    ).
  269
  270%!  load_object(+Store, +Hash, -Data, -Type, -Size) is det.
  271%
  272%   Load the given object.
  273
  274load_object(_Store, Hash, Data, Type, Size) :-
  275    load_object_from_pack(Hash, Data0, Type0, Size0),
  276    !,
  277    f(Data0, Type0, Size0) = f(Data, Type, Size).
  278load_object(Store, Hash, Data, Type, Size) :-
  279    load_object_file(Store, Hash, Data0, Type0, Size0),
  280    !,
  281    f(Data0, Type0, Size0) = f(Data, Type, Size).
  282load_object(Store, Hash, Data, Type, Size) :-
  283    redis_db(Store, _, _, _),
  284    redis_replicate_get(Store, Hash),
  285    load_object_file(Store, Hash, Data, Type, Size).
  286
  287load_object_file(Store, Hash, Data, Type, Size) :-
  288    gitty_object_file(Store, Hash, Path),
  289    exists_file(Path),
  290    !,
  291    setup_call_cleanup(
  292        gzopen(Path, read, In, [encoding(utf8)]),
  293        read_object(In, Data, Type, Size),
  294        close(In)).
  295
  296%!  has_object(+Store, +Hash) is det.
  297%
  298%   True when Hash exists in store.
  299
  300has_object(Store, Hash) :-
  301    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  302    !.
  303has_object(Store, Hash) :-
  304    gitty_object_file(Store, Hash, Path),
  305    exists_file(Path).
  306
  307%!  load_object_raw(+Store, +Hash, -Data)
  308%
  309%   Load the compressed data for an object. Intended for replication.
  310
  311load_object_raw(_Store, Hash, Bytes) :-
  312    load_object_from_pack(Hash, Data, Type, Size),
  313    !,
  314    object_bytes(Type, Size, Data, Bytes).
  315load_object_raw(Store, Hash, Data) :-
  316    gitty_object_file(Store, Hash, Path),
  317    exists_file(Path),
  318    !,
  319    setup_call_cleanup(
  320        open(Path, read, In, [type(binary)]),
  321        read_string(In, _, Data),
  322        close(In)).
  323
  324%!  object_bytes(+Type, +Size, +Data, -Bytes) is det.
  325%
  326%   Encode an object with the given parameters in memory.
  327
  328object_bytes(Type, Size, Data, Bytes) :-
  329    setup_call_cleanup(
  330        new_memory_file(MF),
  331        ( setup_call_cleanup(
  332              open_memory_file(MF, write, Out, [encoding(octet)]),
  333              setup_call_cleanup(
  334                  zopen(Out, ZOut, [format(gzip), close_parent(false)]),
  335                  ( set_stream(ZOut, encoding(utf8)),
  336                    format(ZOut, '~w ~d\u0000~w', [Type, Size, Data])
  337                  ),
  338                  close(ZOut)),
  339              close(Out)),
  340          memory_file_to_string(MF, Bytes, octet)
  341        ),
  342        free_memory_file(MF)).
  343
  344
  345%!  load_object_header(+Store, +Hash, -Type, -Size) is det.
  346%
  347%   Load the header of an object
  348
  349load_object_header(Store, Hash, Type, Size) :-
  350    gitty_object_file(Store, Hash, Path),
  351    setup_call_cleanup(
  352        gzopen(Path, read, In, [encoding(utf8)]),
  353        read_object_hdr(In, Type, Size),
  354        close(In)).
  355
  356read_object(In, Data, Type, Size) :-
  357    read_object_hdr(In, Type, Size),
  358    read_string(In, _, Data).
  359
  360read_object_hdr(In, Type, Size) :-
  361    get_code(In, C0),
  362    read_hdr(C0, In, Hdr),
  363    phrase((nonblanks(TypeChars), " ", integer(Size)), Hdr),
  364    atom_codes(Type, TypeChars).
  365
  366read_hdr(C, In, [C|T]) :-
  367    C > 0,
  368    !,
  369    get_code(In, C1),
  370    read_hdr(C1, In, T).
  371read_hdr(_, _, []).
  372
  373%!  gitty_rescan(?Store) is det.
  374%
  375%   Update our view of the shared   storage  for all stores matching
  376%   Store.
  377
  378gitty_rescan(Store) :-
  379    retractall(store(Store, _)).
  380
  381%!  gitty_scan(+Store) is det.
  382%
  383%   Scan gitty store for files (entries),   filling  head/3. This is
  384%   performed lazily at first access to the store.
  385%
  386%   @tdb    Possibly we need to maintain a cached version of this
  387%           index to avoid having to open all objects of the gitty
  388%           store.
  389
  390gitty_scan(Store) :-
  391    store(Store, _),
  392    !,
  393    remote_updates(Store).
  394gitty_scan(Store) :-
  395    with_mutex(gitty, gitty_scan_sync(Store)).
  396
  397:- thread_local
  398    latest/3.  399
  400gitty_scan_sync(Store) :-
  401    store(Store, _),
  402    !.
  403gitty_scan_sync(Store) :-
  404    redis_db(Store, _, _, _),
  405    !,
  406    gitty_attach_packs(Store),
  407    redis_ensure_heads(Store),
  408    get_time(Now),
  409    assertz(store(Store, Now)).
  410:- if(remote_sync(true)).  411gitty_scan_sync(Store) :-
  412    remote_sync(true),
  413    !,
  414    gitty_attach_packs(Store),
  415    restore_heads_from_remote(Store).
  416:- endif.  417gitty_scan_sync(Store) :-
  418    gitty_attach_packs(Store),
  419    read_heads_from_objects(Store).
  420
  421%!  read_heads_from_objects(+Store) is det.
  422%
  423%   Establish the head(Store,File,Ext,Hash) relation  by reading all
  424%   objects and adding a fact for the most recent commit.
  425
  426read_heads_from_objects(Store) :-
  427    gitty_scan_latest(Store),
  428    forall(retract(latest(Name, Hash, _Time)),
  429           assert_head(Store, Name, Hash)),
  430    get_time(Now),
  431    assertz(store(Store, Now)).
  432
  433assert_head(Store, Name, Hash) :-
  434    file_name_extension(_, Ext, Name),
  435    assertz(head(Store, Name, Ext, Hash)).
  436
  437
  438%!  gitty_scan_latest(+Store)
  439%
  440%   Scans the gitty store, extracting  the   latest  version of each
  441%   named entry.
  442
  443gitty_scan_latest(Store) :-
  444    retractall(head(Store, _, _, _)),
  445    retractall(latest(_, _, _)),
  446    (   gitty_hash(Store, Hash),
  447        load_object(Store, Hash, Data, commit, _Size),
  448        term_string(Meta, Data, []),
  449        _{name:Name, time:Time} :< Meta,
  450        (   latest(Name, _, OldTime),
  451            OldTime > Time
  452        ->  true
  453        ;   retractall(latest(Name, _, _)),
  454            assertz(latest(Name, Hash, Time))
  455        ),
  456        fail
  457    ;   true
  458    ).
  459
  460
  461%!  gitty_hash(+Store, ?Hash) is nondet.
  462%
  463%   True when Hash is an object in the store.
  464
  465gitty_hash(Store, Hash) :-
  466    var(Hash),
  467    !,
  468    (   gitty_attach_packs(Store),
  469        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  470    ;   gitty_file_object(Store, Hash)
  471    ).
  472gitty_hash(Store, Hash) :-
  473    (   gitty_attach_packs(Store),
  474        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  475    ->  true
  476    ;   gitty_object_file(Store, Hash, File),
  477        exists_file(File)
  478    ).
  479
  480gitty_file_object(Store, Hash) :-
  481    access_file(Store, exist),
  482    directory_files(Store, Level0),
  483    member(E0, Level0),
  484    E0 \== '..',
  485    atom_length(E0, 2),
  486    directory_file_path(Store, E0, Dir0),
  487    directory_files(Dir0, Level1),
  488    member(E1, Level1),
  489    E1 \== '..',
  490    atom_length(E1, 2),
  491    directory_file_path(Dir0, E1, Dir),
  492    directory_files(Dir, Files),
  493    member(File, Files),
  494    atom_length(File, 36),
  495    atomic_list_concat([E0,E1,File], Hash).
  496
  497%!  delete_object(+Store, +Hash)
  498%
  499%   Delete an existing object
  500
  501delete_object(Store, Hash) :-
  502    gitty_object_file(Store, Hash, File),
  503    delete_file(File).
  504
  505%!  gitty_object_file(+Store, +Hash, -Path) is det.
  506%
  507%   True when Path is the file  at   which  the  object with Hash is
  508%   stored.
  509
  510gitty_object_file(Store, Hash, Path) :-
  511    sub_string(Hash, 0, 2, _, Dir0),
  512    sub_string(Hash, 2, 2, _, Dir1),
  513    sub_string(Hash, 4, _, 0, File),
  514    atomic_list_concat([Store, Dir0, Dir1, File], /, Path).
  515
  516
  517                 /*******************************
  518                 *            SYNCING           *
  519                 *******************************/
  520
  521%!  gitty_update_head(+Store, +Name, +OldCommit,
  522%!                    +NewCommit, +DataHash) is det.
  523%
  524%   Update the head of a gitty  store   for  Name.  OldCommit is the
  525%   current head and NewCommit is the new  head. If Name is created,
  526%   and thus there is no head, OldCommit must be `-`.
  527%
  528%   This operation can fail because another   writer has updated the
  529%   head.  This can both be in-process or another process.
  530
  531gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
  532    redis_db(Store, _, _, _),
  533    !,
  534    redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
  535gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
  536    with_mutex(gitty,
  537               gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).
  538
  539:- if(remote_sync(true)).  540gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  541    remote_sync(true),
  542    !,
  543    setup_call_cleanup(
  544        heads_output_stream(Store, HeadsOut),
  545        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
  546        close(HeadsOut)).
  547:- endif.  548gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  549    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).
  550
  551gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
  552    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
  553    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).
  554
  555gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
  556    gitty_scan(Store),              % fetch remote changes
  557    (   OldCommit == (-)
  558    ->  (   head(Store, Name, _, _)
  559        ->  throw(error(gitty(file_exists(Name),_)))
  560        ;   assert_head(Store, Name, NewCommit)
  561        )
  562    ;   (   retract(head(Store, Name, _, OldCommit))
  563        ->  assert_head(Store, Name, NewCommit)
  564        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
  565        )
  566    ).
  567
  568%!  remote_updates(+Store)
  569%
  570%   Watch for remote updates to the store. We only do this if we did
  571%   not do so the last second.
  572
  573:- dynamic
  574    last_remote_sync/2.  575
  576:- if(remote_sync(false)).  577remote_updates(_) :-
  578    remote_sync(false),
  579    !.
  580:- endif.  581remote_updates(Store) :-
  582    remote_up_to_data(Store),
  583    !.
  584remote_updates(Store) :-
  585    with_mutex(gitty, remote_updates_sync(Store)).
  586
  587remote_updates_sync(Store) :-
  588    remote_up_to_data(Store),
  589    !.
  590remote_updates_sync(Store) :-
  591    retractall(last_remote_sync(Store, _)),
  592    get_time(Now),
  593    asserta(last_remote_sync(Store, Now)),
  594    remote_update(Store).
  595
  596remote_up_to_data(Store) :-
  597    last_remote_sync(Store, Last),
  598    get_time(Now),
  599    Now-Last < 1.
  600
  601remote_update(Store) :-
  602    remote_updates(Store, List),
  603    maplist(update_head(Store), List).
  604
  605update_head(Store, head(Name, OldCommit, NewCommit)) :-
  606    (   OldCommit == (-)
  607    ->  \+ head(Store, Name, _, _)
  608    ;   retract(head(Store, Name, _, OldCommit))
  609    ),
  610    !,
  611    assert_head(Store, Name, NewCommit).
  612update_head(_, _).
  613
  614%!  remote_updates(+Store, -List) is det.
  615%
  616%   Find updates from other gitties  on   the  same filesystem. Note
  617%   that we have to push/pop the input   context to avoid creating a
  618%   notion of an  input  context   which  possibly  relate  messages
  619%   incorrectly to the sync file.
  620
  621remote_updates(Store, List) :-
  622    heads_input_stream(Store, Stream),
  623    setup_call_cleanup(
  624        '$push_input_context'(gitty_sync),
  625        read_new_terms(Stream, List),
  626        '$pop_input_context').
  627
  628read_new_terms(Stream, Terms) :-
  629    read(Stream, First),
  630    read_new_terms(First, Stream, Terms).
  631
  632read_new_terms(end_of_file, _, List) :-
  633    !,
  634    List = [].
  635read_new_terms(Term, Stream, [Term|More]) :-
  636    read(Stream, Term2),
  637    read_new_terms(Term2, Stream, More).
  638
  639heads_output_stream(Store, Out) :-
  640    heads_file(Store, HeadsFile),
  641    open(HeadsFile, append, Out,
  642         [ encoding(utf8),
  643           lock(exclusive)
  644         ]).
  645
  646heads_input_stream(Store, Stream) :-
  647    heads_input_stream_cache(Store, Stream0),
  648    !,
  649    Stream = Stream0.
  650heads_input_stream(Store, Stream) :-
  651    heads_file(Store, File),
  652    between(1, 2, _),
  653    catch(open(File, read, In,
  654               [ encoding(utf8),
  655                 eof_action(reset)
  656               ]),
  657          _,
  658          create_heads_file(Store)),
  659    !,
  660    assert(heads_input_stream_cache(Store, In)),
  661    Stream = In.
  662
  663create_heads_file(Store) :-
  664    call_cleanup(
  665        heads_output_stream(Store, Out),
  666        close(Out)),
  667    fail.                                   % always fail!
  668
  669heads_file(Store, HeadsFile) :-
  670    ensure_directory(Store),
  671    directory_file_path(Store, ref, RefDir),
  672    ensure_directory(RefDir),
  673    directory_file_path(RefDir, head, HeadsFile).
  674
  675%!  restore_heads_from_remote(Store)
  676%
  677%   Restore the known heads by reading the remote sync file.
  678
  679restore_heads_from_remote(Store) :-
  680    heads_file(Store, File),
  681    exists_file(File),
  682    setup_call_cleanup(
  683        open(File, read, In, [encoding(utf8)]),
  684        restore_heads(Store, In),
  685        close(In)),
  686    !,
  687    get_time(Now),
  688    assertz(store(Store, Now)).
  689restore_heads_from_remote(Store) :-
  690    read_heads_from_objects(Store),
  691    heads_file(Store, File),
  692    setup_call_cleanup(
  693        open(File, write, Out, [encoding(utf8)]),
  694        save_heads(Store, Out),
  695        close(Out)),
  696    !.
  697
  698restore_heads(Store, In) :-
  699    read(In, Term0),
  700    Term0 = epoch(_),
  701    read(In, Term1),
  702    restore_heads(Term1, In, Store).
  703
  704restore_heads(end_of_file, _, _) :- !.
  705restore_heads(head(File, _, Hash), In, Store) :-
  706    retractall(head(Store, File, _, _)),
  707    assert_head(Store, File, Hash),
  708    read(In, Term),
  709    restore_heads(Term, In, Store).
  710
  711save_heads(Store, Out) :-
  712    get_time(Now),
  713    format(Out, 'epoch(~0f).~n~n', [Now]),
  714    forall(head(Store, File, _, Hash),
  715           format(Out, '~q.~n', [head(File, -, Hash)])).
  716
  717
  718%!  delete_head(+Store, +Head) is det.
  719%
  720%   Delete Head from Store. Used  by   gitty_fsck/1  to remove heads
  721%   that have no commits. Should  we   forward  this  to remotes, or
  722%   should they do their own thing?
  723
  724delete_head(Store, Head) :-
  725    redis_db(Store, _, _, _),
  726    !,
  727    redis_delete_head(Store, Head).
  728delete_head(Store, Head) :-
  729    retractall(head(Store, Head, _, _)).
  730
  731%!  set_head(+Store, +File, +Hash) is det.
  732%
  733%   Set the head of the given File to Hash
  734
  735set_head(Store, File, Hash) :-
  736    redis_db(Store, _, _, _),
  737    !,
  738    redis_set_head(Store, File, Hash).
  739set_head(Store, File, Hash) :-
  740    file_name_extension(_, Ext, File),
  741    (   head(Store, File, _, Hash0)
  742    ->  (   Hash == Hash0
  743        ->  true
  744        ;   asserta(head(Store, File, Ext, Hash)),
  745            retractall(head(Store, File, _, Hash0))
  746        )
  747    ;   asserta(head(Store, File, Ext, Hash))
  748    ).
  749
  750
  751                 /*******************************
  752                 *            PACKS             *
  753                 *******************************/
  754
  755/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  756
  757<pack file> := <header>
  758               <file>*
  759<header>    := "gitty(Version).\n" <object>* "end_of_header.\n"
  760<object>    := obj(Hash, Type, Size, FileSize)
  761- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  762
   % pack_version(?Version): version number that write_signature/1
   % writes as `gitty(Version).` at the start of a pack file.
   763pack_version(1).
  764
  765%!  repack_objects(+Store, +Options) is det.
  766%
  767%   Repack  objects  of  Store  for  reduced  disk  usage  and  enhanced
  768%   performance. By default this picks up all  file objects of the store
  769%   and all existing small pack files.  Options:
  770%
  771%     - small_pack(+Bytes)
  772%     Consider all packs with less than Bytes as small and repack them.
  773%     Default 10Mb
  774%     - min_files(+Count)
  775%     Do not repack if there are less than Count new files.
  776%     Default 1,000.
  777
  778:- debug(gitty(pack)).  779
  780repack_objects(Store, Options) :-
  781    option(min_files(MinFiles), Options, 1_000),
  782    findall(Object, gitty_file_object(Store, Object), Objects),
  783    length(Objects, NewFiles),
  784    debug(gitty(pack), 'Found ~D file objects', [NewFiles]),
  785    (   NewFiles >= MinFiles
  786    ->  pack_files(Store, ExistingPacks),
  787        option(small_pack(MaxSize), Options, 10_000_000),
  788        include(small_file(MaxSize), ExistingPacks, PackFiles),
  789        (   debugging(gitty(pack))
  790        ->  length(PackFiles, PackCount),
  791            debug(gitty(pack), 'Found ~D small packs', [PackCount])
  792        ;   true
  793        ),
  794        directory_file_path(Store, pack, PackDir),
  795        make_directory_path(PackDir),
  796        pack_objects(Store, Objects, PackFiles, PackDir, _PackFile, Options)
  797    ;   debug(gitty(pack), 'Nothing to do', [])
  798    ).
  799
  800small_file(MaxSize, File) :-
  801    size_file(File, Size),
  802    Size < MaxSize.
  803
  804%!  pack_objects(+Store, +Objects, +Packs, +PackDir,
  805%!               -PackFile, +Options) is det.
  806%
  807%   Pack the given objects and pack files into a new pack.
  808
  809pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
  810    with_mutex(gitty_pack,
  811               pack_objects_sync(Store, Objects, Packs, PackDir,
  812                                 PackFile, Options)).
  813
  814pack_objects_sync(_Store, [], [Pack], _, [Pack], _) :-
  815    !.
  816pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
  817    length(Objects, ObjCount),
  818    length(Packs, PackCount),
  819    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
  820          [ObjCount, PackCount]),
  821    maplist(object_info(Store), Objects, FileInfo),
  822    maplist(pack_info(Store), Packs, PackInfo),
  823    append([FileInfo|PackInfo], Info0),
  824    sort(1, @<, Info0, Info),           % remove possible duplicates
  825    (   debugging(gitty(pack))
  826    ->  (   PackCount > 0
  827        ->  length(Info, FinalObjCount),
  828            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
  829        ;   true
  830        )
  831    ;   true
  832    ),
  833    directory_file_path(PackDir, 'pack-create', TmpPack),
  834    setup_call_cleanup(
  835        (   open(TmpPack, write, Out0, [type(binary), lock(write)]),
  836            open_hash_stream(Out0, Out, [algorithm(sha1)])
  837        ),
  838        (   write_signature(Out),
  839            maplist(write_header(Out), Info),
  840            format(Out, 'end_of_header.~n', []),
  841            maplist(add_file(Out, Store), Info),
  842            stream_hash(Out, SHA1)
  843        ),
  844        close(Out)),
  845    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
  846    directory_file_path(PackDir, PackFile, PackFilePath),
  847    rename_file(TmpPack, PackFilePath),
  848    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
  849    attach_pack(Store, PackFilePath),
  850    remove_objects_after_pack(Store, Objects, Options),
  851    delete(Packs, PackFilePath, RmPacks),
  852    remove_repacked_packs(Store, RmPacks, Options),
  853    debug(gitty(pack), 'Packing completed', []).
  854
  855object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
  856    gitty_object_file(Store, Object, File),
  857    load_object_header(Store, Object, Type, Size),
  858    size_file(File, FileSize).
  859
  860pack_info(Store, PackFile, Objects) :-
  861    attach_pack(Store, PackFile),
  862    pack_read_header(PackFile, _Version, _DataOffset, Objects).
  863
  864write_signature(Out) :-
  865    pack_version(Version),
  866    format(Out, "gitty(~d).~n", [Version]).
  867
  868write_header(Out, obj(Object, Type, Size, FileSize)) :-
  869    format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
  870
  %!  add_file(+Out, +Store, +Object) is det.
  %
  %   Add Object from Store to the pack stream Out. Object is a term
  %   obj(Hash, Type, Size, FileSize). If the object exists as a plain
  %   file, the entire file is copied to Out. Otherwise the object is
  %   looked up using pack_object/6 and FileSize bytes are copied from
  %   its offset in the pack file that holds it.

  add_file(Out, Store, obj(Hash, Type, Size, Bytes)) :-
      (   gitty_object_file(Store, Hash, Plain),
          exists_file(Plain)
      ->  setup_call_cleanup(
              open(Plain, read, Src, [type(binary)]),
              copy_stream_data(Src, Out),
              close(Src))
      ;   pack_object(Hash, Type, Size, Start, Store, Pack),
          setup_call_cleanup(
              open(Pack, read, PackIn, [type(binary)]),
              (   seek(PackIn, Start, bof, Start),
                  copy_stream_data(PackIn, Out, Bytes)
              ),
              close(PackIn))
      ).
  891
  892
  %!  gitty_fsck(+Store) is det.
  %
  %   Validate all packs associated with Store: every pack file found by
  %   pack_files/2 is checked using fsck_pack/1.

  gitty_fsck(Store) :-
      pack_files(Store, Packs),
      maplist(fsck_pack, Packs).
  900
  %!  fsck_pack(+File) is det.
  %
  %   Validate the integrity of the pack file File. First the hash of the
  %   file as a whole is checked (check_pack_hash/1), then the individual
  %   objects (check_pack_objects/1). Progress is reported on the debug
  %   topic gitty(pack).

  fsck_pack(Pack) :-
      debug(gitty(pack), 'fsck ~p', [Pack]),
      check_pack_hash(Pack),
      debug(gitty(pack), 'fsck ~p: checking objects', [Pack]),
      check_pack_objects(Pack),
      debug(gitty(pack), 'fsck ~p: done', [Pack]).
  911
  %!  check_pack_hash(+File)
  %
  %   Verify that the SHA1 hash of the content of File equals the hash
  %   embedded in its name. The base name of File must be of the form
  %   `pack-<Hash>.pack`. The content is hashed by copying the file
  %   through a hash stream into a null stream.
  %
  %   @error type/domain error from must_be/2 if the extension is not
  %          `pack`; an assertion failure if the hashes differ.

  check_pack_hash(File) :-
      file_base_name(File, Name),
      file_name_extension(Stem, Ext, Name),
      must_be(oneof([pack]), Ext),
      atom_concat('pack-', Expected, Stem),
      setup_call_cleanup(
          (   open(File, read, Raw, [type(binary)]),
              open_hash_stream(Raw, Hashed, [algorithm(sha1)])
          ),
          (   setup_call_cleanup(
                  open_null_stream(Sink),
                  copy_stream_data(Hashed, Sink),
                  close(Sink)),
              stream_hash(Hashed, Actual)
          ),
          close(Hashed)),
      assertion(Expected == Actual).
  929
  %!  check_pack_objects(+PackFile)
  %
  %   Check every object listed in the header of PackFile. The header is
  %   read from the opened file, the stream is switched to UTF-8 and the
  %   objects are checked in header order by check_object/6, starting at
  %   the data offset that follows the header.

  check_pack_objects(Pack) :-
      setup_call_cleanup(
          open(Pack, read, Stream, [type(binary)]),
          (   read_header(Stream, Version, Start, Entries),
              set_stream(Stream, encoding(utf8)),
              foldl(check_object(Stream, Pack, Version), Entries, Start, _)
          ),
          close(Stream)).
  938
  %!  check_object(+In, +PackFile, +Version, +Info, +Offset0, -Offset)
  %
  %   Check one object of a pack. Info is obj(Object, Type, Size,
  %   FileSize); Offset0 is the byte offset at which the object is
  %   expected to start and Offset (Offset0+FileSize) where it should
  %   end.
  %
  %   If In is not positioned at Offset0 a warning is printed and the
  %   stream is repositioned. The object is then read through a zlib
  %   stream on top of In. A read error is printed and the object is
  %   skipped. Otherwise an error is printed if the stream did not end
  %   up at Offset, the type and size read are asserted to match the
  %   header and the data is verified using gitty:check_object/4.

  check_object(In, PackFile, _Version,
               obj(Hash, Type, Size, Bytes),
               Start, Next) :-
      Next is Start+Bytes,
      byte_count(In, Pos),
      (   Pos == Start
      ->  true
      ;   print_message(warning, pack(reposition(Pos, Start))),
          seek(In, Start, bof, Start)
      ),
      (   setup_call_cleanup(
              zopen(In, ZIn, [multi_part(false), close_parent(false)]),
              catch(read_object(ZIn, Data, _0ReadType, _0ReadSize),
                    Error,
                    (   print_message(
                            error,
                            gitty(PackFile,
                                  fsck(read_object(Hash, Error)))),
                        fail
                    )),
              close(ZIn))
      ->  byte_count(In, End),
          (   End == Next
          ->  true
          ;   print_message(
                  error,
                  gitty(PackFile,
                        fsck(object_end(Hash, End, Start, Next, Data))))
          ),
          assertion(Type == _0ReadType),
          assertion(Size == _0ReadSize),
          gitty:check_object(Hash, Data, Type, Size)
      ;   true                            % unreadable: already reported
      ).
  969
  970
  %!  gitty_attach_packs(+Store) is det.
  %
  %   Attach all packs for Store. Nothing is done if attached_packs/1
  %   already holds for Store. Otherwise the work is delegated to
  %   gitty_attach_packs_sync/1 while holding the mutex
  %   `gitty_attach_packs`.

  gitty_attach_packs(Store) :-
      (   attached_packs(Store)
      ->  true
      ;   with_mutex(gitty_attach_packs,
                     gitty_attach_packs_sync(Store))
      ).

  %   gitty_attach_packs_sync(+Store)
  %
  %   Called with the mutex held. Tests attached_packs/1 once more and,
  %   if it still does not hold, attaches every pack file of Store and
  %   records the store as attached.

  gitty_attach_packs_sync(Store) :-
      (   attached_packs(Store)
      ->  true
      ;   pack_files(Store, Packs),
          maplist(attach_pack(Store), Packs),
          asserta(attached_packs(Store))
      ).
  989
  %!  pack_files(+Store, -Packs)
  %
  %   Packs is the list of paths of all files with extension `pack` in
  %   the `pack` subdirectory of Store. Packs is the empty list if that
  %   directory does not exist.

  pack_files(Store, Packs) :-
      (   directory_file_path(Store, pack, Dir),
          exists_directory(Dir)
      ->  directory_files(Dir, Entries),
          convlist(is_pack(Dir), Entries, Packs)
      ;   Packs = []
      ).
  997
  %!  is_pack(+PackDir, +File, -Path) is semidet.
  %
  %   True when File has the extension `pack`. Path is then File
  %   located in the directory PackDir.

  is_pack(Dir, Entry, Path) :-
      file_name_extension(_Stem, pack, Entry),
      directory_file_path(Dir, Entry, Path).
 1001
 %!  attach_pack(+Store, +PackFile)
 %
 %   Load the index of PackFile into memory. Nothing is done if
 %   attached_pack/2 already holds for the pair. Otherwise all
 %   pack_object/6 facts for PackFile are removed, the pack header is
 %   read, one pack_object/6 fact is asserted for each object in it and
 %   the pack is recorded as attached.

 attach_pack(Store, PackFile) :-
     (   attached_pack(Store, PackFile)
     ->  true
     ;   retractall(pack_object(_,_,_,_,_,PackFile)),
         pack_read_header(PackFile, Version, Start, Entries),
         foldl(assert_object(Store, PackFile, Version),
               Entries, Start, _),
         assertz(attached_pack(Store, PackFile))
     ).
 1014
 %!  pack_read_header(+PackFile, -Version, -DataOffset, -Objects)
 %
 %   Open PackFile as a binary stream, read its header using
 %   read_header/4 and close the file again.

 pack_read_header(Pack, Version, DataOffset, Entries) :-
     setup_call_cleanup(
         open(Pack, read, Stream, [type(binary)]),
         read_header(Stream, Version, DataOffset, Entries),
         close(Stream)).
 1020
 %!  read_header(+In, -Version, -DataOffset, -Objects)
 %
 %   Read the header of a pack from the stream In. The header is a
 %   sequence of Prolog terms: the signature gitty(Version), zero or
 %   more object entries and finally the atom `end_of_header`, which
 %   must be followed by a newline. Objects is the list of entries in
 %   the order in which they appear and DataOffset is the byte count of
 %   In right after that newline.
 %
 %   @error domain_error(gitty_pack_file, Signature) if the first term
 %          read is not of the form gitty(Version).

 read_header(In, Version, DataOffset, Objects) :-
     read(In, Signature),
     (   Signature = gitty(Version)
     ->  true
     ;   % Report the offending term. Objects is still unbound at this
         % point and thus useless as culprit.
         domain_error(gitty_pack_file, Signature)
     ),
     read(In, Term),
     read_index(Term, In, Objects),
     get_code(In, Code),
     assertion(Code == 0'\n),
     byte_count(In, DataOffset).

 %   read_index(+Term, +In, -Objects)
 %
 %   Collect header terms into Objects. Term is the term that has
 %   already been read; reading continues from In until the atom
 %   `end_of_header` is seen.

 read_index(end_of_header, _, []) :-
     !.
 read_index(Object, In, [Object|T]) :-
     read(In, Term2),
     read_index(Term2, In, T).
 1038
 %!  assert_object(+Store, +Pack, +Version, +Info, +Offset0, -Offset)
 %
 %   Register one object of Pack in the in-memory index. Info is
 %   obj(Object, Type, Size, FileSize) and Offset0 the byte offset of
 %   the object in the pack. A pack_object/6 fact is added at the end of
 %   the index and Offset is unified with Offset0+FileSize, the offset
 %   of the next object. Version is ignored.

 assert_object(Store, Pack, _Version,
               obj(Hash, Type, Size, Bytes),
               Start, Next) :-
     Next is Start+Bytes,
     Fact = pack_object(Hash, Type, Size, Start, Store, Pack),
     assertz(Fact).
 1044
 %!  detach_pack(+Store, +Pack) is det.
 %
 %   Remove a pack file from the memory index: all pack_object/6 facts
 %   for Pack in Store are retracted, as is the attached_pack/2 fact
 %   that marks the pack as attached.

 detach_pack(Store, PackFile) :-
     retractall(pack_object(_Hash, _Type, _Size, _Offset,
                            Store, PackFile)),
     retractall(attached_pack(Store, PackFile)).
 1052
 %!  load_object_from_pack(+Hash, -Data, -Type, -Size) is semidet.
 %
 %   True when Hash is in a pack and can be loaded. The pack and the
 %   offset of the object are found in the pack_object/6 index; the
 %   object is read after seeking to that offset in the pack file.

 load_object_from_pack(Hash, Data, Type, Size) :-
     pack_object(Hash, Type, Size, Start, _Store, PackFile),
     setup_call_cleanup(
         open(PackFile, read, Stream, [type(binary)]),
         (   seek(Stream, Start, bof, Start),
             read_object_here(Stream, Data, Type, Size)
         ),
         close(Stream)).
 1063
 %!  read_object_at(+In, +Offset, -Data, -Type, -Size)
 %
 %   Position In at byte Offset from the start of the file and read the
 %   object stored there using read_object_here/4.

 read_object_at(Stream, Start, Data, Type, Size) :-
     seek(Stream, Start, bof, Start),
     read_object_here(Stream, Data, Type, Size).
 1067
 %!  read_object_here(+In, -Data, -Type, -Size)
 %
 %   Read the object that starts at the current position of In. In is
 %   temporarily switched to UTF-8 encoding and wrapped in a zlib stream
 %   that does not close In. Afterwards the zlib stream is closed and
 %   the original encoding of In is restored.

 read_object_here(In, Data, Type, Size) :-
     stream_property(In, encoding(Saved)),
     setup_call_cleanup(
         (   set_stream(In, encoding(utf8)),
             zopen(In, ZIn, [multi_part(false), close_parent(false)])
         ),
         read_object(ZIn, Data, Type, Size),
         (   close(ZIn),
             set_stream(In, encoding(Saved))
         )).
 1078
 1079
 %!  unpack_packs(+Store) is det.
 %
 %   Unpack all packs. Store is first resolved to an absolute, readable
 %   directory; every pack file found there by pack_files/2 is then
 %   unpacked using unpack_pack/2.

 unpack_packs(Store) :-
     absolute_file_name(Store, Dir,
                        [ file_type(directory),
                          access(read)
                        ]),
     pack_files(Dir, PackFiles),
     maplist(unpack_pack(Dir), PackFiles).
 1089
 %!  unpack_pack(+Store, +Pack) is det.
 %
 %   Turn a pack back into plain object files. The header of the pack is
 %   read to obtain its objects and the offset of the data. Each object
 %   is then handed to create_file/5. Finally the pack is detached from
 %   the memory index and the pack file is deleted.

 unpack_pack(Store, Pack) :-
     pack_read_header(Pack, _, Start, Entries),
     setup_call_cleanup(
         open(Pack, read, Stream, [type(binary)]),
         foldl(create_file(Store, Stream), Entries, Start, _),
         close(Stream)),
     detach_pack(Store, Pack),
     delete_file(Pack).
 1102
 %!  create_file(+Store, +In, +Info, +Offset0, -Offset)
 %
 %   Restore one object from a pack as a plain object file. In is the
 %   opened pack, Info is obj(Object, Type, Size, FileSize) and Offset0
 %   is the byte offset of the object in the pack. Offset is unified
 %   with Offset0+FileSize, the offset of the next object.
 %
 %   The target file is created under the mutex `gitty_file` by
 %   exists_or_recreate/2, which leaves Out unbound if the file already
 %   exists. In that case nothing is copied.
 %
 %   The output stream is closed regardless of how seeking and copying
 %   complete. seek/4 is deliberately part of the protected goal rather
 %   than the setup goal of setup_call_cleanup/3: if it were the setup
 %   goal and failed or raised an exception, the cleanup would not run
 %   and the (write locked) output stream would remain open.

 create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
     Offset is Offset0+FileSize,
     gitty_object_file(Store, Object, Path),
     with_mutex(gitty_file, exists_or_recreate(Path, Out)),
     (   var(Out)                        % plain file already exists
     ->  true
     ;   call_cleanup(
             (   seek(In, Offset0, bof, Offset0),
                 copy_stream_data(In, Out, FileSize)
             ),
             close(Out))
     ).
 1114
 %!  exists_or_recreate(+Path, -Out)
 %
 %   If the file Path exists, succeed leaving Out unbound. Otherwise
 %   make sure the directory of Path exists and unify Out with a binary
 %   output stream to Path that is opened with a write lock.

 exists_or_recreate(Path, Out) :-
     (   exists_file(Path)
     ->  true
     ;   file_directory_name(Path, Parent),
         make_directory_path(Parent),
         open(Path, write, Out, [type(binary), lock(write)])
     ).
 1122
 1123
 %!  remove_objects_after_pack(+Store, +Objects, +Options) is det.
 %
 %   Remove the indicated (file) objects from Store using
 %   delete_object/2. Unless Options contains
 %   prune_empty_directories(false), directories below Store that are
 %   left empty are removed as well.

 remove_objects_after_pack(Store, Hashes, Options) :-
     debug(gitty(pack), 'Deleting plain files', []),
     maplist(delete_object(Store), Hashes),
     (   option(prune_empty_directories(true), Options, true)
     ->  debug(gitty(pack), 'Pruning empty directories', []),
         prune_empty_directories(Store)
     ;   true
     ).
 1136
 %!  remove_repacked_packs(+Store, +Packs, +Options)
 %
 %   Remove packs that have been repacked. Each member of Packs is
 %   passed to remove_pack/3.

 remove_repacked_packs(Store, PackFiles, Options) :-
     maplist(remove_pack(Store, Options), PackFiles).
 1143
 %!  remove_pack(+Store, +Options, +Pack)
 %
 %   Detach Pack from the memory index of Store and delete the pack
 %   file. Options is currently ignored.

 remove_pack(Store, _Options, PackFile) :-
     detach_pack(Store, PackFile),
     delete_file(PackFile).
 1147
 %!  prune_empty_directories(+Dir) is det.
 %
 %   Prune directories that are empty below Dir. Dir itself is not
 %   removed, even if it is empty: the traversal starts at nesting level
 %   0 and only directories at a level above 0 are deleted.

 prune_empty_directories(Root) :-
     TopLevel = 0,
     prune_empty_directories(Root, TopLevel).
 1155
 %!  prune_empty_directories(+Dir, +Level)
 %
 %   Process the directory Dir that is nested Level levels below the
 %   root of the traversal. All entries except `.` and `..` are handed
 %   to prune_empty_directories/4, which recurses into subdirectories
 %   and only keeps the entries that are not directories. If no entries
 %   are kept and Level is above 0, deletion of Dir is attempted.

 prune_empty_directories(Dir, Level) :-
     directory_files(Dir, Entries0),
     exclude(hidden, Entries0, Entries),
     % For an empty Entries this trivially yields Kept == [].
     convlist(prune_empty_directories(Dir, Level), Entries, Kept),
     (   Kept == [],
         Level > 0
     ->  delete_directory_async(Dir)
     ;   true
     ).
 1169
 %!  hidden(?Entry)
 %
 %   True when Entry is one of the special directory entries `.` or
 %   `..`.

 hidden('.').
 hidden('..').
 1172
 %!  prune_empty_directories(+Parent, +Level0, +File, -Kept) is semidet.
 %
 %   Helper for convlist/3. If File is a directory in Parent, it is
 %   pruned recursively at level Level0+1 and the call fails, so
 %   directories never appear in the result of convlist/3. Any other
 %   entry is kept as is.

 prune_empty_directories(Parent, Level0, File, Kept) :-
     (   directory_file_path(Parent, File, Path),
         exists_directory(Path)
     ->  Level is Level0 + 1,
         prune_empty_directories(Path, Level),
         fail
     ;   Kept = File
     ).
 1181
 %!  delete_directory_async(+Dir)
 %
 %   Try to delete Dir while holding the mutex `gitty_file`. This is
 %   the same mutex under which create_file/5 creates object files.

 delete_directory_async(Dir) :-
     Goal = delete_directory_async2(Dir),
     with_mutex(gitty_file, Goal).
 1184
 %!  delete_directory_async2(+Dir)
 %
 %   Delete Dir. An exception from delete_directory/1 is ignored if Dir
 %   does not exist or is not empty at the time the exception is
 %   handled. It is re-thrown if Dir still exists and is empty.

 delete_directory_async2(Dir) :-
     catch(delete_directory(Dir), Error,
           (   exists_directory(Dir),
               empty_directory(Dir)
           ->  throw(Error)
           ;   true
           )).
 1193
 %!  empty_directory(+Dir) is semidet.
 %
 %   True when Dir contains no entries other than `.` and `..`.

 empty_directory(Dir) :-
     directory_files(Dir, Entries),
     exclude(hidden, Entries, Visible),
     Visible == [].
 1197
 1198
 1199		 /*******************************
 1200		 *        REDIS PRIMITIVES	*
 1201		 *******************************/
 1202
 %!  redis_head_db(+Store, -DB, -Key)
 %
 %   DB is the second argument of redis_db/4 for Store (presumably the
 %   read/write server; verify against redis_db/4) and Key is the name
 %   of the hash holding the heads: the prefix of the store followed by
 %   ":gitty:head".

 redis_head_db(Store, Server, HeadsKey) :-
     redis_db(Store, Server, _, Prefix),
     string_concat(Prefix, ":gitty:head", HeadsKey).
 1206
 %!  redis_head_db_ro(+Store, -DB, -Key)
 %
 %   As redis_head_db/3, but DB is the third argument of redis_db/4
 %   (presumably the read-only server; verify against redis_db/4).

 redis_head_db_ro(Store, Server, HeadsKey) :-
     redis_db(Store, _, Server, Prefix),
     string_concat(Prefix, ":gitty:head", HeadsKey).
 1210
 1211
 %!  redis_file(+Store, ?Name, ?Ext, ?Hash)
 %
 %   True when Name is a file with extension Ext whose head is Hash. The
 %   strategy depends on which argument is instantiated, tested in this
 %   order:
 %
 %     - Name: fetch the hash for Name from the heads hash.
 %     - Ext: scan the heads hash for fields matching `*.Ext`.
 %     - Hash: load the commit and use its `name`.
 %     - None: enumerate all pairs of the heads hash.

 redis_file(Store, Name, Ext, Hash) :-       % Name is known
     nonvar(Name),
     !,
     file_name_extension(_, Ext, Name),
     redis_head_db_ro(Store, Server, HeadsKey),
     redis(Server, hget(HeadsKey, Name), Hash as atom).
 redis_file(Store, Name, Ext, Hash) :-       % Ext is known
     nonvar(Ext),
     !,
     string_concat("*.", Ext, Glob),
     redis_head_db_ro(Store, Server, HeadsKey),
     redis_hscan(Server, HeadsKey, Fields, [match(Glob)]),
     member(NameString-HashString, Fields),
     atom_string(Name, NameString),
     atom_string(Hash, HashString).
 redis_file(Store, Name, Ext, Hash) :-       % Hash is known
     nonvar(Hash),
     !,
     load_plain_commit(Store, Hash, Commit),
     Name = Commit.name,
     file_name_extension(_, Ext, Name).
 redis_file(Store, Name, Ext, Hash) :-       % enumerate everything
     redis_head_db_ro(Store, Server, HeadsKey),
     redis(Server, hgetall(HeadsKey), Heads as pairs(atom,atom)),
     member(Name-Hash, Heads),
     file_name_extension(_, Ext, Name).
 1240
 %!  redis_ensure_heads(+Store)
 %
 %   Ensure the redis db contains a hashmap mapping all file names to
 %   their head hashes. If the heads key already exists nothing is
 %   done. Otherwise the store is scanned using gitty_scan_latest/1 and
 %   every latest/3 fact is retracted and added to the hashmap.

 redis_ensure_heads(Store) :-
     (   redis_head_db_ro(Store, ReadServer, ReadKey),
         redis(ReadServer, exists(ReadKey), 1)
     ->  true
     ;   redis_head_db(Store, Server, HeadsKey),
         debug(gitty(redis), 'Initializing gitty heads in ~p ...',
               [HeadsKey]),
         gitty_scan_latest(Store),
         forall(retract(latest(Name, Hash, _Time)),
                redis(Server, hset(HeadsKey, Name, Hash))),
         debug(gitty(redis), '... finished gitty heads', [])
     ).
 1257
 %!  redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
 %
 %   Make NewCommit the head of Name. If OldCommit is `-` the head is
 %   set unconditionally. Otherwise it is replaced using redis_hcas/5,
 %   which only updates the head if it currently is OldCommit. On
 %   success, the commit and data objects are published using
 %   publish_objects/2.

 redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
     redis_head_db(Store, Server, HeadsKey),
     (   OldCommit = (-)
     ->  redis(Server, hset(HeadsKey, Name, NewCommit))
     ;   redis_hcas(Server, HeadsKey, Name, OldCommit, NewCommit)
     ),
     publish_objects(Store, [NewCommit, DataHash]).
 1269
 %!  redis_delete_head(+Store, +Head) is det.
 %
 %   Unregister Head: delete the field Head from the heads hash of
 %   Store.

 redis_delete_head(Store, Head) :-
     redis_head_db(Store, Server, HeadsKey),
     redis(Server, hdel(HeadsKey, Head)).
 1277
 %!  redis_set_head(+Store, +File, +Hash) is det.
 %
 %   Register Hash as the head of File in the heads hash of Store,
 %   regardless of the current value.

 redis_set_head(Store, File, Hash) :-
     redis_head_db(Store, Server, HeadsKey),
     redis(Server, hset(HeadsKey, File, Hash)).
 1283
 1284		 /*******************************
 1285		 *           REPLICATE		*
 1286		 *******************************/
 1287
 1288%!  redis_replicate_get(+Store, +Hash)
 1289%
 1290%   Try to get an object from another   SWISH  server in the network. We
 1291%   implement replication using the PUB/SUB protocol   of Redis. This is
 1292%   not ideal as this route of the   synchronisation is only used if for
 1293%   some reason this server lacks some object. This typically happens if
 1294%   this node is new to the cluster or has been offline for a long time.
 1295%   In a large cluster, most nodes  will   have  the objects and each of
 1296%   them will send the object around. A consumer group based solution is
 1297%   not ideal either, as the message may  be   picked  up by a node that
 1298%   does not have this object, after which  we need the failure recovery
 1299%   protocol to get it right. This  is   particularly  the case with two
 %   nodes, where there is a fair chance that we are asked for the hash
 1301%   we miss ourselves.
 1302%
 %   We could improve on this in two ways: (1) put the hash published in a
 1304%   short-lived key on Redis and make others  check that. That is likely
 1305%   to avoid many nodes sending the  same   object  or  (2) see how many
 1306%   nodes are in the pool and switch  to a consumer group based approach
 1307%   if this number is  high  (and  thus   we  are  unlikely  to be asked
 1308%   ourselves for the missing hash).
 1309%
 1310%   @see publish_objects/2 for the incremental replication
 1311
 %   Register the Redis stream `gitty:replicate` by adding a clause to
 %   the multifile predicate swish_redis:stream/2. The option
 %   maxlen(100) presumably limits the length of the stream; verify
 %   against swish_redis.

 :- multifile swish_redis:stream/2.

 swish_redis:stream('gitty:replicate', [maxlen(100)]).
 1316
 %   Run init_replicator/0 when http(pre_server_start(_)) is broadcast.

 :- listen(http(pre_server_start(_)), init_replicator).

 %!  init_replicator
 %
 %   Set up replication: handle messages on the `gitty:replicate` stream
 %   using replicate/1, handle `swish:gitty` messages using
 %   gitty_message/1 and create the message queue `gitty_queue`, on
 %   which gitty_message/1 announces newly received objects.

 init_replicator :-
     redis_swish_stream('gitty:replicate', StreamKey),
     listen(redis(_Server, StreamKey, _MsgId, Fields),
            replicate(Fields)),
     listen(redis(_, 'swish:gitty', Msg),
            gitty_message(Msg)),
     message_queue_create(_, [alias(gitty_queue)]).
 1327
 %   NOTE(review): this unconditionally enables the debug topic
 %   gitty(replicate) when the file is loaded; confirm this is intended
 %   for production.

 :- debug(gitty(replicate)).

 %!  gitty_message(+Message)
 %
 %   Handle a message received on `swish:gitty`.
 %
 %     - discover(Hash)
 %       Some node asks for Hash. If the raw object can be loaded from a
 %       store, publish it as object(Hash, Data).
 %     - object(Hash, Data)
 %       Some node sent an object. Store it and, if it was new to us,
 %       post Hash on `gitty_queue`, where redis_replicate_get/2 waits
 %       for it.

 gitty_message(discover(Hash)) :-
     debug(gitty(replicate), 'Discover: ~p', [Hash]),
     store(Store, _),
     load_object_raw(Store, Hash, Raw),
     debug(gitty(replicate), 'Sending object ~p', [Hash]),
     redis(swish, publish(swish:gitty, object(Hash, Raw) as prolog)).
 gitty_message(object(Hash, Raw)) :-
     debug(gitty(replicate), 'Replicate: ~p', [Hash]),
     redis_db(Store, _DB, _RO, _Prefix),
     store_object_raw(Store, Hash, Raw, IsNew),
     debug(gitty(replicate), 'Received object ~p (new=~p)',
           [Hash, IsNew]),
     (   IsNew == true
     ->  thread_send_message(gitty_queue, Hash)
     ;   true
     ).
 1345
 %!  redis_replicate_get(+Store, +Hash) is semidet.
 %
 %   True to get Hash if we do not have it locally. This initiates a
 %   Redis `discover` request for the hash. The replies are picked up by
 %   gitty_message/1, which posts the hash on `gitty_queue`.
 %
 %   We give up immediately unless the request was delivered to more
 %   than one subscriber. Otherwise we make at most 100 attempts, each
 %   waiting up to 0.1 seconds for the hash to appear on the queue and
 %   then checking whether the object has arrived in Store by other
 %   means.
 %
 %   The code may be subject to various race conditions, but fortunately
 %   objects are immutable. It also seems possible that the Redis stream
 %   gets lost. Not sure when and how. For now, we restart if we get no
 %   reply, but no more than once per minute.

 redis_replicate_get(Store, Hash) :-
     is_gitty_hash(Hash),
     redis(swish, publish(swish:gitty, discover(Hash) as prolog),
           Receivers),
     Receivers > 1,                      % If I'm alone it won't help ...
     between(1, 100, _Attempt),
     (   (   thread_get_message(gitty_queue, Hash, [timeout(0.1)])
         ;   has_object(Store, Hash)
         )
     ->  !
     ;   restart_pubsub,
         fail                            % next attempt
     ).
 1371
 %   restarted(Time): time stamp of the last restart that was requested
 %   by restart_pubsub/0.

 :- dynamic restarted/1.

 %!  restart_pubsub
 %
 %   Make the thread `redis_pubsub` raise an I/O read error (presumably
 %   this causes it to reconnect; verify against the thread's
 %   implementation) and wait 0.05 seconds. Nothing is done if this was
 %   already done less than 60 seconds ago.

 restart_pubsub :-
     once(restarted(Last)),
     get_time(Now),
     Now-Last < 60,
     !.
 restart_pubsub :-
     get_time(Stamp),
     transaction(( retractall(restarted(_)),
                   asserta(restarted(Stamp))
                 )),
     thread_signal(redis_pubsub, throw(error(io_error(read, _),_))),
     sleep(0.05).
 1387
 1388
 1389
 %!  publish_objects(+Store, +Hashes)
 %
 %   Make the objects we just stored globally known. Each object is
 %   added to the Redis stream `gitty:replicate`; the entries are
 %   received by replicate/1.
 %
 %   This realises eager replication, as opposed to
 %   redis_replicate_get/2, which performs lazy replication. Eager
 %   replication ensures the object is in multiple places in the event
 %   that the node on which it was saved dies shortly after.
 %
 %   Note that we also receive the objects we just saved. That is
 %   unavoidable in a network where all nodes are equal.

 publish_objects(Store, Hashes) :-
     redis_swish_stream('gitty:replicate', StreamKey),
     maplist(publish_object(Store, StreamKey), Hashes).
 1406
 %!  publish_object(+Store, +Stream, +Hash)
 %
 %   Load the raw data for Hash from Store and add it to the Redis
 %   stream Stream as an entry with the fields `hash` and `data`.

 publish_object(Store, StreamKey, Hash) :-
     load_object_raw(Store, Hash, Raw),
     debug(gitty(replicate), 'Sending ~p to ~p', [Hash, StreamKey]),
     xadd(swish, StreamKey, _, _{hash:Hash, data:Raw}).
 1411
 %!  replicate(+Data) is det.
 %
 %   Act on a message sent to the `gitty:replicate` stream. Data is a
 %   dict with the keys `hash` and `data`. The object is handed to
 %   store_object_raw/4, which reports whether it was new to our store.
 %   Note that we receive our own objects as well.

 replicate(Fields) :-
     redis_db(Store, _DB, _RO, _Prefix),
     atom_string(Hash, Fields.hash),
     store_object_raw(Store, Hash, Fields.data, _0IsNew),
     debug(gitty(replicate), 'Received object ~p (new=~p)',
           [Hash, _0IsNew]).
 1424
 1425
 1426		 /*******************************
 1427		 *         REDIS BASICS		*
 1428		 *******************************/
 1429
 1430%!  redis_hcas(+DB, +Hash, +Key, +Old, +New) is semidet.
 1431%
 1432%   Update Hash.Key to New provided the current value is Old.
 1433
 1434redis_hcas(DB, Hash, Key, Old, New) :-
 1435    redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
 1436                      redis.call('HSET', KEYS[1],  ARGV[1], ARGV[3]); \c
 1437                      return 1; \c
 1438                      end; \c
 1439                    return 0\c
 1440                   ",
 1441                   1, Hash, Key, Old, New),
 1442          1)