View source with formatted comments or as raw
    1/*  Part of SWISH
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@cs.vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (C): 2017-2020, VU University Amsterdam
    7			      CWI Amsterdam
    8                              SWI-Prolog Solutions b.v.
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(chat_store,
   38          [ chat_store/1,               % +Message
   39            chat_messages/3             % +DocID, -Messages, +Options
   40          ]).   41:- use_module(library(settings)).   42:- use_module(library(filesex)).   43:- use_module(library(option)).   44:- use_module(library(sha)).   45:- use_module(library(apply)).   46:- use_module(library(http/http_dispatch)).   47:- use_module(library(http/http_parameters)).   48:- use_module(library(http/http_json)).   49
   50:- use_module(config).   51
   % HTTP endpoints of this module, handled by chat_messages/1 and
   % chat_status/1.
   :- http_handler(swish(chat/messages), chat_messages, [ id(chat_messages) ]).
   :- http_handler(swish(chat/status),   chat_status,   [ id(chat_status)   ]).

   % Directory used by the file-based store.  It is not consulted when
   % redis is configured, because open_chatstore/0 returns early then.
   :- setting(directory, callable, data(chat),
              'The directory for storing chat messages.').

   58/** <module> Store chat messages
   59
   60When using redis, the messages for  a   document  are  stored in the key
   61below as a _sorted set_ where the score is  the time in ms and the value
   62is a Prolog dict holding the message.
   63
   64    Prefix:chat:docid:DocId
   65*/
   66
   :- multifile
       swish_config:chat_count_about/2.    % +DocID, -Count

   % Run open_chatstore/0 when http(pre_server_start) is broadcast.
   :- listen(http(pre_server_start),
             open_chatstore).

   %!  redis_docid_key(+DocID, -Server, -Key) is semidet.
   %
   %   True when swish_config/2 defines both a `redis` server (Server)
   %   and a `redis_prefix`, and Key is the redis key that holds the chat
   %   messages about DocID.  Key has the shape `Prefix:chat:docid:DocID`.

   redis_docid_key(DocID, Server, Key) :-
       swish_config(redis, Server),
       swish_config(redis_prefix, Prefix),
       KeyParts = [Prefix, chat, docid, DocID],
       atomic_list_concat(KeyParts, :, Key).
   79
   %!  uses_redis is semidet.
   %
   %   True when swish_config/2 defines a `redis` server, in which case
   %   chat messages are kept in redis rather than in files.

   uses_redis :-
       swish_config(redis, _Server).
   82
   :- dynamic  storage_dir/1.
   :- volatile storage_dir/1.

   %!  open_chatstore is semidet.
   %
   %   Make the chat store ready for use.  Nothing needs to be done when
   %   redis is used or when storage_dir/1 is already known.  Otherwise
   %   the storage directory is established by open_chatstore_guarded/0,
   %   which runs while holding the `chat_store` mutex.  Fails if that
   %   predicate cannot establish a directory.

   open_chatstore :-
       (   uses_redis
       ->  true
       ;   storage_dir(_)
       ->  true
       ;   with_mutex(chat_store, open_chatstore_guarded)
       ).
   94
   %!  open_chatstore_guarded is semidet.
   %
   %   Establish storage_dir/1.  Called by open_chatstore/0 under the
   %   `chat_store` mutex.  storage_dir/1 is checked again first,
   %   presumably because another thread may have set it while we were
   %   waiting for the mutex.
   %
   %   The `directory` setting is first resolved to an existing, writable
   %   directory.  If that fails, each candidate location that does not
   %   yet exist is tried in turn and the first one that can be created
   %   is used.  A permission error from make_directory/1 moves on to
   %   the next candidate; other errors are propagated.

   open_chatstore_guarded :-
       storage_dir(_),
       !.
   open_chatstore_guarded :-
       setting(directory, Spec),
       (   absolute_file_name(Spec, Dir,
                              [ file_type(directory),
                                access(write),
                                file_errors(fail)
                              ])
       ;   absolute_file_name(Spec, Dir,
                              [ solutions(all)
                              ]),
           \+ exists_directory(Dir),
           catch(make_directory(Dir),
                 error(permission_error(create, directory, Dir), _),
                 fail)
       ),
       !,
       asserta(storage_dir(Dir)).
  116
  %!  chat_dir_file(+DocID, -Path, -File) is semidet.
  %
  %   True when File is the file in which chat messages about DocID are
  %   stored and Path is the directory holding File.  The location is
  %   derived from the hexadecimal hash of DocID: characters 0-1 and 2-3
  %   name two nested directories below the storage directory and the
  %   remaining characters form the file name.  Neither Path nor File is
  %   created here.  Fails if no storage directory is known.

  chat_dir_file(DocID, Path, File) :-
      open_chatstore,
      sha_hash(DocID, Digest, []),
      hash_atom(Digest, Hex),
      sub_atom(Hex, 0, 2, _, Level1),
      sub_atom(Hex, 2, 2, _, Level2),
      sub_atom(Hex, 4, _, 0, Base),
      storage_dir(Root),
      atomic_list_concat([Root, Level1, Level2], /, Path),
      atomic_list_concat([Path, Base], /, File).
  131
  %!  existing_chat_file(+DocID, -File) is semidet.
  %
  %   True when File is the chat file for DocID as computed by
  %   chat_dir_file/3 and that file exists.

  existing_chat_file(DocID, File) :-
      chat_dir_file(DocID, _Dir, Candidate),
      exists_file(Candidate),
      File = Candidate.
  140
  %!  chat_store(+Message:dict) is det.
  %
  %   Add Message to the chat store.  Message must be a `chat` dict with
  %   a `docid` key; anything else is silently ignored.  If
  %   `Message.create == false`, the message is only stored when a chat
  %   about the document already exists (the redis key or the chat
  %   file).  This is used to record changes to a file only while there
  %   is an ongoing chat, so we know to which version the chat messages
  %   refer.
  %
  %   With redis, the message is added to the sorted set for the
  %   document using `Message.time` in milliseconds as score.  Otherwise
  %   it is appended as one quoted term to the chat file and the cached
  %   message count is updated.

  chat_store(Message0) :-
      uses_redis,
      !,
      (   prepare_message(Message0, DocID, Create, Message),
          redis_docid_key(DocID, Server, Key),
          (   Create \== false
          ->  true
          ;   redis(Server, exists(Key), 1)
          )
      ->  TimeMs is integer(Message.time*1000),
          redis(Server, zadd(Key, nx, TimeMs, Message as prolog))
      ;   true
      ).
  chat_store(Message0) :-
      prepare_message(Message0, DocID, Create, Message),
      chat_dir_file(DocID, Dir, File),
      (   Create \== false
      ->  true
      ;   exists_file(File)
      ),
      !,
      make_directory_path(Dir),
      with_mutex(chat_store, append_chat_message(File, DocID, Message)).
  chat_store(_).

  %   Append Message to File and bump the cached count for DocID.  Runs
  %   under the `chat_store` mutex (see chat_store/1).

  append_chat_message(File, DocID, Message) :-
      setup_call_cleanup(
          open(File, append, Out, [encoding(utf8)]),
          format(Out, '~q.~n', [Message]),
          close(Out)),
      increment_message_count(DocID).
  178
  %!  prepare_message(+Message0, -DocID, -Create, -Message) is semidet.
  %
  %   Extract the data needed to store Message0.  Fails unless Message0
  %   is a `chat` dict with a `docid` key.  DocID is the document id as
  %   an atom.  Create is `false` if Message0 holds `create: false`, in
  %   which case that key is removed; otherwise Create is `true`.
  %   Message is the result after strip_chat/2.

  prepare_message(Message0, DocID, Create, Message) :-
      chat{docid:DocIdText} :< Message0,
      atom_string(DocID, DocIdText),
      (   del_dict(create, Message0, false, WithoutCreate)
      ->  Create = false,
          Message1 = WithoutCreate
      ;   Create = true,
          Message1 = Message0
      ),
      strip_chat(Message1, Message).
  188
  189
  190
  %!  strip_chat(+Message0, -Message) is det.
  %
  %   Remove data from a chat message that is useless to store
  %   permanently.  Currently this cleans the dict under the `user` key
  %   using strip_chat_user/2.  Messages without a `user` key are
  %   returned unchanged.

  strip_chat(Message0, Message) :-
      (   strip_chat_user(Message0.get(user), User)
      ->  Message = Message0.put(user, User)
      ;   Message = Message0
      ).
  201
  %!  strip_chat_user(+User0:dict, -User:dict) is det.
  %
  %   User is User0 without the `wsid` (WebSocket id) key.  If User0 has
  %   no such key, User is User0.

  strip_chat_user(User0, User) :-
      (   del_dict(wsid, User0, _WSID, User)
      ->  true
      ;   User = User0
      ).
  206
  207
  %!  chat_messages(+DocID, -Messages:list, +Options) is det.
  %
  %   Get messages associated with DocID.  Options include
  %
  %     - max(+Max)
  %     Maximum number of messages to retrieve; the most recent Max
  %     messages are selected.  `max(0)` yields the empty list.  If
  %     omitted, the file store uses 25 (see
  %     read_messages_from_stream/3) while the redis store applies no
  %     limit.
  %     - after(+TimeStamp)
  %     Only get messages after TimeStamp

  chat_messages(_DocID, Messages, Options) :-
      % Without this clause max(0) returned *all* messages: the redis
      % range below starts at -0 = 0 and the file reader falls back to
      % reading from the start of the file.
      option(max(Max), Options),
      Max == 0,
      !,
      Messages = [].
  chat_messages(DocID, Messages, Options) :-
      redis_docid_key(DocID, Server, Key),
      !,
      (   option(max(Max), Options)
      ->  % A negative start index counts from the end of the sorted
          % set, so this selects the last Max entries.
          Start is -Max,
          redis(Server, zrange(Key, Start, -1), Messages0),
          filter_old(Messages0, Messages, Options)
      ;   option(after(Time), Options)
      ->  % Scores are integer milliseconds; +1 excludes a message
          % stored at exactly Time.
          Score is integer(Time*1000)+1,
          redis(Server, zrangebyscore(Key, Score, +inf), Messages)
      ;   redis(Server, zrange(Key, 0, -1), Messages)
      ).
  chat_messages(DocID, Messages, Options) :-
      chat_messages_from_files(DocID, Messages, Options).
  231
  %!  chat_messages_from_files(+DocID, -Messages, +Options) is det.
  %
  %   Messages is the list of messages read from the chat file of DocID,
  %   restricted by the max(Max) and after(Time) options.  Messages is
  %   the empty list if there is no chat file for DocID.

  chat_messages_from_files(DocID, Messages, Options) :-
      existing_chat_file(DocID, File),
      !,
      read_messages(File, Candidates, Options),
      filter_old(Candidates, Messages, Options).
  chat_messages_from_files(_DocID, [], _Options).
  238
  %!  read_messages(+File, -Messages, +Options) is det.
  %
  %   Open File as UTF-8 text and read the messages it contains using
  %   read_messages_from_stream/3.  The file is closed afterwards, also
  %   when reading raises an exception.

  read_messages(File, Messages, Options) :-
      setup_call_cleanup(
          open(File, read, Stream, [encoding(utf8)]),
          read_messages_from_stream(Stream, Messages, Options),
          close(Stream)).
  244
  %!  read_messages_from_stream(+In, -Messages, +Options) is det.
  %
  %   Read the last Max messages from In, where Max comes from the
  %   max(Max) option and defaults to 25.  The stream is positioned by
  %   seeking to the end and skipping back Max lines.  While doing so
  %   the stream is switched to `octet` encoding, presumably so that
  %   seek/4 and byte_count/2 deal in plain bytes; `utf8` is restored
  %   before reading the terms.  If Max is not an integer or skipping
  %   back fails (see backskip_lines/2), all messages are read from the
  %   start of the stream.

  read_messages_from_stream(In, Messages, Options) :-
      (   option(max(Max), Options, 25),
          integer(Max),
          setup_call_cleanup(
              set_stream(In, encoding(octet)),
              (   seek(In, 0, eof, _End),
                  backskip_lines(In, Max)
              ),
              set_stream(In, encoding(utf8)))
      ->  true
      ;   seek(In, 0, bof, _Start)
      ),
      read_terms(In, Messages).
  259
  %!  read_terms(+In, -Terms:list) is det.
  %
  %   Terms is the list of all terms that can be read from the current
  %   position of In up to the end of the input.

  read_terms(In, Terms) :-
      read_term(In, Term, []),
      (   Term \== end_of_file
      ->  Terms = [Term|Rest],
          read_terms(In, Rest)
      ;   Terms = []
      ).
  267
  %!  backskip_lines(+Stream, +Lines) is semidet.
  %
  %   Reposition Stream at the start of the Lines-th line counted back
  %   from the current position, which the caller has set to the end of
  %   the file.  Windows of 2^10, 2^11, ... 2^20 bytes before the
  %   current position are tried in turn.  In each window the text up to
  %   and including the first newline is skipped, as the window normally
  %   starts in the middle of a line, and the start offsets of the
  %   remaining lines are collected.  Succeeds as soon as a window
  %   holds at least Lines line starts and fails if none does.

  backskip_lines(Stream, Lines) :-
      byte_count(Stream, End),
      between(10, 20, Bits),
      Window is 1<<Bits,
      Start is max(0, End-Window),
      seek(Stream, Start, bof, _),
      skip(Stream, 0'\n),
      line_starts(Stream, End, Starts),
      reverse(Starts, FromEnd),
      nth1(Lines, FromEnd, LineStart),
      !,
      seek(Stream, LineStart, bof, _).
  279
  %!  line_starts(+Stream, +To, -Starts:list) is det.
  %
  %   Starts is the list of byte offsets at which lines start, from the
  %   current position of Stream up to, but not including, offset To.
  %   The stream must be positioned at the start of a line.

  line_starts(Stream, To, Starts) :-
      byte_count(Stream, Offset),
      (   Offset < To
      ->  Starts = [Offset|More],
          skip(Stream, 0'\n),
          line_starts(Stream, To, More)
      ;   Starts = []
      ).
  288
  %!  filter_old(+Messages0, -Messages, +Options) is det.
  %
  %   If Options contains after(After) with After > 0, Messages holds
  %   the members of Messages0 that satisfy after/2 for After.
  %   Otherwise Messages is Messages0.

  filter_old(Messages0, Messages, Options) :-
      (   option(after(After), Options),
          After > 0
      ->  include(after(After), Messages0, Messages)
      ;   Messages = Messages0
      ).
  295
  %!  after(+After, +Message) is semidet.
  %
  %   True when Message is a dict whose `time` value is larger than
  %   After.  Fails for non-dicts and for dicts without a `time` key.

  after(After, Message) :-
      is_dict(Message),
      get_dict(time, Message, Time),
      Time > After.
  299
  %!  chat_message_count(+DocID, -Count) is det.
  %
  %   Count is the number of messages stored for DocID.  With redis
  %   this is the size of the sorted set for the document.  For the
  %   file store it is the number of lines in the chat file, which is
  %   cached in message_count/2 after the first request.

  :- dynamic  message_count/2.
  :- volatile message_count/2.

  chat_message_count(DocID, Count) :-
      (   redis_docid_key(DocID, Server, Key)
      ->  redis(Server, zcount(Key, 0, +inf), Count)
      ;   message_count(DocID, Count)
      ->  true
      ;   count_messages(DocID, Count),
          asserta(message_count(DocID, Count))
      ).
  318
  %!  count_messages(+DocID, -Count) is det.
  %
  %   Count is the number of lines in the chat file for DocID, or 0 if
  %   there is no such file.

  count_messages(DocID, Count) :-
      existing_chat_file(DocID, File),
      !,
      setup_call_cleanup(
          open(File, read, Stream, [encoding(iso_latin_1)]),
          (   % Code 256 cannot occur in an ISO Latin 1 stream, so this
              % reads up to the end of the file.
              skip(Stream, 256),
              line_count(Stream, LastLine)
          ),
          close(Stream)),
      % Line numbers are 1-based.
      Count is LastLine - 1.
  count_messages(_DocID, 0).
  330
  %!  increment_message_count(+DocID) is det.
  %
  %   Add one to the cached message count for DocID.  Does nothing if
  %   no count is cached for DocID.

  increment_message_count(DocID) :-
      (   clause(message_count(DocID, Old), _Body, Ref)
      ->  New is Old+1,
          asserta(message_count(DocID, New)),
          erase(Ref)
      ;   true
      ).
  338
  %!  swish_config:chat_count_about(+DocID, -Count)
  %
  %   Hook implementation: Count is the number of chat messages about
  %   DocID as reported by chat_message_count/2.

  swish_config:chat_count_about(DocID, NMessages) :-
      chat_message_count(DocID, NMessages).
  345
  346
  347		 /*******************************
  348		 *              HTTP		*
  349		 *******************************/
  350
  %!  chat_messages(+Request)
  %
  %   HTTP handler that replies with the chat messages for a document
  %   as JSON.  Request parameters:
  %
  %     - docid
  %     Document to get the messages for (required).
  %     - max
  %     Optional non-negative integer, passed as max(Max).
  %     - after
  %     Optional number, passed as after(After).

  chat_messages(Request) :-
      http_parameters(Request,
                      [ docid(DocID, []),
                        max(Max, [nonneg, optional(true)]),
                        after(After, [number, optional(true)])
                      ]),
      % Optional parameters that were not supplied remain unbound; only
      % the supplied ones are passed on as options.
      Candidates = [max(Max), after(After)],
      include(ground, Candidates, Options),
      chat_messages(DocID, Messages, Options),
      reply_json_dict(Messages).
  364
  365%!  chat_status(+Request)
  366%
  367%   HTTP handler that returns chat status for document
  368
  369chat_status(Request) :-
  370    http_parameters(Request,
  371                    [ docid(DocID, []),
  372                      max(Max, [nonneg, optional(true)]),
  373                      after(After, [number, optional(true)])
  374                    ]),
  375    include(ground, [max(Max), after(After)], Options),
  376    chat_message_count(DocID, Total),
  377    (   Options == []
  378    ->  Count = Total
  379    ;   chat_messages(DocID, Messages, Options),
  380        length(Messages, Count)
  381    ),
  382    reply_json_dict(
  383        json{docid: DocID,
  384             total: Total,
  385             count: Count
  386            })