View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2022, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3     % +Redis, +Command, -Properties
   75          ]).   76:- autoload(library(socket), [tcp_connect/3]).   77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   78:- autoload(library(broadcast), [broadcast/1]).   79:- autoload(library(error),
   80            [ must_be/2,
   81	      type_error/2,
   82              instantiation_error/1,
   83              uninstantiation_error/1,
   84              existence_error/2,
   85              existence_error/3
   86            ]).   87:- autoload(library(lazy_lists), [lazy_list/2]).   88:- autoload(library(lists), [append/3, member/2]).   89:- autoload(library(option), [merge_options/3, option/2,
   90			      option/3, select_option/4]).   91:- autoload(library(pairs), [group_pairs_by_key/2]).   92:- autoload(library(time), [call_with_time_limit/2]).   93:- use_module(library(debug), [debug/3, assertion/1]).   94:- use_module(library(settings), [setting/4, setting/2]).   95:- if(exists_source(library(ssl))).   96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]).   97:- endif.   98
   99:- use_foreign_library(foreign(redis4pl)).  100
  101:- setting(max_retry_count, nonneg, 8640, % one day
  102           "Max number of retries").  103:- setting(max_retry_wait, number, 10,
  104           "Max time to wait between recovery attempts").  105:- setting(sentinel_timeout, number, 0.2,
  106	   "Time to wait for a sentinel").  107
  108:- predicate_options(redis_server/3, 3,
  109                     [ pass_to(redis:redis_connect/3, 3)
  110                     ]).  111:- predicate_options(redis_connect/3, 3,
  112                     [ reconnect(boolean),
  113                       user(atom),
  114                       password(atomic),
  115                       version(between(2,3))
  116                     ]).  117:- predicate_options(redis_disconnect/2, 2,
  118                     [ force(boolean)
  119                     ]).  120:- predicate_options(redis_scan/3, 3,
  121                     [ match(atomic),
  122                       count(nonneg),
  123                       type(atom)
  124                     ]).  125% Actually not passing, but the same
  126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  129
  130
  131/** <module> Redis client
  132
  133This library is a client  to   [Redis](https://redis.io),  a popular key
  134value store to  deal  with  caching   and  communication  between  micro
  135services.
  136
  137In the typical use case we register  the   details  of one or more Redis
servers using redis_server/3. Subsequently, redis/2-3 is  used  to issue
  139commands on the server.  For example:
  140
  141```
  142?- redis_server(default, redis:6379, [password("secret")]).
  143?- redis(default, set(user, "Bob")).
  144?- redis(default, get(user), User).
  145User = "Bob"
  146```
  147*/
  148
  149:- dynamic server/3.  150
  151:- dynamic ( connection/2,              % ServerName, Stream
  152	     sentinel/2			% Pool, Address
  153           ) as volatile.  154
  155%!  redis_server(+ServerName, +Address, +Options) is det.
  156%
  157%   Register a redis server without  connecting   to  it. The ServerName
  158%   acts as a lazy connection alias.  Initially the ServerName `default`
  159%   points at `localhost:6379` with no   connect  options. The `default`
  160%   server is used for redis/1 and redis/2 and may be changed using this
  161%   predicate.  Options are described with redis_connect/3.
  162%
  163%   Connections  established  this  way  are  by  default  automatically
  164%   reconnected if the connection  is  lost   for  some  reason unless a
  165%   reconnect(false) option is specified.
  166
  167redis_server(Alias, Address, Options) :-
  168    must_be(ground, Alias),
  169    retractall(server(Alias, _, _)),
  170    asserta(server(Alias, Address, Options)).
  171
  172server(default, localhost:6379, []).
  173
  174%!  redis_connect(-Connection) is det.
  175%!  redis_connect(+Address, -Connection, +Options) is det.
  176%!  redis_connect(-Connection, +Host, +Port) is det.
  177%
  178%   Connect to a redis server. The  main mode is redis_connect(+Address,
  179%   -Connection,   +Options).   redis_connect/1   is     equivalent   to
  180%   redis_connect(localhost:6379, Connection, []).  Options:
  181%
  182%     - reconnect(+Boolean)
  183%       If `true`, try to reconnect to the service when the connection
  184%       seems lost.  Default is `true` for connections specified using
%       redis_server/3 and `false` for explicitly opened connections.
  186%     - user(+User)
  187%       If version(3) and password(Password) are specified, these
  188%       are used to authenticate using the `HELLO` command.
  189%     - password(+Password)
  190%       Authenticate using Password
  191%     - version(+Version)
  192%       Specify the connection protocol version.  Initially this is
  193%       version 2.  Redis 6 also supports version 3.  When specified
  194%       as `3`, the `HELLO` command is used to upgrade the protocol.
  195%     - tls(true)
  196%       When specified, initiate a TLS connection.  If this option is
  197%       specified we must also specify the `cacert`, `key` and `cert`
  198%       options.
  199%     - cacert(+File)
  200%       CA Certificate file to verify with.
  201%     - cert(+File)
  202%       Client certificate to authenticate with.
  203%     - key(+File)
  204%       Private key file to authenticate with.
  205%     - sentinels(+ListOfAddresses)
  206%       Used together with an Address of the form sentinel(MasterName)
  207%       to enable contacting a network of Redis servers guarded by a
  208%       sentinel network.
  209%     - sentinel_user(+User)
  210%     - sentinel_password(+Password)
%       Authentication information for the sentinels.  When omitted we
%       try to connect without authentication.
  213%
  214%   Instead of using these predicates, redis/2  and redis/3 are normally
  215%   used with a _server name_  argument registered using redis_server/3.
