View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2022, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3     % +Redis, +Command, -Properties
   75          ]).   76:- autoload(library(socket), [tcp_connect/3]).   77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   78:- autoload(library(broadcast), [broadcast/1]).   79:- autoload(library(error),
   80            [ must_be/2,
   81	      type_error/2,
   82              instantiation_error/1,
   83              uninstantiation_error/1,
   84              existence_error/2,
   85              existence_error/3
   86            ]).   87:- autoload(library(lazy_lists), [lazy_list/2]).   88:- autoload(library(lists), [append/3, member/2]).   89:- autoload(library(option), [merge_options/3, option/2,
   90			      option/3, select_option/4]).   91:- autoload(library(pairs), [group_pairs_by_key/2]).   92:- autoload(library(time), [call_with_time_limit/2]).   93:- use_module(library(debug), [debug/3, assertion/1]).   94:- use_module(library(settings), [setting/4, setting/2]).   95:- if(exists_source(library(ssl))).   96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]).   97:- endif.   98
   99:- use_foreign_library(foreign(redis4pl)).  100
  101:- setting(max_retry_count, nonneg, 8640, % one day
  102           "Max number of retries").  103:- setting(max_retry_wait, number, 10,
  104           "Max time to wait between recovery attempts").  105:- setting(sentinel_timeout, number, 0.2,
  106	   "Time to wait for a sentinel").  107
  108:- predicate_options(redis_server/3, 3,
  109                     [ pass_to(redis:redis_connect/3, 3)
  110                     ]).  111:- predicate_options(redis_connect/3, 3,
  112                     [ reconnect(boolean),
  113                       user(atom),
  114                       password(atomic),
  115                       version(between(2,3))
  116                     ]).  117:- predicate_options(redis_disconnect/2, 2,
  118                     [ force(boolean)
  119                     ]).  120:- predicate_options(redis_scan/3, 3,
  121                     [ match(atomic),
  122                       count(nonneg),
  123                       type(atom)
  124                     ]).  125% Actually not passing, but the same
  126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).

Redis client

This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.

In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequenly, redis/2-3 is used to issue commands on the server. For example:

?- redis_server(default, redis:6379, [password("secret")]).
?- redis(default, set(user, "Bob")).
?- redis(default, get(user), User).
User = "Bob"

*/

  149:- dynamic server/3.  150
  151:- dynamic ( connection/2,              % ServerName, Stream
  152	     sentinel/2			% Pool, Address
  153           ) as volatile.
 redis_server(+ServerName, +Address, +Options) is det
Register a redis server without connecting to it. The ServerName acts as a lazy connection alias. Initially the ServerName default points at localhost:6379 with no connect options. The default server is used for redis/1 and redis/2 and may be changed using this predicate. Options are described with redis_connect/3.

Connections established this way are by default automatically reconnected if the connection is lost for some reason unless a reconnect(false) option is specified.

  167redis_server(Alias, Address, Options) :-
  168    must_be(ground, Alias),
  169    retractall(server(Alias, _, _)),
  170    asserta(server(Alias, Address, Options)).
  171
  172server(default, localhost:6379, []).
 redis_connect(-Connection) is det
 redis_connect(+Address, -Connection, +Options) is det
redis_connect(-Connection, +Host, +Port) is det
Connect to a redis server. The main mode is redis_connect(+Address, -Connection, +Options). redis_connect/1 is equivalent to redis_connect(localhost:6379, Connection, []). Options:
reconnect(+Boolean)
If true, try to reconnect to the service when the connection seems lost. Default is true for connections specified using redis_server/3 and false for explictly opened connections.
user(+User)
If version(3) and password(Password) are specified, these are used to authenticate using the HELLO command.
password(+Password)
Authenticate using Password
version(+Version)
Specify the connection protocol version. Initially this is version 2. Redis 6 also supports version 3. When specified as 3, the HELLO command is used to upgrade the protocol.
tls(true)
When specified, initiate a TLS connection. If this option is specified we must also specify the cacert, key and cert options.
cacert(+File)
CA Certificate file to verify with.
cert(+File)
Client certificate to authenticate with.
key(+File)
Private key file to authenticate with.
sentinels(+ListOfAddresses)
Used together with an Address of the form sentinel(MasterName) to enable contacting a network of Redis servers guarded by a sentinel network.
sentinel_user(+User)
sentinel_password(+Password)
Authentication information for the senitels. When omitted we try to connect withour authentication.

Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary paralel connection or using a connection with a blocking call.

Arguments:
Address- is a term Host:Port, unix(File) or the name of a server registered using redis_server/3. The latter realises a new connection that is typically used for blocking redis commands such as listening for published messages, waiting on a list or stream.
Compatibility
- redis_connect(-Connection, +Host, +Port) provides compatibility to the original GNU-Prolog interface and is equivalent to redis_connect(Host:Port, Connection, []).
  228redis_connect(Conn) :-
  229    redis_connect(default, Conn, []).
  230
  231redis_connect(Conn, Host, Port) :-
  232    var(Conn),
  233    ground(Host), ground(Port),
  234    !,                                  % GNU-Prolog compatibility
  235    redis_connect(Host:Port, Conn, []).
  236redis_connect(Server, Conn, Options) :-
  237    atom(Server),
  238    !,
  239    (   server(Server, Address, DefaultOptions)
  240    ->  merge_options(Options, DefaultOptions, Options2),
  241        do_connect(Server, Address, Conn, [address(Address)|Options2])
  242    ;   existence_error(redis_server, Server)
  243    ).
  244redis_connect(Address, Conn, Options) :-
  245    do_connect(Address, Address, Conn, [address(Address)|Options]).
 do_connect(+Id, +Address, -Conn, +Options)
