We need all the _push_ facilities of Redis: - redis_subscribe/4 to listen to volatile PUB/SUB messages - Listen on reliable redis streams using ``XREAD`` - Listen on reliable redis streams using consumer groups Note that config-available sets up the redis server using the alias `swish`. Streams (redis keys) to listen on are registered using the multifile predicate stream/2. */ :- multifile stream/2. :- listen(http(pre_server_start(Port)), init_redis(Port)). :- dynamic port/1, % Server port thread/1. % Listener thread. init_redis(_Port) :- \+ swish_config:config(redis, _), !. init_redis(_Port) :- catch(thread_property(redis_listener, id(_)), error(_,_), fail), !. init_redis(Port) :- init_pubsub, retractall(port(_)), asserta(port(Port)), findall(Group-S, group_stream(S, Group), Pairs), keysort(Pairs, Sorted), group_pairs_by_key(Sorted, Grouped), consumer(Port, Consumer), maplist(create_listener(Consumer), Grouped), publish_consumer(Consumer). create_listener(_, (-)-Streams) :- !, thread_create(xlisten(swish, Streams, []), Id, [ alias(redis_no_group) ]), assertz(thread(Id)). create_listener(Consumer, Group-Streams) :- atom_concat(redis_, Group, Alias), thread_create(xlisten_group(swish, Group, Consumer, Streams, [ block(1) ]), Id, [ alias(Alias) ]), assertz(thread(Id)). %! reinit_redis % % Stop and start the redis thread. May be used to reconfigure it or % restart when crashed. reinit_redis :- forall(retract(thread(Id)), catch(stop_listener(Id), error(_,_), true)), port(Port), init_redis(Port). stop_listener(Id) :- thread_signal(Id, redis(stop(false))), thread_join(Id, _). group_stream(Key, Group) :- stream(Name, Options), redis_swish_stream(Name, Key), option(max_len(MaxLen), Options, 1000), option(group(Group), Options, -), add_consumer_group(Group, Key), xstream_set(swish, Key, maxlen(MaxLen)). add_consumer_group(-, _) :- !. add_consumer_group(Group, Key) :- catch(redis(swish, xgroup(create, Key, Group, $, mkstream), _), error(redis_error(busygroup,_),_), true). redis_swish_stream(Name, Key) :- swish_config(redis_prefix, Prefix), atomic_list_concat([Prefix, Name], :, Key). %! consumer(+Address, -Consumer) is det. % % Find the name of this node in the redis network. Each node needs to % have a name to be part of a Redis consumer node, as well as to know % which sessions reside on which node. :- dynamic consumer/1. consumer(_, Consumer) :- consumer(Consumer0), !, Consumer = Consumer0. consumer(Address, Consumer) :- address_consumer(Address, Consumer0), asserta(consumer(Consumer0)), Consumer = Consumer0. address_consumer(_, Consumer) :- swish_config(redis_consumer, Consumer), !. address_consumer(Host:Port, Consumer) :- !, atomic_list_concat([Host,Port], :, Consumer). address_consumer(Port, Consumer) :- gethostname(Host), atomic_list_concat([Host,Port], :, Consumer). %! redis_consumer(-Consumer) is det. % % True when Consumer is the name of this redis node. redis_consumer(Consumer) :- consumer(Consumer). publish_consumer(Consumer) :- http_absolute_uri(swish(.), URL), consumer_key(Server, Key), redis(Server, hset(Key:url, Consumer, URL)), redis(Server, publish(swish:swish, joined(Consumer, URL) as prolog), Count), print_message(informational, swish(redis_peers(Count))), at_halt(publish_halt). % More reliable than at_halt/1. :- listen(http(shutdown), publish_halt). publish_halt :- redis_consumer(Consumer), consumer_key(Server, Key), ( redis(Server, hdel(Key:url, Consumer), 0) -> true ; redis(Server, publish(swish:swish, left(Consumer) as prolog), _Count) ). consumer_key(swish, Key) :- swish_config(redis_prefix, Prefix), atomic_list_concat([Prefix, consumer], :, Key). %! swish_cluster(-Pairs) is det. % % True when Pairs is a list Consumer-URL of peer SWISH servers in this % cluster. swish_cluster(Pairs) :- consumer_key(Server, Key), redis(Server, hgetall(Key:url), Pairs). :- http_handler(swish(backends), backends, [id(backends)]). backends(_Request) :- swish_cluster(Pairs), maplist(backend_stats, Pairs, Pairs1), dict_pairs(Dict, json, Pairs1), reply_json(Dict). backend_stats(Consumer-URL, Consumer-Stat) :- broadcast_request(swish(backend_status(Consumer, Stat0))), !, Stat = Stat0.put(url, URL). backend_stats(Consumer-URL, Consumer-json{url:URL}). %! init_pubsub is det. % % Prepare to listen to the SWISH pubsub channels. init_pubsub :- redis_current_subscription(redis_pubsub, _), !. init_pubsub :- redis_subscribe(swish, [ swish:swish, % Overall control swish:chat, % Chat broadcast messages swish:gitty % Gitty sync requests ], _, [ alias(redis_pubsub) ]). :- initialization listen(redis(_, 'swish:swish', Message), swish_message(swish(Message))). swish_message(Message) :- print_message(informational, Message). :- multifile prolog:message//1. prolog:message(swish(redis_peers(Count))) --> [ 'Redis: the are ~d peers in the cluster'-[Count] ]. prolog:message(swish(joined(Consumer, URL))) --> ( { redis_consumer(Consumer) } -> [] ; [ 'Redis: ~w joined the cluster, at ~w'-[Consumer, URL] ] ). prolog:message(swish(left(Consumer))) --> ( { redis_consumer(Consumer) } -> [] ; [ 'Redis: ~w left the cluster'-[Consumer] ] ).