%   These  predicates  are  meant  for  creating  a  temporary  parallel
  217%   connection or using a connection with a _blocking_ call.
  218%
  219%   @compat   redis_connect(-Connection,   +Host,     +Port)    provides
  220%   compatibility to the original GNU-Prolog interface and is equivalent
  221%   to redis_connect(Host:Port, Connection, []).
  222%
  223%   @arg Address is a term Host:Port, unix(File) or the name of a server
  224%   registered  using  redis_server/3.  The  latter   realises  a  _new_
  225%   connection that is typically used for   blocking redis commands such
  226%   as listening for published messages, waiting on a list or stream.
  227
  228redis_connect(Conn) :-
  229    redis_connect(default, Conn, []).
  230
  231redis_connect(Conn, Host, Port) :-
  232    var(Conn),
  233    ground(Host), ground(Port),
  234    !,                                  % GNU-Prolog compatibility
  235    redis_connect(Host:Port, Conn, []).
  236redis_connect(Server, Conn, Options) :-
  237    atom(Server),
  238    !,
  239    (   server(Server, Address, DefaultOptions)
  240    ->  merge_options(Options, DefaultOptions, Options2),
  241        do_connect(Server, Address, Conn, [address(Address)|Options2])
  242    ;   existence_error(redis_server, Server)
  243    ).
  244redis_connect(Address, Conn, Options) :-
  245    do_connect(Address, Address, Conn, [address(Address)|Options]).
  246
  247%!  do_connect(+Id, +Address, -Conn, +Options)
  248%
  249%   Open the connection.  A connection is a compound term of the shape
  250%
  251%       redis_connection(Id, Stream, Failures, Options)
  252
  253do_connect(Id, sentinel(Pool), Conn, Options) =>
  254    sentinel_master(Id, Pool, Conn, Options).
  255do_connect(Id, Address0, Conn, Options) =>
  256    tcp_address(Address0, Address),
  257    tcp_connect(Address, Stream0, Options),
  258    tls_upgrade(Address, Stream0, Stream, Options),
  259    Conn = redis_connection(Id, Stream, 0, Options),
  260    hello(Conn, Options).
  261
  262tcp_address(unix(Path), Path) :-
  263    !.                                  % Using an atom is ambiguous
  264tcp_address(Address, Address).
  265
  266%!  tls_upgrade(+Address, +Raw, -Stream, +Options) is det.
  267%
  268%   Upgrade to a TLS connection when tls(true) is specified.
  269
  270:- if(current_predicate(ssl_context/3)).  271tls_upgrade(Host:_Port, Raw, Stream, Options) :-
  272    option(tls(true), Options),
  273    !,
  274    must_have_option(cacert(CacertFile), Options),
  275    must_have_option(key(KeyFile), Options),
  276    must_have_option(cert(CertFile), Options),
  277    ssl_context(client, SSL,
  278		[ host(Host),
  279		  certificate_file(CertFile),
  280		  key_file(KeyFile),
  281		  cacerts([file(CacertFile)]),
  282		  cert_verify_hook(tls_verify),
  283		  close_parent(true)
  284		]),
  285    stream_pair(Raw, RawRead, RawWrite),
  286    ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
  287    stream_pair(Stream, Read, Write).
  288:- endif.  289tls_upgrade(_, Stream, Stream, _).
  290
  291:- if(current_predicate(ssl_context/3)).  292
  293%!  tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status) is semidet.
  294%
  295%   Accept  or reject  the certificate  verification.  Similar  to the
  296%   Redis  command   line  client   (``redis-cli``),  we   accept  the
  297%   certificate as long as it is signed, not verifying the hostname.
  298
  299:- public tls_verify/5.  300tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
  301    !.
  302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
  303    !.
  304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
  305    fail.
  306
  307:- endif.  308
  309%!  sentinel_master(+ServerId, +SetinelPool, -Connection, +Options) is det.
  310%
  311%   Discover the master and connect to it.
  312
  313sentinel_master(Id, Pool, Master, Options) :-
  314    must_have_option(sentinels(Sentinels), Options),
  315    sentinel_auth(Options, Options1),
  316    setting(sentinel_timeout, TMO),
  317    (   sentinel(Pool, Sentinel)
  318    ;   member(Sentinel, Sentinels)
  319    ),
  320    catch(call_with_time_limit(
  321	      TMO,
  322	      do_connect(Id, Sentinel, Conn,
  323			 [sentinel(true)|Options1])),
  324	  Error,
  325	  (print_message(warning, Error),fail)),
  326    !,
  327    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
  328    call_cleanup(
  329	query_sentinel(Pool, Conn, Sentinel, MasterAddr),
  330	redis_disconnect(Conn)),
  331    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
  332    do_connect(Id, MasterAddr, Master, Options),
  333    debug(redis(sentinel), 'Connected to claimed master', []),
  334    redis(Master, role, Role),
  335    (   Role = [master|_Slaves]
  336    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
  337    ;   redis_disconnect(Master),
  338	debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
  339	sleep(TMO),
  340	sentinel_master(Id, Pool, Master, Options)
  341    ).
  342
  343sentinel_auth(Options0, Options) :-
  344    option(sentinel_user(User), Options0),
  345    option(sentinel_password(Passwd), Options0),
  346    !,
  347    merge_options([user(User), password(Passwd)], Options0, Options).
  348sentinel_auth(Options0, Options) :-
  349    select_option(password(_), Options0, Options, _).
  350
  351
  352query_sentinel(Pool, Conn, Sentinel, Host:Port) :-
  353    redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
  354    MasterData = [Host,Port],
  355    redis(Conn, sentinel(sentinels, Pool), Peers),
  356    transaction(update_known_sentinels(Pool, Sentinel, Peers)).
  357
  358update_known_sentinels(Pool, Sentinel, Peers) :-
  359    retractall(sentinel(Pool, _)),
  360    maplist(update_peer_sentinel(Pool), Peers),
  361    asserta(sentinel(Pool, Sentinel)).
  362
  363update_peer_sentinel(Pool, Attrs),
  364  memberchk(ip-Host, Attrs),
  365  memberchk(port-Port, Attrs) =>
  366    asserta(sentinel(Pool, Host:Port)).
  367
  368must_have_option(Opt, Options) :-
  369    option(Opt, Options),
  370    !.
  371must_have_option(Opt, Options) :-
  372    existence_error(option, Opt, Options).
  373
  374%!  hello(+Connection, +Option)
  375%
  376%   Initialize the connection. This is  used   to  upgrade  to the RESP3
  377%   protocol and/or to authenticate.
  378
  379hello(Con, Options) :-
  380    option(version(V), Options),
  381    V >= 3,
  382    !,
  383    (   option(user(User), Options),
  384        option(password(Password), Options)
  385    ->  redis(Con, hello(3, auth, User, Password))
  386    ;   redis(Con, hello(3))
  387    ).
  388hello(Con, Options) :-
  389    option(password(Password), Options),
  390    !,
  391    redis(Con, auth(Password)).
  392hello(_, _).
  393
  394%!  redis_stream(+Spec, --Stream, +DoConnect) is det.
  395%
  396%   Get the stream to a Redis server from  Spec. Spec is either the name
  397%   of       a       registered       server       or       a       term
  398%   redis_connection(Id,Stream,Failures,Options).  If  the    stream  is
  399%   disconnected it will be reconnected.
  400
  401redis_stream(Var, S, _) :-
  402    (   var(Var)
  403    ->  !, instantiation_error(Var)
  404    ;   nonvar(S)
  405    ->  !, uninstantiation_error(S)
  406    ).
  407redis_stream(ServerName, S, Connect) :-
  408    atom(ServerName),
  409    !,
  410    (   connection(ServerName, S0)
  411    ->  S = S0
  412    ;   Connect == true,
  413        server(ServerName, Address, Options)
  414    ->  redis_connect(Address, Connection, Options),
  415        redis_stream(Connection, S, false),
  416        asserta(connection(ServerName, S))
  417    ;   existence_error(redis_server, ServerName)
  418    ).
  419redis_stream(redis_connection(_,S0,_,_), S, _) :-
  420    S0 \== (-),
  421    !,
  422    S = S0.
  423redis_stream(Redis, S, _) :-
  424    Redis = redis_connection(Id,-,_,Options),
  425    option(address(Address), Options),
  426    do_connect(Id,Address,Redis2,Options),
  427    arg(2, Redis2, S0),
  428    nb_setarg(2, Redis, S0),
  429    S = S0.
  430
  431has_redis_stream(Var, _) :-
  432    var(Var),
  433    !,
  434    instantiation_error(Var).
  435has_redis_stream(Alias, S) :-
  436    atom(Alias),
  437    !,
  438    connection(Alias, S).
  439has_redis_stream(redis_connection(_,S,_,_), S) :-
  440    S \== (-).
  441
  442
  443%!  redis_disconnect(+Connection) is det.
  444%!  redis_disconnect(+Connection, +Options) is det.
  445%
  446%   Disconnect from a redis server. The   second  form takes one option,
  447%   similar to close/2:
  448%
  449%     - force(Force)
  450%       When `true` (default `false`), do not raise any errors if
  451%       Connection does not exist or closing the connection raises
  452%       a network or I/O related exception.  This version is used
  453%       internally if a connection is in a broken state, either due
  454%       to a protocol error or a network issue.
  455
  456redis_disconnect(Redis) :-
  457    redis_disconnect(Redis, []).
  458
  459redis_disconnect(Redis, Options) :-
  460    option(force(true), Options),
  461    !,
  462    (   Redis = redis_connection(_Id, S, _, _Opts)
  463    ->  (   S == (-)
  464        ->  true
  465        ;   close(S, [force(true)]),
  466            nb_setarg(2, Redis, -)
  467        )
  468    ;   has_redis_stream(Redis, S)
  469    ->  close(S, [force(true)]),
  470        retractall(connection(_,S))
  471    ;   true
  472    ).
  473redis_disconnect(Redis, _Options) :-
  474    redis_stream(Redis, S, false),
  475    close(S),
  476    retractall(connection(_,S)).
  477
  478%!  redis(+Connection, +Request) is semidet.
  479%
  480%   This predicate is overloaded to handle two types of requests. First,
  481%   it is a shorthand for `redis(Connection, Command, _)` and second, it
  482%   can be used to exploit  Redis   _pipelines_  and _transactions_. The