Open the connection. A connection is a compound term of the shape
redis_connection(Id, Stream, Failures, Options)
  253do_connect(Id, sentinel(Pool), Conn, Options) =>
  254    sentinel_master(Id, Pool, Conn, Options).
  255do_connect(Id, Address0, Conn, Options) =>
  256    tcp_address(Address0, Address),
  257    tcp_connect(Address, Stream0, Options),
  258    tls_upgrade(Address, Stream0, Stream, Options),
  259    Conn = redis_connection(Id, Stream, 0, Options),
  260    hello(Conn, Options).
  261
  262tcp_address(unix(Path), Path) :-
  263    !.                                  % Using an atom is ambiguous
  264tcp_address(Address, Address).
 tls_upgrade(+Address, +Raw, -Stream, +Options) is det
Upgrade to a TLS connection when tls(true) is specified.
  270:- if(current_predicate(ssl_context/3)).  271tls_upgrade(Host:_Port, Raw, Stream, Options) :-
  272    option(tls(true), Options),
  273    !,
  274    must_have_option(cacert(CacertFile), Options),
  275    must_have_option(key(KeyFile), Options),
  276    must_have_option(cert(CertFile), Options),
  277    ssl_context(client, SSL,
  278		[ host(Host),
  279		  certificate_file(CertFile),
  280		  key_file(KeyFile),
  281		  cacerts([file(CacertFile)]),
  282		  cert_verify_hook(tls_verify),
  283		  close_parent(true)
  284		]),
  285    stream_pair(Raw, RawRead, RawWrite),
  286    ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
  287    stream_pair(Stream, Read, Write).
  288:- endif.  289tls_upgrade(_, Stream, Stream, _).
  290
  291:- if(current_predicate(ssl_context/3)).
 tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status) is semidet
Accept or reject the certificate verification. Similar to the Redis command line client (redis-cli), we accept the certificate as long as it is signed, not verifying the hostname.
  299:- public tls_verify/5.  300tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
  301    !.
  302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
  303    !.
  304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
  305    fail.
  306
  307:- endif.
 sentinel_master(+ServerId, +SetinelPool, -Connection, +Options) is det
Discover the master and connect to it.
  313sentinel_master(Id, Pool, Master, Options) :-
  314    must_have_option(sentinels(Sentinels), Options),
  315    sentinel_auth(Options, Options1),
  316    setting(sentinel_timeout, TMO),
  317    (   sentinel(Pool, Sentinel)
  318    ;   member(Sentinel, Sentinels)
  319    ),
  320    catch(call_with_time_limit(
  321	      TMO,
  322	      do_connect(Id, Sentinel, Conn,
  323			 [sentinel(true)|Options1])),
  324	  Error,
  325	  (print_message(warning, Error),fail)),
  326    !,
  327    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
  328    call_cleanup(
  329	query_sentinel(Pool, Conn, Sentinel, MasterAddr),
  330	redis_disconnect(Conn)),
  331    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
  332    do_connect(Id, MasterAddr, Master, Options),
  333    debug(redis(sentinel), 'Connected to claimed master', []),
  334    redis(Master, role, Role),
  335    (   Role = [master|_Slaves]
  336    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
  337    ;   redis_disconnect(Master),
  338	debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
  339	sleep(TMO),
  340	sentinel_master(Id, Pool, Master, Options)
  341    ).
  342
  343sentinel_auth(Options0, Options) :-
  344    option(sentinel_user(User), Options0),
  345    option(sentinel_password(Passwd), Options0),
  346    !,
  347    merge_options([user(User), password(Passwd)], Options0, Options).
  348sentinel_auth(Options0, Options) :-
  349    select_option(password(_), Options0, Options, _).
  350
  351
  352query_sentinel(Pool, Conn, Sentinel, Host:Port) :-
  353    redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
  354    MasterData = [Host,Port],
  355    redis(Conn, sentinel(sentinels, Pool), Peers),
  356    transaction(update_known_sentinels(Pool, Sentinel, Peers)).
  357
  358update_known_sentinels(Pool, Sentinel, Peers) :-
  359    retractall(sentinel(Pool, _)),
  360    maplist(update_peer_sentinel(Pool), Peers),
  361    asserta(sentinel(Pool, Sentinel)).
  362
  363update_peer_sentinel(Pool, Attrs),
  364  memberchk(ip-Host, Attrs),
  365  memberchk(port-Port, Attrs) =>
  366    asserta(sentinel(Pool, Host:Port)).
  367
  368must_have_option(Opt, Options) :-
  369    option(Opt, Options),
  370    !.
  371must_have_option(Opt, Options) :-
  372    existence_error(option, Opt, Options).
 hello(+Connection, +Option)
Initialize the connection. This is used to upgrade to the RESP3 protocol and/or to authenticate.
  379hello(Con, Options) :-
  380    option(version(V), Options),
  381    V >= 3,
  382    !,
  383    (   option(user(User), Options),
  384        option(password(Password), Options)
  385    ->  redis(Con, hello(3, auth, User, Password))
  386    ;   redis(Con, hello(3))
  387    ).
  388hello(Con, Options) :-
  389    option(password(Password), Options),
  390    !,
  391    redis(Con, auth(Password)).
  392hello(_, _).
 redis_stream(+Spec, --Stream, +DoConnect) is det