%   second form is activated if Request is  a _list_. In that case, each
  484%   element of the list is either a term  `Command -> Reply` or a simple
  485%   `Command`. Semantically this represents a   sequence  of redis/3 and
  486%   redis/2 calls.  It differs in the following aspects:
  487%
  488%     - All commands are sent in one batch, after which all replies are
  489%       read.  This reduces the number of _round trips_ and typically
  490%       greatly improves performance.
  491%     - If the first command is `multi` and the last `exec`, the
  492%       commands are executed as a Redis _transaction_, i.e., they
  493%       are executed _atomically_.
  494%     - If one of the commands returns an error, the subsequent commands
  495%       __are still executed__.
  496%     - You can not use variables from commands earlier in the list for
  497%       commands later in the list as a result of the above execution
  498%       order.
  499%
  500%   Procedurally, the process takes the following steps:
  501%
  502%     1. Send all commands
  503%     2. Read all replies and push messages
  504%     3. Handle all callbacks from push messages
  505%     4. Check whether one of the replies is an error.  If so,
  506%        raise this error (subsequent errors are lost)
  507%     5. Bind all replies for the `Command -> Reply` terms.
  508%
  509%   Examples
  510%
  511%   ```
  512%   ?- redis(default,
  513%            [ lpush(li,1),
  514%              lpush(li,2),
  515%              lrange(li,0,-1) -> List
  516%            ]).
  517%   List = ["2", "1"].
  518%   ```
  519
  520redis(Redis, PipeLine) :-
  521    is_list(PipeLine),
  522    !,
  523    redis_pipeline(Redis, PipeLine).
  524redis(Redis, Req) :-
  525    redis(Redis, Req, _).
  526
  527%!  redis(+Connection, +Command, -Reply) is semidet.
  528%
%   Execute a redis Command on  Connection.    Next,  bind  Reply to the
  530%   returned result. Command is a  callable   term  whose functor is the
  531%   name of the Redis command  and   whose  arguments  are translated to
  532%   Redis arguments according to the rules below.  Note that all text is
  533%   always represented using UTF-8 encoding.
  534%
  535%     - Atomic values are emitted verbatim
  536%     - A term A:B:... where all arguments are either atoms,
  537%       strings or integers (__no floats__) is translated into
  538%       a string `"A:B:..."`.  This is a common shorthand for
  539%       representing Redis keys.
  540%     - A term Term as prolog is emitted as "\u0000T\u0000" followed
  541%       by Term in canonical form.
  542%     - Any other term is emitted as write/1.
  543%
  544%   Reply is either a plain term (often a  variable) or a term `Value as
  545%   Type`. In the latter form,  `Type`   dictates  how  the Redis _bulk_
%   reply is translated to Prolog. The default is `auto`, i.e., the
%   reply is converted to a number if the content satisfies the Prolog
%   number syntax and to an atom otherwise.
  549%
  550%     - status(Atom)
  551%       Returned if the server replies with ``+ Status``.  Atom
  552%       is the textual value of `Status` converted to lower case,
  553%       e.g., status(ok) or status(pong).
  554%     - `nil`
  555%       This atom is returned for a NIL/NULL value.  Note that if
  556%       the reply is only `nil`, redis/3 _fails_.  The `nil` value
  557%       may be embedded inside lists or maps.
  558%     - A number
  559%       Returned if the server replies an integer (":Int"), double
  560%       (",Num") or big integer ("(Num")
  561%     - A string
  562%       Returned on a _bulk_ reply.  Bulk replies are supposed to be
%       in UTF-8 encoding.  If the bulk reply starts with
%       "\u0000T\u0000" it is supposed to be a Prolog term.
%       Note that this interpretation means it is __not__ possible
  566%       to read arbitrary binary blobs.
  567%     - A list of replies.  A list may also contain `nil`.  If Reply
  568%       as a whole would be `nil` the call fails.
  569%     - A list of _pairs_.  This is returned for the redis version 3
  570%       protocol "%Map".  Both the key and value respect the same
  571%       rules as above.
  572%
  573%   Redis _bulk_ replies are translated depending  on the `as` `Type` as
  574%   explained above.
  575%
  576%     - string
  577%     - string(Encoding)
  578%       Create a SWI-Prolog string object interpreting the blob as
  579%       following Encoding. Encoding is a restricted set of SWI-Prolog's
  580%       encodings: `bytes` (`iso_latin_1`), `utf8` and `text` (the
  581%       current locale translation).
  582%     - atom
  583%     - atom(Encoding)
  584%       As above, producing an atom.
  585%     - codes
  586%     - codes(Encoding)
  587%       As above, producing a list of integers (Unicode code points)
  588%     - chars
  589%     - chars(Encoding)
  590%       As above, producing a list of one-character atoms.
  591%     - integer
  592%     - float
  593%     - rational
  594%     - number
  595%       Interpret the bytes as a string representing a number.  If
  596%       the string does not represent a number of the requested type
  597%       a type_error(Type, String) is raised.
  598%     - tagged_integer
  599%       Same as integer, but demands the value to be between the Prolog
  600%       flags `min_tagged_integer` and `max_tagged_integer`, allowing
  601%       the value to be used as a dict key.
  602%     - auto
  603%       Same as auto(atom, number)
  604%     - auto(AsText,AsNumber)