Get the stream to a Redis server from Spec. Spec is either the name of a registered server or a term redis_connection(Id,Stream,Failures,Options). If the stream is disconnected it will be reconnected.
  401redis_stream(Var, S, _) :-
  402    (   var(Var)
  403    ->  !, instantiation_error(Var)
  404    ;   nonvar(S)
  405    ->  !, uninstantiation_error(S)
  406    ).
  407redis_stream(ServerName, S, Connect) :-
  408    atom(ServerName),
  409    !,
  410    (   connection(ServerName, S0)
  411    ->  S = S0
  412    ;   Connect == true,
  413        server(ServerName, Address, Options)
  414    ->  redis_connect(Address, Connection, Options),
  415        redis_stream(Connection, S, false),
  416        asserta(connection(ServerName, S))
  417    ;   existence_error(redis_server, ServerName)
  418    ).
  419redis_stream(redis_connection(_,S0,_,_), S, _) :-
  420    S0 \== (-),
  421    !,
  422    S = S0.
  423redis_stream(Redis, S, _) :-
  424    Redis = redis_connection(Id,-,_,Options),
  425    option(address(Address), Options),
  426    do_connect(Id,Address,Redis2,Options),
  427    arg(2, Redis2, S0),
  428    nb_setarg(2, Redis, S0),
  429    S = S0.
  430
  431has_redis_stream(Var, _) :-
  432    var(Var),
  433    !,
  434    instantiation_error(Var).
  435has_redis_stream(Alias, S) :-
  436    atom(Alias),
  437    !,
  438    connection(Alias, S).
  439has_redis_stream(redis_connection(_,S,_,_), S) :-
  440    S \== (-).
 redis_disconnect(+Connection) is det
 redis_disconnect(+Connection, +Options) is det
Disconnect from a redis server. The second form takes one option, similar to close/2:
force(Force)
When true (default false), do not raise any errors if Connection does not exist or closing the connection raises a network or I/O related exception. This version is used internally if a connection is in a broken state, either due to a protocol error or a network issue.
  456redis_disconnect(Redis) :-
  457    redis_disconnect(Redis, []).
  458
  459redis_disconnect(Redis, Options) :-
  460    option(force(true), Options),
  461    !,
  462    (   Redis = redis_connection(_Id, S, _, _Opts)
  463    ->  (   S == (-)
  464        ->  true
  465        ;   close(S, [force(true)]),
  466            nb_setarg(2, Redis, -)
  467        )
  468    ;   has_redis_stream(Redis, S)
  469    ->  close(S, [force(true)]),
  470        retractall(connection(_,S))
  471    ;   true
  472    ).
  473redis_disconnect(Redis, _Options) :-
  474    redis_stream(Redis, S, false),
  475    close(S),
  476    retractall(connection(_,S)).
 redis(+Connection, +Request) is semidet
This predicate is overloaded to handle two types of requests. First, it is a shorthand for redis(Connection, Command, _) and second, it can be used to exploit Redis pipelines and transactions. The second form is acticated if Request is a list. In that case, each element of the list is either a term Command -> Reply or a simple Command. Semantically this represents a sequence of redis/3 and redis/2 calls. It differs in the following aspects:

Procedurally, the process takes the following steps:

  1. Send all commands
  2. Read all replies and push messages
  3. Handle all callbacks from push messages
  4. Check whether one of the replies is an error. If so, raise this error (subsequent errors are lost)
  5. Bind all replies for the Command -> Reply terms.

Examples

?- redis(default,
         [ lpush(li,1),
           lpush(li,2),
           lrange(li,0,-1) -> List
         ]).
List = ["2", "1"].
  520redis(Redis, PipeLine) :-
  521    is_list(PipeLine),
  522    !,
  523    redis_pipeline(Redis, PipeLine).
  524redis(Redis, Req) :-
  525    redis(Redis, Req, _).
 redis(+Connection, +Command, -Reply) is semidet
Execute a redis Command on Connnection. Next, bind Reply to the returned result. Command is a callable term whose functor is the name of the Redis command and whose arguments are translated to Redis arguments according to the rules below. Note that all text is always represented using UTF-8 encoding.

Reply is either a plain term (often a variable) or a term Value as Type. In the latter form, Type dictates how the Redis bulk reply is translated to Prolog. The default equals to auto, i.e., as a number of the content satisfies the Prolog number syntax and as an atom otherwise.

Redis bulk replies are translated depending on the as Type as explained above.

string
string(Encoding)
Create a SWI-Prolog string object interpreting the blob as following Encoding. Encoding is a restricted set of SWI-Prolog's encodings: bytes (iso_latin_1), utf8 and text (the current locale translation).
atom
atom(Encoding)
As above, producing an atom.
codes
codes(Encoding)
As above, producing a list of integers (Unicode code points)
chars
chars(Encoding)
As above, producing a list of one-character atoms.
integer
float
rational
number
Interpret the bytes as a string representing a number. If the string does not represent a number of the requested type a type_error(Type, String) is raised.
tagged_integer
Same as integer, but demands the value to be between the Prolog flags min_tagged_integer and max_tagged_integer, allowing the value to be used as a dict key.
auto
Same as auto(atom, number)
auto(AsText, AsNumber)
If the bulk string confirms the syntax of AsNumber, convert the value to the requested numberical type. Else convert the value to text according to AsText. This is similar to the Prolog predicate name/2.
dict_key
Alias for auto(atom,tagged_integer). This allows the value to be used as a key for a SWI-Prolog dict.
pairs(AsKey, AsValue)
Convert a map or array of even length into pairs for which the key satisfies AsKey and the value AsValue. The pairs type can also be applied to a Redis array. In this case the array length must be even. This notably allows fetching a Redis hash as pairs using HGETALL using version 2 of the Redis protocol.
dict(AsKey, AsValue)
Similar to pairs(AsKey, AsValue), but convert the resulting pair list into a SWI-Prolog dict. AsKey must convert to a valid dict key, i.e., an atom or tagged integer. See dict_key.
dict(AsValue)
Shorthand for dict(dict_key, AsValue).

Here are some simple examples

?- redis(default, set(a, 42), X).
X = status("OK").
?- redis(default, get(a), X).
X = "42".
?- redis(default, get(a), X as integer).
X = 42.
?- redis(default, get(a), X as float).
X = 42.0.
?- redis(default, set(swipl:version, 8)).
true.
?- redis(default, incr(swipl:version), X).
X = 9.
Errors
- redis_error(Code, String)
  645redis(Redis, Req, Out) :-
  646    out_val(Out, Val),
  647    redis1(Redis, Req, Out),
  648    Val \== nil.
  649
  650out_val(Out, Val) :-
  651    (   nonvar(Out),
  652        Out = (Val as _)
  653    ->  true
  654    ;   Val = Out
  655    ).
  656
  657redis1(Redis, Req, Out) :-
  658    Error = error(Formal, _),
  659    catch(redis2(Redis, Req, Out), Error, true),
  660    (   var(Formal)
  661    ->  true
  662    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  663    ).
  664
  665redis2(Redis, Req, Out) :-
  666    atom(Redis),
  667    !,
  668    redis_stream(Redis, S, true),
  669    with_mutex(Redis,
  670               ( redis_write_msg(S, Req),
  671                 redis_read_stream(Redis, S, Out)
  672               )).
  673redis2(Redis, Req, Out) :-
  674    redis_stream(Redis, S, true),
  675    redis_write_msg(S, Req),
  676    redis_read_stream(Redis, S, Out).
 redis_pipeline(+Redis, +PipeLine)
  680redis_pipeline(Redis, PipeLine) :-
  681    Error = error(Formal, _),
  682    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  683    (   var(Formal)
  684    ->  true
  685    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  686    ).
  687
  688redis_pipeline2(Redis, PipeLine) :-
  689    atom(Redis),
  690    !,
  691    redis_stream(Redis, S, true),
  692    with_mutex(Redis,
  693               redis_pipeline3(Redis, S, PipeLine)).
  694redis_pipeline2(Redis, PipeLine) :-
  695    redis_stream(Redis, S, true),
  696    redis_pipeline3(Redis, S, PipeLine).
  697
  698redis_pipeline3(Redis, S, PipeLine) :-
  699    maplist(write_pipeline(S), PipeLine),
  700    flush_output(S),
  701    read_pipeline(Redis, S, PipeLine).
  702
  703write_pipeline(S, Command -> _Reply) :-
  704    !,
  705    redis_write_msg_no_flush(S, Command).
  706write_pipeline(S, Command) :-
  707    redis_write_msg_no_flush(S, Command).
  708
  709read_pipeline(Redis, S, PipeLine) :-
  710    E = error(Formal,_),
  711    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  712    (   var(Formal)
  713    ->  true
  714    ;   reconnect_error(E)
  715    ->  redis_disconnect(Redis, [force(true)]),
  716        throw(E)
  717    ;   resync(Redis),
  718        throw(E)
  719    ).
  720
  721read_pipeline2(Redis, S, PipeLine) :-
  722    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  723    maplist(handle_push(Redis), Pushed),
  724    maplist(handle_error, Errors),
  725    maplist(bind_reply, PipeLine, Replies).
  726
  727redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  728    !,
  729    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  730redis_read_msg3(S, Var, Reply, Error, Push) :-
  731    redis_read_msg(S, Var, Reply, Error, Push).
  732
  733handle_push(Redis, Pushed) :-
  734    handle_push_messages(Pushed, Redis).
  735handle_error(Error) :-
  736    (   var(Error)
  737    ->  true
  738    ;   throw(Error)
  739    ).
  740bind_reply(_Command -> Reply0, Reply) :-
  741    !,
  742    Reply0 = Reply.
  743bind_reply(_Command, _).
 recover(+Error, +Redis, :Goal)
Error happened while running Goal on Redis. If this is a recoverable error (i.e., a network or disconnected peer), wait a little and try running Goal again.
  752:- meta_predicate recover(+, +, 0).  753
  754recover(Error, Redis, Goal) :-
  755    Error = error(Formal, _),
  756    reconnect_error(Formal),
  757    auto_reconnect(Redis),
  758    !,
  759    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  760          [Redis, Error]),
  761    redis_disconnect(Redis, [force(true)]),
  762    (   wait_to_retry(Redis, Error)
  763    ->  call(Goal),
  764        retractall(failure(Redis, _))
  765    ;   throw(Error)
  766    ).
  767recover(Error, _, _) :-
  768    throw(Error).
  769
  770auto_reconnect(redis_connection(_,_,_,Options)) :-
  771    !,
  772    option(reconnect(true), Options).
  773auto_reconnect(Server) :-
  774    ground(Server),
  775    server(Server, _, Options),
  776    option(reconnect(true), Options, true).
  777
  778reconnect_error(io_error(_Action, _On)).
  779reconnect_error(socket_error(_Code, _)).
  780reconnect_error(syntax_error(unexpected_eof)).
  781reconnect_error(existence_error(stream, _)).
 wait(+Redis, +Error)