%       If the bulk string conforms to the syntax of AsNumber, convert
%       the value to the requested numerical type.  Else convert
  607%       the value to text according to AsText.  This is similar to
  608%       the Prolog predicate name/2.
  609%     - dict_key
  610%       Alias for auto(atom,tagged_integer).  This allows the value
  611%       to be used as a key for a SWI-Prolog dict.
  612%     - pairs(AsKey, AsValue)
  613%       Convert a map or array of even length into pairs for which the
  614%       key satisfies AsKey and the value AsValue.  The `pairs` type
  615%       can also be applied to a Redis array.  In this case the array
  616%       length must be even.  This notably allows fetching a Redis
  617%       _hash_ as pairs using ``HGETALL`` using version 2 of the
  618%       Redis protocol.
  619%     - dict(AsKey, AsValue)
  620%       Similar to pairs(AsKey, AsValue), but convert the resulting
  621%       pair list into a SWI-Prolog dict.  AsKey must convert to a
  622%       valid dict key, i.e., an atom or tagged integer. See `dict_key`.
  623%     - dict(AsValue)
  624%       Shorthand for dict(dict_key, AsValue).
  625%
  626%   Here are some simple examples
  627%
  628%   ```
  629%   ?- redis(default, set(a, 42), X).
  630%   X = status("OK").
  631%   ?- redis(default, get(a), X).
  632%   X = "42".
  633%   ?- redis(default, get(a), X as integer).
  634%   X = 42.
  635%   ?- redis(default, get(a), X as float).
  636%   X = 42.0.
  637%   ?- redis(default, set(swipl:version, 8)).
  638%   true.
  639%   ?- redis(default, incr(swipl:version), X).
  640%   X = 9.
  641%   ```
  642%
  643%   @error redis_error(Code, String)
  644
  645redis(Redis, Req, Out) :-
  646    out_val(Out, Val),
  647    redis1(Redis, Req, Out),
  648    Val \== nil.
  649
  650out_val(Out, Val) :-
  651    (   nonvar(Out),
  652        Out = (Val as _)
  653    ->  true
  654    ;   Val = Out
  655    ).
  656
  657redis1(Redis, Req, Out) :-
  658    Error = error(Formal, _),
  659    catch(redis2(Redis, Req, Out), Error, true),
  660    (   var(Formal)
  661    ->  true
  662    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  663    ).
  664
  665redis2(Redis, Req, Out) :-
  666    atom(Redis),
  667    !,
  668    redis_stream(Redis, S, true),
  669    with_mutex(Redis,
  670               ( redis_write_msg(S, Req),
  671                 redis_read_stream(Redis, S, Out)
  672               )).
  673redis2(Redis, Req, Out) :-
  674    redis_stream(Redis, S, true),
  675    redis_write_msg(S, Req),
  676    redis_read_stream(Redis, S, Out).
  677
  678%!  redis_pipeline(+Redis, +PipeLine)
  679
  680redis_pipeline(Redis, PipeLine) :-
  681    Error = error(Formal, _),
  682    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  683    (   var(Formal)
  684    ->  true
  685    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  686    ).
  687
  688redis_pipeline2(Redis, PipeLine) :-
  689    atom(Redis),
  690    !,
  691    redis_stream(Redis, S, true),
  692    with_mutex(Redis,
  693               redis_pipeline3(Redis, S, PipeLine)).
  694redis_pipeline2(Redis, PipeLine) :-
  695    redis_stream(Redis, S, true),
  696    redis_pipeline3(Redis, S, PipeLine).
  697
  698redis_pipeline3(Redis, S, PipeLine) :-
  699    maplist(write_pipeline(S), PipeLine),
  700    flush_output(S),
  701    read_pipeline(Redis, S, PipeLine).
  702
  703write_pipeline(S, Command -> _Reply) :-
  704    !,
  705    redis_write_msg_no_flush(S, Command).
  706write_pipeline(S, Command) :-
  707    redis_write_msg_no_flush(S, Command).
  708
  709read_pipeline(Redis, S, PipeLine) :-
  710    E = error(Formal,_),
  711    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  712    (   var(Formal)
  713    ->  true
  714    ;   reconnect_error(E)
  715    ->  redis_disconnect(Redis, [force(true)]),
  716        throw(E)
  717    ;   resync(Redis),
  718        throw(E)
  719    ).
  720
  721read_pipeline2(Redis, S, PipeLine) :-
  722    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  723    maplist(handle_push(Redis), Pushed),
  724    maplist(handle_error, Errors),
  725    maplist(bind_reply, PipeLine, Replies).
  726
  727redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  728    !,
  729    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  730redis_read_msg3(S, Var, Reply, Error, Push) :-
  731    redis_read_msg(S, Var, Reply, Error, Push).
  732
  733handle_push(Redis, Pushed) :-
  734    handle_push_messages(Pushed, Redis).
  735handle_error(Error) :-
  736    (   var(Error)
  737    ->  true
  738    ;   throw(Error)
  739    ).
  740bind_reply(_Command -> Reply0, Reply) :-
  741    !,
  742    Reply0 = Reply.
  743bind_reply(_Command, _).
  744
  745
  746%!  recover(+Error, +Redis, :Goal)
  747%
  748%   Error happened while running Goal on Redis. If this is a recoverable
  749%   error (i.e., a network or disconnected peer),  wait a little and try
  750%   running Goal again.
  751
  752:- meta_predicate recover(+, +, 0).  753
  754recover(Error, Redis, Goal) :-
  755    Error = error(Formal, _),
  756    reconnect_error(Formal),
  757    auto_reconnect(Redis),
  758    !,
  759    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  760          [Redis, Error]),
  761    redis_disconnect(Redis, [force(true)]),
  762    (   wait_to_retry(Redis, Error)
  763    ->  call(Goal),
  764        retractall(failure(Redis, _))
  765    ;   throw(Error)
  766    ).
  767recover(Error, _, _) :-
  768    throw(Error).
  769
  % True if we may reconnect to Redis.  A connection term must carry
  % reconnect(true) in its options.  Otherwise Redis must be ground
  % and known to server/3, where reconnect defaults to `true`.
  auto_reconnect(Redis) :-
      (   Redis = redis_connection(_,_,_,ConnOptions)
      ->  option(reconnect(true), ConnOptions)
      ;   ground(Redis),
          server(Redis, _, ServerOptions),
          option(reconnect(true), ServerOptions, true)
      ).
  777
  % Formal error terms after which recover/3 tries to reconnect.
  reconnect_error(io_error(_, _)).                  % I/O error on a stream
  reconnect_error(socket_error(_, _)).              % error from the socket layer
  reconnect_error(syntax_error(unexpected_eof)).    % input ended inside a reply
  reconnect_error(existence_error(stream, _)).      % stream no longer exists
  782
%!  wait_to_retry(+Redis, +Error) is semidet.
%
%   Wait for some time after a failure. First we wait for 10ms. This is
%   doubled on each failure up to the setting `max_retry_wait`. If the
%   setting `max_retry_count` is exceeded we fail and the caller signals
%   an exception.
  789
  :- dynamic failure/2 as volatile.

  % Fails if the number of recorded failures has reached the setting
  % max_retry_count.  Otherwise records one more failure, reports the
  % retry and sleeps before returning.
  wait_to_retry(Redis, Error) :-
      redis_failures(Redis, Sofar),
      setting(max_retry_count, MaxRetries),
      Sofar < MaxRetries,
      Next is Sofar+1,
      redis_set_failures(Redis, Next),
      setting(max_retry_wait, MaxWait),
      % Delay in 1/100 sec: 1, 2, 4, ... capped at MaxWait seconds.
      Centis is min(MaxWait*100, 1<<Sofar),
      Wait is Centis/100.0,
      debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
      retry_message_level(Sofar, Level),
      print_message(Level, redis(retry(Redis, Sofar, Wait, Error))),
      sleep(Wait).
  804
  % Failures is the failure count for Redis.  A connection term keeps
  % the count in its third argument; for a server alias (an atom) it
  % is kept in failure/2, defaulting to 0.
  redis_failures(Redis, Failures) :-
      (   Redis = redis_connection(_,_,Count,_)
      ->  Failures = Count
      ;   atom(Redis),
          (   failure(Redis, Failures)
          ->  true
          ;   Failures = 0
          )
      ).
  814
  % Store the failure count.  For a compound (connection) term the
  % third argument is overwritten using nb_setarg/3; for a server
  % alias (an atom) the failure/2 fact is replaced.
  redis_set_failures(Redis, Count) :-
      (   compound(Redis)
      ->  nb_setarg(3, Redis, Count)
      ;   atom(Redis),
          retractall(failure(Redis, _)),
          asserta(failure(Redis, Count))
      ).
  823
  % Message level for reporting a retry: `warning` when there were no
  % earlier failures and `silent` otherwise.
  retry_message_level(Failures, Level) :-
      (   Failures-Level = 0-warning
      ->  true
      ;   Level = silent
      ).
  826
  827
  828%!  redis(+Request)
  829%