Wait for some time after a failure. First we wait for 10ms. This is doubled on each failure upto the setting max_retry_wait. If the setting max_retry_count is exceeded we fail and the called signals an exception.
  790:- dynamic failure/2 as volatile.  791
  792wait_to_retry(Redis, Error) :-
  793    redis_failures(Redis, Failures),
  794    setting(max_retry_count, Count),
  795    Failures < Count,
  796    Failures2 is Failures+1,
  797    redis_set_failures(Redis, Failures2),
  798    setting(max_retry_wait, MaxWait),
  799    Wait is min(MaxWait*100, 1<<Failures)/100.0,
  800    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
  801    retry_message_level(Failures, Level),
  802    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
  803    sleep(Wait).
  804
  805redis_failures(redis_connection(_,_,Failures0,_), Failures) :-
  806    !,
  807    Failures = Failures0.
  808redis_failures(Server, Failures) :-
  809    atom(Server),
  810    (   failure(Server, Failures)
  811    ->  true
  812    ;   Failures = 0
  813    ).
  814
  815redis_set_failures(Connection, Count) :-
  816    compound(Connection),
  817    !,
  818    nb_setarg(3, Connection, Count).
  819redis_set_failures(Server, Count) :-
  820    atom(Server),
  821    retractall(failure(Server, _)),
  822    asserta(failure(Server, Count)).
  823
  824retry_message_level(0, warning) :- !.
  825retry_message_level(_, silent).
 redis(+Request)
Connect to the default redis server, call redist/3 using Request, disconnect and print the result. This predicate is intended for interactive usage.
  834redis(Req) :-
  835    setup_call_cleanup(
  836        redis_connect(default, C, []),
  837        redis1(C, Req, Out),
  838        redis_disconnect(C)),
  839    print(Out).
 redis_write(+Redis, +Command) is det
 redis_read(+Redis, -Reply) is det
Write command and read replies from a Redis server. These are building blocks for subscribing to event streams.
  847redis_write(Redis, Command) :-
  848    redis_stream(Redis, S, true),
  849    redis_write_msg(S, Command).
  850
  851redis_read(Redis, Reply) :-
  852    redis_stream(Redis, S, true),
  853    redis_read_stream(Redis, S, Reply).
  854
  855
  856		 /*******************************
  857		 *      HIGH LEVEL ACCESS	*
  858		 *******************************/
 redis_get_list(+Redis, +Key, -List) is det
 redis_get_list(+Redis, +Key, +ChunkSize, -List) is det
Get the content of a Redis list in List. If ChunkSize is given and smaller than the list length, List is returned as a lazy list. The actual values are requested using redis LRANGE requests. Note that this results in O(N^2) complexity. Using a lazy list is most useful for relatively short lists holding possibly large items.

Note that values retrieved are strings, unless the value was added using Term as prolog.

See also
- lazy_list/2 for a discussion on the difference between lazy lists and normal lists.
  875redis_get_list(Redis, Key, List) :-
  876    redis_get_list(Redis, Key, -1, List).
  877
  878redis_get_list(Redis, Key, Chunk, List) :-
  879    redis(Redis, llen(Key), Len),
  880    (   (   Chunk >= Len
  881        ;   Chunk == -1
  882        )
  883    ->  (   Len == 0
  884        ->  List = []
  885        ;   End is Len-1,
  886            list_range(Redis, Key, 0, End, List)
  887        )
  888    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
  889    ).
  890
  891rlist_next(State, List, Tail) :-
  892    State = s(Redis,Key,Offset,Slice,Len),
  893    End is min(Len-1, Offset+Slice-1),
  894    list_range(Redis, Key, Offset, End, Elems),
  895    (   End =:= Len-1
  896    ->  List = Elems,
  897        Tail = []
  898    ;   Offset2 is Offset+Slice,
  899        nb_setarg(3, State, Offset2),
  900        append(Elems, Tail, List)
  901    ).
  902
  903% Redis LRANGE demands End > Start and returns inclusive.
  904
  905list_range(DB, Key, Start, Start, [Elem]) :-
  906    !,
  907    redis(DB, lindex(Key, Start), Elem).
  908list_range(DB, Key, Start, End, List) :-
  909    !,
  910    redis(DB, lrange(Key, Start, End), List).
 redis_set_list(+Redis, +Key, +List) is det
Associate a Redis key with a list. As Redis has no concept of an empty list, if List is [], Key is deleted. Note that key values are always strings in Redis. The same conversion rules as for redis/1-3 apply.
  921redis_set_list(Redis, Key, List) :-
  922    redis(Redis, del(Key), _),
  923    (   List == []
  924    ->  true
  925    ;   Term =.. [rpush,Key|List],
  926        redis(Redis, Term, _Count)
  927    ).
 redis_get_hash(+Redis, +Key, -Data:dict) is det
 redis_set_hash(+Redis, +Key, +Data:dict) is det
Put/get a Redis hash as a Prolog dict. Putting a dict first deletes Key. Note that in many cases applications will manage Redis hashes by key. redis_get_hash/3 is notably a user friendly alternative to the Redis HGETALL command. If the Redis hash is not used by other (non-Prolog) applications one may also consider using the Term as prolog syntax to store the Prolog dict as-is.
  940redis_get_hash(Redis, Key, Dict) :-
  941    redis(Redis, hgetall(Key), Dict as dict(auto)).
  942
  943redis_set_hash(Redis, Key, Dict) :-
  944    redis_array_dict(Array, _, Dict),
  945    Term =.. [hset,Key|Array],
  946    redis(Redis, del(Key), _),
  947    redis(Redis, Term, _Count).
 redis_array_dict(?Array, ?Tag, ?Dict) is det