%   Connect to the default redis server,   call  redis/3 using Request,
  831%   disconnect and print the result.  This   predicate  is  intended for
  832%   interactive usage.
  833
  % Run Req on a fresh connection to the `default` server and print
  % the reply.  The connection is closed by the cleanup handler.
  redis(Req) :-
      setup_call_cleanup(
          redis_connect(default, Conn, []),
          redis1(Conn, Req, Reply),
          redis_disconnect(Conn)),
      print(Reply).
  840
  841%!  redis_write(+Redis, +Command) is det.
  842%!  redis_read(+Redis, -Reply) is det.
  843%
  844%   Write command and read replies from a Redis server. These are
  845%   building blocks for subscribing to event streams.
  846
  % Obtain the stream for Redis and write Command to it.
  redis_write(Redis, Command) :-
      redis_stream(Redis, Stream, true),
      redis_write_msg(Stream, Command).
  850
  % Obtain the stream for Redis and read a single reply from it.
  redis_read(Redis, Reply) :-
      redis_stream(Redis, Stream, true),
      redis_read_stream(Redis, Stream, Reply).
  854
  855
  856		 /*******************************
  857		 *      HIGH LEVEL ACCESS	*
  858		 *******************************/
  859
  860%!  redis_get_list(+Redis, +Key, -List) is det.
  861%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
  862%
  863%   Get the content of a Redis list in   List. If ChunkSize is given and
  864%   smaller than the list length, List is returned as a _lazy list_. The
  865%   actual values are requested using   redis  ``LRANGE`` requests. Note
  866%   that this results in O(N^2) complexity. Using   a  lazy list is most
  867%   useful for relatively short lists holding possibly large items.
  868%
  869%   Note that values retrieved are _strings_, unless the value was added
  870%   using `Term as prolog`.
  871%
  872%   @see lazy_list/2 for a discussion  on   the  difference between lazy
  873%   lists and normal lists.
  874
  % redis_get_list(+Redis, +Key, -List): fetch the entire list.
  redis_get_list(Redis, Key, List) :-
      redis_get_list(Redis, Key, -1, List).

  % redis_get_list(+Redis, +Key, +Chunk, -List)
  %
  % An empty Redis list yields [].  If Chunk is not positive (this
  % includes the conventional -1) or Chunk is at least the length of
  % the list, the entire list is fetched at once.  Otherwise List is a
  % lazy list that fetches Chunk elements at a time.
  %
  % A non-positive Chunk other than -1 used to create a lazy list that
  % never advances (Chunk 0 re-reads the whole list forever), so such
  % values are now treated as "fetch all".
  redis_get_list(Redis, Key, Chunk, List) :-
      redis(Redis, llen(Key), Len),
      (   Len == 0
      ->  List = []
      ;   (   Chunk =< 0
          ;   Chunk >= Len
          )
      ->  End is Len-1,
          list_range(Redis, Key, 0, End, List)
      ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
      ).
  890
  % Lazy list iterator for redis_get_list/4.  State is
  % s(Redis,Key,Offset,Slice,Len).  Each call fetches the next slice;
  % the offset in State is advanced destructively using nb_setarg/3
  % until the last index (Len-1) has been fetched.
  rlist_next(State, List, Tail) :-
      State = s(Redis,Key,Offset,Slice,Len),
      Last is Len-1,
      End is min(Last, Offset+Slice-1),
      list_range(Redis, Key, Offset, End, Elems),
      (   End =:= Last
      ->  List = Elems,
          Tail = []
      ;   NextOffset is Offset+Slice,
          nb_setarg(3, State, NextOffset),
          append(Elems, Tail, List)
      ).
  902
  903% Redis LRANGE demands End > Start and returns inclusive.
  904
  % Fetch the elements Start..End (inclusive).  If Start and End unify
  % the single element is fetched using LINDEX; otherwise the range is
  % fetched using LRANGE.
  list_range(DB, Key, Start, End, List) :-
      (   Start = End,
          List = [Elem]
      ->  redis(DB, lindex(Key, Start), Elem)
      ;   redis(DB, lrange(Key, Start, End), List)
      ).
  911
  912
  913
  914%!  redis_set_list(+Redis, +Key, +List) is det.
  915%
  916%   Associate a Redis key with a list.  As   Redis  has no concept of an
  917%   empty list, if List is `[]`, Key  is _deleted_. Note that key values
  918%   are always strings in  Redis.  The   same  conversion  rules  as for
  919%   redis/1-3 apply.
  920
  % Delete Key and, unless List is [], push all elements of List in a
  % single RPUSH command.
  redis_set_list(Redis, Key, List) :-
      redis(Redis, del(Key), _),
      (   List \== []
      ->  Push =.. [rpush,Key|List],
          redis(Redis, Push, _Count)
      ;   true
      ).
  928
  929
  930%!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
  931%!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
  932%
  933%   Put/get a Redis hash as a Prolog  dict. Putting a dict first deletes
  934%   Key. Note that in many cases   applications will manage Redis hashes
  935%   by key. redis_get_hash/3 is notably a   user friendly alternative to
  936%   the Redis ``HGETALL`` command. If the  Redis   hash  is  not used by
  937%   other (non-Prolog) applications one  may   also  consider  using the
  938%   `Term as prolog` syntax to store the Prolog dict as-is.
  939
  % Run HGETALL on Key and have the reply converted using the reply
  % specification `Dict as dict(auto)`.
  redis_get_hash(Redis, Key, Dict) :-
      Reply = (Dict as dict(auto)),
      redis(Redis, hgetall(Key), Reply).
  942
  % redis_set_hash(+Redis, +Key, +Dict)
  %
  % Replace the Redis hash at Key by the content of Dict: Key is
  % deleted and the fields are written using a single HSET command.
  % An empty dict merely deletes Key.  This mirrors redis_set_list/3
  % and avoids sending an HSET command without field/value pairs.
  redis_set_hash(Redis, Key, Dict) :-
      redis_array_dict(Array, _, Dict),
      redis(Redis, del(Key), _),
      (   Array == []
      ->  true
      ;   Term =.. [hset,Key|Array],
          redis(Redis, Term, _Count)
      ).
  948
  949%!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
  950%
  951%   Translate a Redis reply representing  hash   data  into a SWI-Prolog
  952%   dict. Array is either a list  of   alternating  keys and values or a
  953%   list of _pairs_. When translating to an array, this is always a list
  954%   of alternating keys and values.
  955%
  956%   @arg Tag is the SWI-Prolog dict tag.
  957
  % The direction is selected on Array: if it is instantiated it is
  % converted into pairs and from there into Dict; otherwise Dict is
  % decomposed and Array becomes the flat key/value list.
  redis_array_dict(Array, Tag, Dict) :-
      (   nonvar(Array)
      ->  array_to_pairs(Array, Pairs),
          dict_pairs(Dict, Tag, Pairs)
      ;   dict_pairs(Dict, Tag, Pairs),
          pairs_to_array(Pairs, Array)
      ).
  966
  % Convert a Redis reply into a list Name-Value with atom names.
  % Accepts pairs (RESP3 returns a map as pairs) as well as a flat
  % list of alternating names and values.
  array_to_pairs([], []) :-
      !.
  array_to_pairs([Text-Value|Rest], [Key-Value|Pairs]) :-
      !,
      atom_string(Key, Text),
      array_to_pairs(Rest, Pairs).
  array_to_pairs([Text,Value|Rest], [Key-Value|Pairs]) :-
      atom_string(Key, Text),
      array_to_pairs(Rest, Pairs).
  976
  % Flatten Name-Value pairs into a list of alternating names and
  % values, where each name is converted into a string.
  pairs_to_array([], []) :-
      !.
  pairs_to_array([Key-Value|Pairs], [Text,Value|Rest]) :-
      atom_string(Key, Text),
      pairs_to_array(Pairs, Rest).
  982
  983%!  redis_scan(+Redis, -LazyList, +Options) is det.
  984%!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
  985%!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
  986%!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
  987%