Translate a Redis reply representing hash data into a SWI-Prolog dict. Array is either a list of alternating keys and values or a list of pairs. When translating to an array, this is always a list of alternating keys and values.
Arguments:
Tag- is the SWI-Prolog dict tag.
  958redis_array_dict(Array, Tag, Dict) :-
  959    nonvar(Array),
  960    !,
  961    array_to_pairs(Array, Pairs),
  962    dict_pairs(Dict, Tag, Pairs).
  963redis_array_dict(TwoList, Tag, Dict) :-
  964    dict_pairs(Dict, Tag, Pairs),
  965    pairs_to_array(Pairs, TwoList).
  966
  967array_to_pairs([], []) :-
  968    !.
  969array_to_pairs([NameS-Value|T0], [Name-Value|T]) :-
  970    !,                                  % RESP3 returns a map as pairs.
  971    atom_string(Name, NameS),
  972    array_to_pairs(T0, T).
  973array_to_pairs([NameS,Value|T0], [Name-Value|T]) :-
  974    atom_string(Name, NameS),
  975    array_to_pairs(T0, T).
  976
  977pairs_to_array([], []) :-
  978    !.
  979pairs_to_array([Name-Value|T0], [NameS,Value|T]) :-
  980    atom_string(Name, NameS),
  981    pairs_to_array(T0, T).
 redis_scan(+Redis, -LazyList, +Options) is det
 redis_sscan(+Redis, +Set, -LazyList, +Options) is det
 redis_hscan(+Redis, +Hash, -LazyList, +Options) is det
 redis_zscan(+Redis, +Set, -LazyList, +Options) is det