%   Map the Redis ``SCAN``, ``SSCAN``, ``HSCAN`` and ``ZSCAN`` commands
  989%   into a _lazy list_. For redis_scan/3 and redis_sscan/4 the result is
  990%   a list of strings. For redis_hscan/4   and redis_zscan/4, the result
  991%   is a list of _pairs_.   Options processed:
  992%
  993%     - match(Pattern)
  994%       Adds the ``MATCH`` subcommand, only returning matches for
  995%       Pattern.
  996%     - count(Count)
  997%       Adds the ``COUNT`` subcommand, giving a hint to the size of the
  998%       chunks fetched.
  999%     - type(Type)
 1000%       Adds the ``TYPE`` subcommand, only returning answers of the
 1001%       indicated type.
 1002%
 1003%   @see lazy_list/2.
 1004
 % Lazy list over SCAN.  The iteration state starts at cursor 0.
 redis_scan(Redis, LazyList, Options) :-
     scan_options([match,count,type], Options, Extra),
     State = s(scan,Redis,0,Extra),
     lazy_list(scan_next(State), LazyList).
 1008
 % Lazy list over SSCAN on Set.  The iteration state starts at cursor 0.
 redis_sscan(Redis, Set, LazyList, Options) :-
     scan_options([match,count,type], Options, Extra),
     State = s(sscan(Set),Redis,0,Extra),
     lazy_list(scan_next(State), LazyList).
 1012
 % Lazy list over HSCAN on Hash.  The iteration state starts at cursor 0.
 redis_hscan(Redis, Hash, LazyList, Options) :-
     scan_options([match,count,type], Options, Extra),
     State = s(hscan(Hash),Redis,0,Extra),
     lazy_list(scan_next(State), LazyList).
 1016
 % Lazy list over ZSCAN on Set.  The iteration state starts at cursor 0.
 redis_zscan(Redis, Set, LazyList, Options) :-
     scan_options([match,count,type], Options, Extra),
     State = s(zscan(Set),Redis,0,Extra),
     lazy_list(scan_next(State), LazyList).
 1020
 % scan_options(+Names, +Options, -Params)
 %
 % For each Name in Names for which Options holds Name(Value), add
 % Name and Value to Params.  The order of Names is preserved and
 % names that do not appear in Options are skipped.
 scan_options([], _, []).
 scan_options([Name|Names], Options, Params) :-
     Probe =.. [Name,Value],
     (   option(Probe, Options)
     ->  Params = [Name,Value|Rest]
     ;   Params = Rest
     ),
     scan_options(Names, Options, Rest).
 1029
 1030
 % Lazy list iterator for the scan predicates.  State is
 % s(Command,Redis,Cursor,Params).  The request is Command extended
 % with the cursor and Params.  A new cursor of 0 ends the list;
 % otherwise the cursor in State is updated using nb_setarg/3.
 scan_next(State, List, Tail) :-
     State = s(Command,Redis,Cursor,Params),
     Command =.. Base,
     append(Base, [Cursor|Params], Full),
     Request =.. Full,
     redis(Redis, Request, [NextCursor,Raw]),
     scan_pairs(Command, Raw, Elems),
     (   NextCursor == 0
     ->  List = Elems,
         Tail = []
     ;   nb_setarg(3, State, NextCursor),
         append(Elems, Tail, List)
     ).
 1044
 % scan_pairs(+Command, +Elems0, -Elems)
 %
 % Replies to hscan(_) and zscan(_) are turned into pairs; replies to
 % other commands are passed unchanged.
 scan_pairs(Command, List, Pairs) :-
     (   (   Command = hscan(_)
         ;   Command = zscan(_)
         )
     ->  scan_pairs(List, Pairs)
     ;   Pairs = List
     ).

 % scan_pairs(+List, -Pairs)
 %
 % Combine two subsequent elements into Key-Value.  An element that
 % already is a pair is only copied if the first clause does not
 % apply, i.e., when it is the last element of List.
 % NOTE(review): array_to_pairs/2 tests for a pair first; here a list
 % of two or more pairs is combined pairwise instead.  Confirm that
 % this clause order is intended.
 scan_pairs([], []).
 scan_pairs([K,V|Rest], [K-V|Pairs]) :-
     !,
     scan_pairs(Rest, Pairs).
 scan_pairs([K-V|Rest], [K-V|Pairs]) :-
     scan_pairs(Rest, Pairs).
 1059
 1060
 1061		 /*******************************
 1062		 *              ABOUT		*
 1063		 *******************************/
 1064
 1065%!  redis_current_command(+Redis, ?Command) is nondet.
 1066%!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
 1067%
 1068%   True when Command has Properties. Fails   if Command is not defined.
 1069%   The redis_current_command/3 version  returns   the  command argument
 1070%   specification. See Redis documentation for an explanation.
 1071
 % redis_current_command/2: as redis_current_command/3, ignoring the
 % properties.
 redis_current_command(Redis, Command) :-
     redis_current_command(Redis, Command, _Properties).

 % If Command is given the server is asked using `COMMAND INFO`.
 % Otherwise the reply to `COMMAND` is enumerated on backtracking,
 % returning the command names as atoms.
 redis_current_command(Redis, Command, Properties) :-
     (   nonvar(Command)
     ->  redis(Redis, command(info, Command), [[_Name|Properties]])
     ;   redis(Redis, command, All),
         member([NameS|Properties], All),
         atom_string(Command, NameS)
     ).
 1083
 1084%!  redis_property(+Redis, ?Property) is nondet.
 1085%
 1086%   True if Property is a property of   the Redis server. Currently uses
 1087%   redis(info, String) and parses the result.   As  this is for machine
 1088%   usage, properties names *_human are skipped.
 1089
 % Request `INFO`, parse the reply into terms and enumerate these
 % on backtracking.
 redis_property(Redis, Property) :-
     redis(Redis, info, InfoText),
     info_terms(InfoText, Properties),
     member(Property, Properties).
 1094
 % Split Info into lines, stripping white space and line endings, and
 % keep a term for each line that info_line_term/2 can translate.
 info_terms(Info, Terms) :-
     split_string(Info, "\n", "\r\n ", Lines),
     convlist(info_line_term, Lines, Terms).
 1098
 % Translate a line `Name:Value` into the term Name(Value).  The line
 % is split at the first colon.  Value is a number if the text can be
 % converted to one and a string otherwise.  Fails for lines without
 % a colon and for names that end in `_human`.
 info_line_term(Line, Term) :-
     once(sub_string(Line, Before, _, After, :)),
     sub_atom(Line, 0, Before, _, Name),
     \+ atom_concat(_, '_human', Name),
     sub_string(Line, _, After, 0, ValueS),
     (   number_string(Number, ValueS)
     ->  Value = Number
     ;   Value = ValueS
     ),
     Term =.. [Name,Value].
 1110
 1111
 1112		 /*******************************
 1113		 *            SUBSCRIBE		*
 1114		 *******************************/
 1115
 1116%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
 1117%
 1118%   Subscribe to one or more  Redis   PUB/SUB  channels.  This predicate
 1119%   creates a thread using thread_create/3 with  the given Options. Once
 1120%   running, the thread listens for messages.   The message content is a
 1121%   string or Prolog term  as  described   in  redis/3.  On  receiving a
 1122%   message, the following message is broadcasted:
 1123%
 1124%       redis(Id, Channel, Data)
 1125%
 1126%   If redis_unsubscribe/2 removes the  last   subscription,  the thread
 1127%   terminates.
 1128%
%   To simply print the incoming messages use e.g.
 1130%
 1131%       ?- listen(redis(_, Channel, Data),
 1132%                 format('Channel ~p got ~p~n', [Channel,Data])).
 1133%       true.
 1134%       ?- redis_subscribe(default, test, Id, []).
 1135%       Id = redis_pubsub_3,
 1136%       ?- redis(publish(test, "Hello world")).
 1137%       Channel test got "Hello world"
 1138%       1
 1139%       true.
 1140%
 1141%   @arg Id is the thread identifier of  the listening thread. Note that
 1142%   the Options alias(Name) can be used to get a system wide name.
 1143
 :- dynamic
        ( subscription/2,               % Id, Channel
          listening/3                   % Id, Connection, Thread
        ) as volatile.

 % Start the listening thread.  If Redis is an atom the thread opens
 % its own connection with reconnect(true) and closes it on exit.
 % Otherwise Redis itself is used as the connection.
 redis_subscribe(Redis, Spec, Id, Options) :-
     (   atom(Redis)
     ->  Goal = setup_call_cleanup(
                    redis_connect(Redis, Conn, [reconnect(true)]),
                    redis_subscribe1(Redis, Conn, Channels),
                    redis_disconnect(Conn))
     ;   Goal = redis_subscribe1(Redis, Redis, Channels)
     ),
     channels(Spec, Channels),
     pubsub_thread_options(ThreadOptions, Options),
     thread_create(Goal, Thread, ThreadOptions),
     pubsub_id(Thread, Id).
 1167
 % Options for creating the listening thread: the user options with
 % detached(true) as default.
 pubsub_thread_options(ThreadOptions, UserOptions) :-
     Defaults = [detached(true)],
     merge_options(UserOptions, Defaults, ThreadOptions).
 1170
 % The subscription id is the thread (id) itself.
 pubsub_id(Thread, Id) :-
     Id = Thread.
 1172%pubsub_id(Thread, Id) :-
 1173%    thread_property(Thread, id(TID)),
 1174%    atom_concat('redis_pubsub_', TID, Id).
 1175
 % Subscribe and listen.  If this raises error(Formal, _), recover/3
 % is called with an `echo` request as retry goal and we start over,
 % now using the channels registered for this thread.
 redis_subscribe1(Redis, Conn, Channels) :-
     Caught = error(Formal, _),
     catch(redis_subscribe2(Redis, Conn, Channels), Caught, true),
     (   nonvar(Formal)
     ->  recover(Caught, Conn, redis1(Conn, echo("reconnect"), _)),
         thread_self(Self),
         pubsub_id(Self, Id),
         findall(Ch, subscription(Id, Ch), Registered),
         redis_subscribe1(Redis, Conn, Registered)
     ;   true
     ).
 1187
 % Send the subscribe request and enter the listen loop.
 redis_subscribe2(Redis, Connection, Channels) :-
     redis_subscribe3(Connection, Channels),
     redis_listen(Redis, Connection).
 1191
 % Register a clean-up hook for when this thread exits, register
 % Channels for this thread and write the SUBSCRIBE request.
 redis_subscribe3(Conn, Channels) :-
     thread_self(Self),
     pubsub_id(Self, Id),
     prolog_listen(this_thread_exit, pubsub_clean(Id)),
     maplist(register_subscription(Id), Channels),
     redis_stream(Conn, Stream, true),
     Request =.. [subscribe|Channels],
     redis_write_msg(Stream, Request).
 1200
 % Remove all listening/3 and subscription/2 facts for Id.
 pubsub_clean(Id) :-
     retractall(listening(Id, _, _)),
     retractall(subscription(Id, _)).
 1204
 1205%!  redis_subscribe(+Id, +Channels) is det.
 1206%!  redis_unsubscribe(+Id, +Channels) is det.
 1207%