Map the Redis SCAN, SSCAN, HSCAN and ZSCAN` commands into a lazy list. For redis_scan/3 and redis_sscan/4 the result is a list of strings. For redis_hscan/4 and redis_zscan/4, the result is a list of pairs. Options processed:
match(Pattern)
Adds the MATCH subcommand, only returning matches for Pattern.
count(Count)
Adds the COUNT subcommand, giving a hint to the size of the chunks fetched.
type(Type)
Adds the TYPE subcommand, only returning answers of the indicated type.
See also
- lazy_list/2.
 1005redis_scan(Redis, LazyList, Options) :-
 1006    scan_options([match,count,type], Options, Parms),
 1007    lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList).
 1008
 1009redis_sscan(Redis, Set, LazyList, Options) :-
 1010    scan_options([match,count,type], Options, Parms),
 1011    lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList).
 1012
 1013redis_hscan(Redis, Hash, LazyList, Options) :-
 1014    scan_options([match,count,type], Options, Parms),
 1015    lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList).
 1016
 1017redis_zscan(Redis, Set, LazyList, Options) :-
 1018    scan_options([match,count,type], Options, Parms),
 1019    lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList).
 1020
 1021scan_options([], _, []).
 1022scan_options([H|T0], Options, [H,V|T]) :-
 1023    Term =.. [H,V],
 1024    option(Term, Options),
 1025    !,
 1026    scan_options(T0, Options, T).
 1027scan_options([_|T0], Options, T) :-
 1028    scan_options(T0, Options, T).
 1029
 1030
 1031scan_next(State, List, Tail) :-
 1032    State = s(Command,Redis,Cursor,Params),
 1033    Command =.. CList,
 1034    append(CList, [Cursor|Params], CList2),
 1035    Term =.. CList2,
 1036    redis(Redis, Term, [NewCursor,Elems0]),
 1037    scan_pairs(Command, Elems0, Elems),
 1038    (   NewCursor == 0
 1039    ->  List = Elems,
 1040        Tail = []
 1041    ;   nb_setarg(3, State, NewCursor),
 1042        append(Elems, Tail, List)
 1043    ).
 1044
 1045scan_pairs(hscan(_), List, Pairs) :-
 1046    !,
 1047    scan_pairs(List, Pairs).
 1048scan_pairs(zscan(_), List, Pairs) :-
 1049    !,
 1050    scan_pairs(List, Pairs).
 1051scan_pairs(_, List, List).
 1052
 1053scan_pairs([], []).
 1054scan_pairs([Key,Value|T0], [Key-Value|T]) :-
 1055    !,
 1056    scan_pairs(T0, T).
 1057scan_pairs([Key-Value|T0], [Key-Value|T]) :-
 1058    scan_pairs(T0, T).
 1059
 1060
 1061		 /*******************************
 1062		 *              ABOUT		*
 1063		 *******************************/
 redis_current_command(+Redis, ?Command) is nondet
 redis_current_command(+Redis, ?Command, -Properties) is nondet
True when Command has Properties. Fails if Command is not defined. The redis_current_command/3 version returns the command argument specification. See Redis documentation for an explanation.
 1072redis_current_command(Redis, Command) :-
 1073    redis_current_command(Redis, Command, _).
 1074
 1075redis_current_command(Redis, Command, Properties) :-
 1076    nonvar(Command),
 1077    !,
 1078    redis(Redis, command(info, Command), [[_|Properties]]).
 1079redis_current_command(Redis, Command, Properties) :-
 1080    redis(Redis, command, Commands),
 1081    member([Name|Properties], Commands),
 1082    atom_string(Command, Name).
 redis_property(+Redis, ?Property) is nondet
True if Property is a property of the Redis server. Currently uses redis(info, String) and parses the result. As this is for machine usage, properties names *_human are skipped.
 1090redis_property(Redis, Property) :-
 1091    redis(Redis, info, String),
 1092    info_terms(String, Terms),
 1093    member(Property, Terms).
 1094
 1095info_terms(Info, Pairs) :-
 1096    split_string(Info, "\n", "\r\n ", Lines),
 1097    convlist(info_line_term, Lines, Pairs).
 1098
 1099info_line_term(Line, Term) :-
 1100    sub_string(Line, B, _, A, :),
 1101    !,
 1102    sub_atom(Line, 0, B, _, Name),
 1103    \+ sub_atom(Name, _, _, 0, '_human'),
 1104    sub_string(Line, _, A, 0, ValueS),
 1105    (   number_string(Value, ValueS)
 1106    ->  true
 1107    ;   Value = ValueS
 1108    ),
 1109    Term =.. [Name,Value].
 1110
 1111
 1112		 /*******************************
 1113		 *            SUBSCRIBE		*
 1114		 *******************************/
 redis_subscribe(+Redis, +Channels, -Id, +Options) is det
Subscribe to one or more Redis PUB/SUB channels. This predicate creates a thread using thread_create/3 with the given Options. Once running, the thread listens for messages. The message content is a string or Prolog term as described in redis/3. On receiving a message, the following message is broadcasted:
redis(Id, Channel, Data)

If redis_unsubscribe/2 removes the last subscription, the thread terminates.

To simply print the incomming messages use e.g.

?- listen(redis(_, Channel, Data),
          format('Channel ~p got ~p~n', [Channel,Data])).
true.
?- redis_subscribe(default, test, Id, []).
Id = redis_pubsub_3,
?- redis(publish(test, "Hello world")).
Channel test got "Hello world"
1
true.
Arguments:
Id- is the thread identifier of the listening thread. Note that the Options alias(Name) can be used to get a system wide name.
 1144:- dynamic ( subscription/2,            % Id, Channel
 1145             listening/3                % Id, Connection, Thread
 1146           ) as volatile. 1147
 1148redis_subscribe(Redis, Spec, Id, Options) :-
 1149    atom(Redis),
 1150    !,
 1151    channels(Spec, Channels),
 1152    pubsub_thread_options(ThreadOptions, Options),
 1153    thread_create(setup_call_cleanup(
 1154                      redis_connect(Redis, Conn, [reconnect(true)]),
 1155                      redis_subscribe1(Redis, Conn, Channels),
 1156                      redis_disconnect(Conn)),
 1157                  Thread,
 1158                  ThreadOptions),
 1159    pubsub_id(Thread, Id).
 1160redis_subscribe(Redis, Spec, Id, Options) :-
 1161    channels(Spec, Channels),
 1162    pubsub_thread_options(ThreadOptions, Options),
 1163    thread_create(redis_subscribe1(Redis, Redis, Channels),
 1164                  Thread,
 1165                  ThreadOptions),
 1166    pubsub_id(Thread, Id).
 1167
 1168pubsub_thread_options(ThreadOptions, Options) :-
 1169    merge_options(Options, [detached(true)], ThreadOptions).
 1170
 1171pubsub_id(Thread, Thread).
 1172%pubsub_id(Thread, Id) :-
 1173%    thread_property(Thread, id(TID)),
 1174%    atom_concat('redis_pubsub_', TID, Id).
 1175
 1176redis_subscribe1(Redis, Conn, Channels) :-
 1177    Error = error(Formal, _),
 1178    catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
 1179    (   var(Formal)
 1180    ->  true
 1181    ;   recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
 1182        thread_self(Me),
 1183        pubsub_id(Me, Id),
 1184        findall(Channel, subscription(Id, Channel), CurrentChannels),
 1185        redis_subscribe1(Redis, Conn, CurrentChannels)
 1186    ).
 1187
 1188redis_subscribe2(Redis, Conn, Channels) :-
 1189    redis_subscribe3(Conn, Channels),
 1190    redis_listen(Redis, Conn).
 1191
 1192redis_subscribe3(Conn, Channels) :-
 1193    thread_self(Me),
 1194    pubsub_id(Me, Id),
 1195    prolog_listen(this_thread_exit, pubsub_clean(Id)),
 1196    maplist(register_subscription(Id), Channels),
 1197    redis_stream(Conn, S, true),
 1198    Req =.. [subscribe|Channels],
 1199    redis_write_msg(S, Req).
 1200
 1201pubsub_clean(Id) :-
 1202    retractall(listening(Id, _Connection, _Thread)),
 1203    retractall(subscription(Id, _Channel)).
 redis_subscribe(+Id, +Channels) is det
 redis_unsubscribe(+Id, +Channels) is det
Add/remove channels from for the subscription. If no subscriptions remain, the listening thread terminates.
Arguments:
Channels- is either a single channel or a list thereof. Each channel specification is either an atom or a term `A:B:...`, where all parts are atoms.
 1215redis_subscribe(Id, Spec) :-
 1216    channels(Spec, Channels),
 1217    (   listening(Id, Connection, _Thread)
 1218    ->  true
 1219    ;   existence_error(redis_pubsub, Id)
 1220    ),
 1221    maplist(register_subscription(Id), Channels),
 1222    redis_stream(Connection, S, true),
 1223    Req =.. [subscribe|Channels],
 1224    redis_write_msg(S, Req).
 1225
 1226redis_unsubscribe(Id, Spec) :-
 1227    channels(Spec, Channels),
 1228    (   listening(Id, Connection, _Thread)
 1229    ->  true
 1230    ;   existence_error(redis_pubsub, Id)
 1231    ),
 1232    maplist(unregister_subscription(Id), Channels),
 1233    redis_stream(Connection, S, true),
 1234    Req =.. [unsubscribe|Channels],
 1235    redis_write_msg(S, Req).
 redis_current_subscription(?Id, ?Channels)
True when a PUB/SUB subscription with Id is listening on Channels.
 1241redis_current_subscription(Id, Channels) :-
 1242    findall(Id-Channel, subscription(Id, Channel), Pairs),
 1243    keysort(Pairs, Sorted),
 1244    group_pairs_by_key(Sorted, Grouped),
 1245    member(Id-Channels, Grouped).
 1246
 1247channels(Spec, List) :-
 1248    is_list(Spec),
 1249    !,
 1250    maplist(channel_name, Spec, List).
 1251channels(Ch, [Key]) :-
 1252    channel_name(Ch, Key).
 1253
 1254channel_name(Atom, Atom) :-
 1255    atom(Atom),
 1256    !.
 1257channel_name(Key, Atom) :-
 1258    phrase(key_parts(Key), Parts),
 1259    !,
 1260    atomic_list_concat(Parts, :, Atom).
 1261channel_name(Key, _) :-
 1262    type_error(redis_key, Key).
 1263
 1264key_parts(Var) -->
 1265    { var(Var), !, fail }.
 1266key_parts(Atom) -->
 1267    { atom(Atom) },
 1268    !,
 1269    [Atom].
 1270key_parts(A:B) -->
 1271    key_parts(A),
 1272    key_parts(B).
 1273
 1274
 1275
 1276
 1277register_subscription(Id, Channel) :-
 1278    (   subscription(Id, Channel)
 1279    ->  true
 1280    ;   assertz(subscription(Id, Channel))
 1281    ).
 1282
 1283unregister_subscription(Id, Channel) :-
 1284    retractall(subscription(Id, Channel)).
 1285
 1286redis_listen(Redis, Conn) :-
 1287    thread_self(Me),
 1288    pubsub_id(Me, Id),
 1289    setup_call_cleanup(
 1290        assertz(listening(Id, Conn, Me), Ref),
 1291        redis_listen_loop(Redis, Id, Conn),
 1292        erase(Ref)).
 1293
 1294redis_listen_loop(Redis, Id, Conn) :-
 1295    redis_stream(Conn, S, true),
 1296    (   subscription(Id, _)
 1297    ->  redis_read_stream(Redis, S, Reply),
 1298        redis_broadcast(Redis, Reply),
 1299        redis_listen_loop(Redis, Id, Conn)
 1300    ;   true
 1301    ).
 1302
 1303redis_broadcast(_, [subscribe, _Channel, _N]) :-
 1304    !.
 1305redis_broadcast(Redis, [message, Channel, Data]) :-
 1306    !,
 1307    catch(broadcast(redis(Redis, Channel, Data)),
 1308          Error,
 1309          print_message(error, Error)).
 1310redis_broadcast(Redis, Message) :-
 1311    assertion((Message = [Type, Channel, _Data],
 1312               atom(Type),
 1313               atom(Channel))),
 1314    debug(redis(warning), '~p: Unknown message while listening: ~p',
 1315          [Redis,Message]).
 1316
 1317
 1318		 /*******************************
 1319		 *          READ/WRITE		*
 1320		 *******************************/
 redis_read_stream(+Redis, +Stream, -Term) is det
Read a message from a Redis stream. Term is one of

If something goes wrong, the connection is closed and an exception is raised.

 1337redis_read_stream(Redis, SI, Out) :-
 1338    E = error(Formal,_),
 1339    catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
 1340    (   var(Formal)
 1341    ->  handle_push_messages(Push, Redis),
 1342        (   var(Error)
 1343        ->  Out = Out0
 1344        ;   resync(Redis),
 1345            throw(Error)
 1346        )
 1347    ;   redis_disconnect(Redis, [force(true)]),
 1348        throw(E)
 1349    ).
 1350
 1351handle_push_messages([], _).
 1352handle_push_messages([H|T], Redis) :-
 1353    (   catch(handle_push_message(H, Redis), E,
 1354              print_message(warning, E))
 1355    ->  true
 1356    ;   true
 1357    ),
 1358    handle_push_messages(T, Redis).
 1359
 1360handle_push_message(["pubsub"|List], Redis) :-
 1361    redis_broadcast(Redis, List).
 1362% some protocol version 3 push messages (such as
 1363% __keyspace@* events) seem to come directly
 1364% without a pubsub header
 1365handle_push_message([message|List], Redis) :-
 1366    redis_broadcast(Redis, [message|List]).
 resync(+Redis) is det
Re-synchronize after an error. This may happen if some type conversion fails and we have read a partial reply. It is hard to figure out what to read from where we are, so we echo a random magic sequence and read until we find the reply.
 1376resync(Redis) :-
 1377    E = error(Formal,_),
 1378    catch(do_resync(Redis), E, true),
 1379    (   var(Formal)
 1380    ->  true
 1381    ;   redis_disconnect(Redis, [force(true)])
 1382    ).
 1383
 1384do_resync(Redis) :-
 1385    A is random(1_000_000_000),
 1386    redis_stream(Redis, S, true),
 1387    redis_write_msg(S, echo(A)),
 1388    catch(call_with_time_limit(0.2, '$redis_resync'(S, A)),
 1389          time_limit_exceeded,
 1390          throw(error(time_limit_exceeded,_))).
 redis_read_msg(+Stream, -Message, -Error, -PushMessages) is det
 redis_write_msg(+Stream, +Message) is det
Read/write a Redis message. Both these predicates are in the foreign module redis4pl.
Arguments:
PushMessages- is a list of push messages that may be non-[] if protocol version 3 (see redis_connect/3) is selected. Using protocol version 2 this list is always empty.
 1405		 /*******************************
 1406		 *            MESSAGES		*
 1407		 *******************************/
 1408
 1409:- multifile
 1410    prolog:error_message//1,
 1411    prolog:message//1. 1412
 1413prolog:error_message(redis_error(Code, String)) -->
 1414    [ 'REDIS: ~w: ~s'-[Code, String] ].
 1415
 1416prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1417    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1418    [ '    '-[] ], '$messages':translate_message(Error)