%   Add/remove channels for the subscription. If no subscriptions
 1209%   remain, the listening thread terminates.
 1210%
 1211%   @arg Channels is either a single  channel   or  a list thereof. Each
 1212%   channel specification is either an atom   or a term `A:B:...`, where
 1213%   all parts are atoms.
 1214
 % Add channels to the subscription Id.  Raises an existence error if
 % Id has no listening/3 entry.  The channels are registered and a
 % SUBSCRIBE request is written to the connection of the listener.
 redis_subscribe(Id, Spec) :-
     channels(Spec, Channels),
     (   listening(Id, Conn, _)
     ->  true
     ;   existence_error(redis_pubsub, Id)
     ),
     maplist(register_subscription(Id), Channels),
     redis_stream(Conn, Stream, true),
     Request =.. [subscribe|Channels],
     redis_write_msg(Stream, Request).
 1225
 % Remove channels from the subscription Id.  Raises an existence
 % error if Id has no listening/3 entry.  The channels are
 % unregistered and an UNSUBSCRIBE request is written to the
 % connection of the listener.
 redis_unsubscribe(Id, Spec) :-
     channels(Spec, Channels),
     (   listening(Id, Conn, _)
     ->  true
     ;   existence_error(redis_pubsub, Id)
     ),
     maplist(unregister_subscription(Id), Channels),
     redis_stream(Conn, Stream, true),
     Request =.. [unsubscribe|Channels],
     redis_write_msg(Stream, Request).
 1236
 1237%!  redis_current_subscription(?Id, ?Channels)
 1238%
 1239%   True when a PUB/SUB subscription with Id is listening on Channels.
 1240
 % Collect the registered subscriptions, group the channels by id and
 % enumerate the groups on backtracking.
 redis_current_subscription(Id, Channels) :-
     findall(Id-Ch, subscription(Id, Ch), Flat),
     keysort(Flat, ById),
     group_pairs_by_key(ById, Groups),
     member(Id-Channels, Groups).
 1246
 % Normalise a channel specification into a list of channel names.
 % Spec is a list of channels or a single channel.
 channels(Spec, List) :-
     (   is_list(Spec)
     ->  maplist(channel_name, Spec, List)
     ;   List = [Name],
         channel_name(Spec, Name)
     ).
 1253
 % Translate a channel specification into an atom.  An atom is used
 % as is.  A term `A:B:...` with atom parts becomes the atom holding
 % the parts joined by `:`.  Anything else raises a type error.
 channel_name(Key, Atom) :-
     (   Key = Atom,
         atom(Key)
     ->  true
     ;   phrase(key_parts(Key), Parts)
     ->  atomic_list_concat(Parts, :, Atom)
     ;   type_error(redis_key, Key)
     ).
 1263
 % DCG that emits the atoms of a key `A:B:...`.  Fails if the key, or
 % a part thereof, is unbound or neither an atom nor a `:` term.
 key_parts(Key) -->
     (   { var(Key) }
     ->  { fail }
     ;   { atom(Key) }
     ->  [Key]
     ;   { Key = Left:Right },
         key_parts(Left),
         key_parts(Right)
     ).
 1273
 1274
 1275
 1276
 % Record that Id listens on Channel, unless this is already known.
 register_subscription(Id, Channel) :-
     subscription(Id, Channel),
     !.
 register_subscription(Id, Channel) :-
     assertz(subscription(Id, Channel)).
 1282
 % Forget that Id listens on Channel.
 unregister_subscription(Id, Channel) :-
     Fact = subscription(Id, Channel),
     retractall(Fact).
 1285
 % Run the listen loop.  While the loop runs a listening/3 fact for
 % this thread exists; the cleanup handler erases it.
 redis_listen(Redis, Conn) :-
     thread_self(Self),
     pubsub_id(Self, Id),
     setup_call_cleanup(
         assertz(listening(Id, Conn, Self), ClauseRef),
         redis_listen_loop(Redis, Id, Conn),
         erase(ClauseRef)).
 1293
 % Read and broadcast messages for as long as Id has at least one
 % registered subscription.
 redis_listen_loop(Redis, Id, Conn) :-
     redis_stream(Conn, Stream, true),
     (   \+ subscription(Id, _)
     ->  true
     ;   redis_read_stream(Redis, Stream, Message),
         redis_broadcast(Redis, Message),
         redis_listen_loop(Redis, Id, Conn)
     ).
 1302
 % Handle a message read while listening.  A `subscribe` confirmation
 % is ignored.  A `message` is broadcasted as
 % redis(Redis, Channel, Data), where exceptions raised by listeners
 % are printed.  Any other message is checked to be a list of three
 % elements that starts with two atoms and reported as debug message.
 redis_broadcast(Redis, Message) :-
     (   Message = [subscribe, _, _]
     ->  true
     ;   Message = [message, Channel, Data]
     ->  catch(broadcast(redis(Redis, Channel, Data)),
               Error,
               print_message(error, Error))
     ;   assertion((Message = [Type, Chan, _Data],
                    atom(Type),
                    atom(Chan))),
         debug(redis(warning), '~p: Unknown message while listening: ~p',
               [Redis,Message])
     ).
 1316
 1317
 1318		 /*******************************
 1319		 *          READ/WRITE		*
 1320		 *******************************/
 1321
 1322%!  redis_read_stream(+Redis, +Stream, -Term) is det.
 1323%
 1324%   Read a message from a Redis stream.  Term is one of
 1325%
 1326%     - A list of terms (array)
 1327%     - A list of pairs (map, RESP3 only)
 1328%     - The atom `nil`
 1329%     - A number
 1330%     - A term status(String)
 1331%     - A string
 1332%     - A boolean (`true` or `false`).  RESP3 only.
 1333%
 1334%   If something goes wrong, the connection   is closed and an exception
 1335%   is raised.
 1336
 % Read one reply.  If reading raises error(Formal, _) the connection
 % is closed and the error is re-thrown.  Otherwise the push messages
 % are handled first.  If an error was reported we resync and throw
 % it; else the reply is unified with Out.
 redis_read_stream(Redis, SI, Out) :-
     Caught = error(Formal,_),
     catch(redis_read_msg(SI, Out, Reply, Error, Push), Caught, true),
     (   nonvar(Formal)
     ->  redis_disconnect(Redis, [force(true)]),
         throw(Caught)
     ;   handle_push_messages(Push, Redis),
         (   nonvar(Error)
         ->  resync(Redis),
             throw(Error)
         ;   Out = Reply
         )
     ).
 1350
 % Handle the push messages one by one.  If the handler raises an
 % exception it is printed as a warning; failure of the handler is
 % ignored.
 handle_push_messages([], _).
 handle_push_messages([Message|Rest], Redis) :-
     ignore(catch(handle_push_message(Message, Redis), Error,
                  print_message(warning, Error))),
     handle_push_messages(Rest, Redis).
 1359
 % A push message that starts with "pubsub" is broadcasted without
 % this header.
 handle_push_message(["pubsub"|Payload], Redis) :-
     redis_broadcast(Redis, Payload).
 % Some protocol version 3 push messages (such as __keyspace@*
 % events) seem to arrive without the pubsub header.  These are
 % broadcasted as they are.
 handle_push_message([message|Payload], Redis) :-
     redis_broadcast(Redis, [message|Payload]).
 1367
 1368
 1369%!  resync(+Redis) is det.
 1370%
 1371%   Re-synchronize  after  an  error.  This  may  happen  if  some  type
 1372%   conversion fails and we have read  a   partial  reply. It is hard to
 1373%   figure out what to read from where we are, so we echo a random magic
 1374%   sequence and read until we find the reply.
 1375
 % Try to resynchronise the connection.  If this raises
 % error(Formal, _) the connection is forcefully closed instead.
 resync(Redis) :-
     Caught = error(Formal,_),
     catch(do_resync(Redis), Caught, true),
     (   nonvar(Formal)
     ->  redis_disconnect(Redis, [force(true)])
     ;   true
     ).
 1383
 % Echo a random number and let '$redis_resync'/2 process the stream
 % with that number, using a time limit of 0.2 seconds.  Exceeding
 % the time limit is mapped to error(time_limit_exceeded, _).
 do_resync(Redis) :-
     Magic is random(1_000_000_000),
     redis_stream(Redis, Stream, true),
     redis_write_msg(Stream, echo(Magic)),
     catch(call_with_time_limit(0.2, '$redis_resync'(Stream, Magic)),
           time_limit_exceeded,
           throw(error(time_limit_exceeded,_))).
 1391
 1392
%!  redis_read_msg(+Stream, ?ReplyIn, -Message, -Error, -PushMessages) is det.
 1394%!  redis_write_msg(+Stream, +Message) is det.
 1395%
 1396%   Read/write a Redis message. Both these predicates are in the foreign
 1397%   module `redis4pl`.
 1398%
 1399%   @arg PushMessages is a list of push   messages that may be non-[] if
 1400%   protocol version 3 (see redis_connect/3) is selected. Using protocol
 1401%   version 2 this list is always empty.
 1402
 1403
 1404
 1405		 /*******************************
 1406		 *            MESSAGES		*
 1407		 *******************************/
 1408
 :- multifile
        prolog:error_message//1,
        prolog:message//1.

 % Message for the formal error term redis_error(Code, String).
 prolog:error_message(redis_error(Kind, Text)) -->
     [ 'REDIS: ~w: ~s'-[Kind, Text] ].
 1415
 1416prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1417    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1418    [ '    '-[] ], '$messages':translate_message(Error)