33
34:- module(mqi,
35 [ mqi_start/0,
36 mqi_start/1, 37 mqi_stop/1, 38 mqi_version/2 39 ]). 40
110:- use_module(library(socket)). 111:- use_module(library(http/json)). 112:- use_module(library(http/json_convert)). 113:- use_module(library(http/http_stream)). 114:- use_module(library(option)). 115:- use_module(library(term_to_json)). 116:- use_module(library(debug)). 117:- use_module(library(filesex)). 118:- use_module(library(gensym)). 119:- use_module(library(lists)). 120:- use_module(library(main)). 121:- use_module(library(make)). 122:- use_module(library(prolog_source)). 123:- use_module(library(time)). 124:- use_module(library(uuid)). 125
% Runtime bookkeeping kept in the dynamic database.
%
%   mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)
%       asserted by bind_socket/5, retracted by mqi_stop/1.
%   mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)
%       asserted by create_connection/6 for every accepted client.
%   mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)
%       asserted by communication_thread_listen/6.
%   query_in_progress(Goal_Thread_ID)
%       asserted by send_goal_to_thread/7, retracted by get_next_result/4.
%   safe_to_cancel(Goal_Thread_ID)
%       exists only while run_cancellable_goal/2 is running a goal.
:- dynamic
    mqi_thread/3,
    mqi_worker_threads/3,
    mqi_socket/5,
    query_in_progress/1,
    safe_to_cancel/1.
% mqi_version(?Major, ?Minor)
%
% Major and minor version numbers of this interface. They are sent to
% the client as version(Major, Minor) once its password has matched
% (see communication_thread_listen/6).
mqi_version(1, 0).
176
177
% mqi_start(+Options)
%
% Bind a listening socket and start the MQI server loop. Options is an
% option list; the defaults applied here are:
%
%   pending_connections(5), query_timeout(-1), run_server_on_thread(true),
%   exit_main_on_failure(false), write_connection_values(false).
%
% port/1, unix_domain_socket/1, server_thread/1, password/1 and
% write_output_to_file/1 have no default value: when left unbound a
% thread alias is generated with gensym/2 and a numeric password is
% generated from a UUID (or random/1 on bounded-integer systems).
mqi_start(Options) :-
    Encoding = utf8,
    option(pending_connections(Backlog), Options, 5),
    option(query_timeout(Default_Timeout), Options, -1),
    option(port(Port), Options, _),
    option(run_server_on_thread(On_Thread), Options, true),
    option(exit_main_on_failure(Exit_On_Failure), Options, false),
    option(write_connection_values(Announce), Options, false),
    option(unix_domain_socket(Socket_File), Options, _),
    % unix_domain_socket(_) supplied without a value: create a private
    % directory and socket file name ourselves.
    (   memberchk(unix_domain_socket(_), Options),
        var(Socket_File)
    ->  unix_domain_socket_path(Socket_Dir, Socket_File)
    ;   true
    ),
    option(server_thread(Server_ID), Options, _),
    (   nonvar(Server_ID)
    ->  true
    ;   gensym(mqi, Server_ID)
    ),
    option(password(Password), Options, _),
    (   nonvar(Password)
    ->  true
    ;   (   current_prolog_flag(bounded, false)
        ->  uuid(Secret, [format(integer)])
        ;   Secret is random(1<<62)
        ),
        format(string(Password), '~d', [Secret])
    ),
    % The client sends the password as a message terminated by ".\n";
    % compare against that exact text.
    string_concat(Password, '.\n', Wire_Password),
    bind_socket(Server_ID, Socket_File, Port, Socket, Client_Address),
    send_client_startup_data(Announce, user_output, Socket_File, Client_Address, Password),
    option(write_output_to_file(Log_File), Options, _),
    (   nonvar(Log_File)
    ->  write_output_to_file(Log_File)
    ;   true
    ),
    Server_Goal =
        (   catch(server_thread(Server_ID, Socket, Client_Address, Wire_Password,
                                Backlog, Encoding, Default_Timeout, Exit_On_Failure),
                  error(E1, E2),
                  true),
            debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_ID, error(E1, E2)])
        ),
    start_server_thread(On_Thread, Server_ID, Server_Goal, Socket_Dir, Socket_File).
223
% opt_type(?Flag, ?Option, ?Type)
%
% Command line option table: flag name, option name it maps to, and
% the value type. Presumably the hook read by argv_options/3, which
% mqi_start/0 calls -- confirm against library(main).
opt_type(port,                      port,                      natural).
opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
opt_type(password,                  password,                  string).
opt_type(pending_connections,       pending_connections,       nonneg).
opt_type(query_timeout,             query_timeout,             float).
opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
opt_type(write_connection_values,   write_connection_values,   boolean).
opt_type(write_output_to_file,      write_output_to_file,      file(write)).
234
% opt_help(?Option, ?Help)
%
% One-line help text for each command line option listed in
% opt_type/3.
opt_help(port,
         "TCP/IP port for clients to connect to").
opt_help(create_unix_domain_socket,
         "Create a Unix domain socket for clients to connect to").
opt_help(unix_domain_socket,
         "File path for the Unix domain socket").
opt_help(password,
         "Connection password").
opt_help(pending_connections,
         "Max number of queued connections (5)").
opt_help(query_timeout,
         "Max query runtime in seconds (default infinite)").
opt_help(run_server_on_thread,
         "Run server in a background thread (true)").
opt_help(exit_main_on_failure,
         "Exit the process on a failure").
opt_help(write_connection_values,
         "Print info for clients to connect").
opt_help(write_output_to_file,
         "Write stdout and stderr to file").
245
281
282
% mqi_start
%
% Command line entry point. Parses the argv flag into options, forces
% exit_main_on_failure(true), turns create_unix_domain_socket(true)
% into an unbound unix_domain_socket(_) option, refuses
% run_server_on_thread values other than true, starts the server with
% mqi_start/1 and then blocks until the message quit_mqi arrives
% (sent by quit/1, which is also installed as the SIGINT handler).
mqi_start :-
    current_prolog_flag(argv, Argv),
    argv_options(Argv, _Positional, Cli_Options),
    merge_options(Cli_Options, [exit_main_on_failure(true)], With_Exit),
    select_option(create_unix_domain_socket(Want_Socket), With_Exit, Remaining, false),
    (   Want_Socket \== true
    ->  Final_Options = Remaining
    ;   merge_options(Remaining, [unix_domain_socket(_)], Final_Options)
    ),
    option(run_server_on_thread(On_Thread), Final_Options, true),
    (   On_Thread \== true
    ->  throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
    ;   true
    ),
    mqi_start(Final_Options),
    on_signal(int, _, quit),
    thread_get_message(quit_mqi).
307
308
% quit(+Signal)
%
% Post quit_mqi to the message queue of thread `main`; mqi_start/0 is
% waiting for that message. The argument is ignored.
quit(_Signal) :-
    thread_send_message(main, quit_mqi).
311
312
318
% mqi_stop(?Server_Thread_ID)
%
% Shut down the server(s) registered under Server_Thread_ID. First
% every matching mqi_thread/3 fact is retracted, its listening socket
% closed and the server thread aborted; then every matching
% mqi_worker_threads/3 fact is retracted and both its communication
% and goal thread are aborted. Errors while closing or signalling are
% only reported through debug/3.
mqi_stop(Server_Thread_ID) :-
    forall(
        retract(mqi_thread(Server_Thread_ID, _, Listen_Socket)),
        (   debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
            catch(tcp_close_socket(Listen_Socket), _0Close_Error, true),
            abortSilentExit(Server_Thread_ID, _0Server_Abort_Error),
            debug(mqi(protocol),
                  "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)",
                  [Server_Thread_ID, _0Close_Error, _0Server_Abort_Error])
        )),
    forall(
        retract(mqi_worker_threads(Server_Thread_ID, Comm_Thread, Goal_Thread)),
        (   abortSilentExit(Comm_Thread, _0Comm_Abort_Error),
            debug(mqi(protocol),
                  "Stopped server: ~w communication thread: ~w, exception(~w)",
                  [Server_Thread_ID, Comm_Thread, _0Comm_Abort_Error]),
            abortSilentExit(Goal_Thread, _0Goal_Abort_Error),
            debug(mqi(protocol),
                  "Stopped server: ~w goal thread: ~w, exception(~w)",
                  [Server_Thread_ID, Goal_Thread, _0Goal_Abort_Error])
        )).
341
342
% start_server_thread(+Run_Server_On_Thread, +Server_Thread_ID, :Server_Goal,
%                     ?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
%
% Run Server_Goal either on a new thread aliased Server_Thread_ID
% (Run_Server_On_Thread is the goal `true`) or synchronously on the
% calling thread.
%
% In both modes the Unix domain socket file/directory is removed with
% delete_unix_domain_socket_file/2 when the server goal ends. In the
% threaded mode this is done from the thread's at_exit hook; in the
% synchronous mode call_cleanup/2 is used so that the removal also
% happens when Server_Goal fails or raises an exception, not only when
% it succeeds.
start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   Run_Server_On_Thread
    ->  thread_create(Server_Goal, _,
                      [ alias(Server_Thread_ID),
                        at_exit(( delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
                                  detach_if_expected(Server_Thread_ID)
                                ))
                      ]),
        debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
    ;   call_cleanup(Server_Goal,
                     delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)),
        debug(mqi(protocol), "Halting.", [])
    ).
357
358
% delete_unix_domain_socket_file(?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
%
% Best-effort removal of the socket artefacts. When the directory is
% known it is deleted together with its contents; otherwise, when only
% the file is known, that file is deleted; when neither is bound
% nothing happens. error(_, _) exceptions from the delete are ignored.
delete_unix_domain_socket_file(Unix_Domain_Socket_Path, _) :-
    nonvar(Unix_Domain_Socket_Path),
    !,
    catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true).
delete_unix_domain_socket_file(_, Unix_Domain_Socket_Path_And_File) :-
    nonvar(Unix_Domain_Socket_Path_And_File),
    !,
    catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true).
delete_unix_domain_socket_file(_, _).
370
% optional_unix_domain_socket(-Socket)
%
% Wrapper selected at load time: when unix_domain_socket/1 is not
% available this is a no-op that leaves Socket unbound, otherwise it
% simply calls unix_domain_socket/1.
:- if(\+ current_predicate(unix_domain_socket/1)).
optional_unix_domain_socket(_).
:- else.
optional_unix_domain_socket(Socket) :-
    unix_domain_socket(Socket).
:- endif.
% bind_socket(+Server_Thread_ID, ?Unix_Domain_Socket_Path_And_File, ?Port, -Socket, -Client_Address)
%
% Create and bind the listening socket. With a bound socket file name
% a Unix domain socket is bound to that file (any existing file of
% that name is deleted first) and Client_Address is the file name.
% Otherwise a TCP socket with reuseaddr is bound to 127.0.0.1:Port and
% Client_Address is Port. The result is recorded as mqi_thread/3.
bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
    (   var(Unix_Domain_Socket_Path_And_File)
    ->  tcp_socket(Socket),
        tcp_setopt(Socket, reuseaddr),
        tcp_bind(Socket, '127.0.0.1':Port),
        debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
        Client_Address = Port
    ;   debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
        optional_unix_domain_socket(Socket),
        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
        Client_Address = Unix_Domain_Socket_Path_And_File
    ),
    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).
395
% send_client_startup_data(+Write_Connection_Values, +Stream,
%                          ?Unix_Domain_Socket_Path_And_File, +Port, +Password)
%
% When Write_Connection_Values is the goal `true`, write two lines to
% Stream and flush: the socket file name (or, when that is unbound,
% Port as an integer) followed by Password. Otherwise do nothing.
send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
    (   Write_Connection_Values
    ->  (   nonvar(Unix_Domain_Socket_Path_And_File)
        ->  format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
        ;   format(Stream, "~d\n", [Port])
        ),
        format(Stream, "~w\n", [Password]),
        flush_output(Stream)
    ;   true
    ).
409
410
% server_thread(+Server_Thread_ID, +Socket, +Address, +Password,
%               +Connection_Count, +Encoding, +Query_Timeout, +Exit_Main_On_Failure)
%
% Accept loop of the server: listen on Socket with a backlog of
% Connection_Count, obtain the accept stream, hand one incoming
% connection to create_connection/6 and start over. The loop has no
% exit condition of its own.
server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Listening on address: ~w", [Address]),
    tcp_listen(Socket, Connection_Count),
    tcp_open_socket(Socket, Accept_Stream, _),
    create_connection(Server_Thread_ID, Accept_Stream, Password,
                      Encoding, Query_Timeout, Exit_Main_On_Failure),
    server_thread(Server_Thread_ID, Socket, Address, Password,
                  Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).
420
421
% create_connection(+Server_Thread_ID, +AcceptFd, +Password, +Encoding,
%                   +Query_Timeout, +Exit_Main_On_Failure)
%
% Block until a client connects, then create the pair of threads that
% serve it: a goal thread (alias <server>_<conn>_goal) that runs
% queries and a communication thread (alias <server>_<conn>_comm) that
% talks to the client. A mutex with the same alias as the goal thread
% is created and the pair is recorded in mqi_worker_threads/3.
create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Waiting for client connection...", []),
    tcp_accept(AcceptFd, Client_Socket, _Peer),
    debug(mqi(protocol), "Client connected", []),
    gensym('conn', Conn_Name),
    atomic_list_concat([Server_Thread_ID, "_", Conn_Name, '_comm'], Comm_Alias),
    atomic_list_concat([Server_Thread_ID, "_", Conn_Name, '_goal'], Goal_Alias),
    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
    assert(mqi_worker_threads(Server_Thread_ID, Comm_Alias, Goal_Alias)),
    thread_create(goal_thread(Comm_Alias),
                  _,
                  [ alias(Goal_Alias),
                    at_exit(detach_if_expected(Goal_Alias))
                  ]),
    thread_create(communication_thread(Password, Client_Socket, Encoding, Server_Thread_ID,
                                       Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
                  _,
                  [ alias(Comm_Alias),
                    at_exit(detach_if_expected(Comm_Alias))
                  ]).
441
442
% goal_thread(+Respond_To_Thread_ID)
%
% Main loop of a goal thread. Waits for a
% goal(Goal, Binding_List, Query_Timeout, Find_All) message on its own
% queue, goal-expands it, runs it in module user under
% run_cancellable_goal/2 (with call_with_time_limit/2 unless
% Query_Timeout is -1) and reports to Respond_To_Thread_ID through
% send_next_result/4:
%
%   * Find_All true: all binding lists are collected and sent as one
%     result.
%   * otherwise: each answer is sent as soon as it is found (or a
%     single empty answer when there is none), followed by a
%     no_more_results error marker.
%   * any exception from running the goal is sent as an error result.
goal_thread(Respond_To_Thread_ID) :-
    thread_self(Me),
    throw_if_testing(Me),
    thread_get_message(Me, goal(Raw_Goal, Binding_List, Query_Timeout, Find_All)),
    expand_goal(Raw_Goal, Goal),
    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, unexpanded: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Raw_Goal, Goal]),
    (   Find_All
    ->  Collector = findall(Binding_List, @(user:Goal, user), Answers)
    ;   Collector =
            (   findall(One_Answer,
                        (   @(user:Goal, user),
                            One_Answer = [Binding_List],
                            send_next_result(Respond_To_Thread_ID, One_Answer, _, Find_All)
                        ),
                        Answers),
                (   Answers == []
                ->  send_next_result(Respond_To_Thread_ID, [], _, Find_All)
                ;   true
                )
            )
    ),
    Guarded = run_cancellable_goal(Me, Collector),
    (   Query_Timeout == -1
    ->  Runner = Guarded
    ;   Runner = call_with_time_limit(Query_Timeout, Guarded)
    ),
    catch(Runner, Caught, true),
    (   var(Caught)
    ->  (   Find_All
        ->  send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
        ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
        )
    ;   send_next_result(Respond_To_Thread_ID, [], Caught, true)
    ),
    goal_thread(Respond_To_Thread_ID).
484
485
% throw_if_testing(+Self_ID)
%
% Test hook: when a testThrow(Exception) message is waiting in the
% queue of Self_ID (it is peeked, not removed), throw Exception.
% Succeeds silently otherwise.
throw_if_testing(Self_ID) :-
    thread_peek_message(Self_ID, testThrow(Test_Exception)),
    !,
    debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
    throw(Test_Exception).
throw_if_testing(_).
494
495
% run_cancellable_goal(+Mutex_ID, :Goal)
%
% Run Goal while a safe_to_cancel(Thread) fact for the calling thread
% exists. The fact is asserted before Goal starts and erased, while
% holding Mutex_ID, when Goal is finished; cancel_async takes the same
% mutex before checking the fact.
run_cancellable_goal(Mutex_ID, Goal) :-
    thread_self(Me),
    setup_call_cleanup(
        assert(safe_to_cancel(Me), Marker_Ref),
        Goal,
        with_mutex(Mutex_ID, erase(Marker_Ref))).
510
511
% communication_thread(+Password, +Socket, +Encoding, +Server_Thread_ID,
%                      +Goal_Thread_ID, +Query_Timeout, +Exit_Main_On_Failure)
%
% Body of a communication thread. Runs communication_thread_listen/6
% and, when that returns or raises error(_, _), aborts the goal thread
% and removes this connection's mqi_worker_threads/3 fact.
%
% Afterwards the process is asked to quit (quit/1) when the listen
% step failed, or when it raised an error and Exit_Main_On_Failure is
% true. In all other cases only the client socket is closed.
communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
    thread_self(Me),
    (   catch(communication_thread_listen(Password, Socket, Encoding,
                                          Server_Thread_ID, Goal_Thread_ID, Query_Timeout),
              error(Formal, _0Context),
              true),
        debug(mqi(protocol), "Session finished. Communication thread exception: ~w",
              [error(Formal, _0Context)]),
        abortSilentExit(Goal_Thread_ID, _),
        retractall(mqi_worker_threads(Server_Thread_ID, Me, Goal_Thread_ID))
    ->  Halt = (nonvar(Formal), Exit_Main_On_Failure)
    ;   Halt = true
    ),
    (   Halt
    ->  debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)",
              [Me, error(Formal, _0Context)]),
        quit(_)
    ;   debug(mqi(protocol), "Ending session ~w", [Me]),
        catch(tcp_close_socket(Socket), error(_, _), true)
    ).
541
542
% communication_thread_listen(+Password, +Socket, +Encoding, +Server_Thread_ID,
%                             +Goal_Thread_ID, +Query_Timeout)
%
% Open the streams of the client Socket, record them in mqi_socket/5,
% set Encoding on both and read the first message, which must equal
% Password. On a match the client receives the thread ids and the
% interface version and the message loop is entered; on a mismatch an
% error reply is sent and password_mismatch is thrown.
communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
    tcp_open_socket(Socket, In, Out),
    thread_self(Me),
    assert(mqi_socket(Server_Thread_ID, Me, Socket, In, Out)),
    set_stream(In, encoding(Encoding)),
    set_stream(Out, encoding(Encoding)),
    read_message(In, Sent_Password),
    (   Password \== Sent_Password
    ->  debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
        reply_error(Out, password_mismatch),
        throw(password_mismatch)
    ;   debug(mqi(protocol), "Password matched.", []),
        mqi_version(Major, Minor),
        reply(Out, true([[threads(Me, Goal_Thread_ID), version(Major, Minor)]]))
    ),
    process_mqi_messages(In, Out, Goal_Thread_ID, Query_Timeout),
    debug(mqi(protocol), "Session finished.", []).
567
568
% process_mqi_messages(+Read_Stream, +Write_Stream, +Goal_Thread_ID, +Query_Timeout)
%
% Message loop of a session. Handles one message at a time until the
% client sends `close` (the loop then succeeds) or `quit` (the loop
% then fails, which communication_thread/7 treats as a request to
% halt).
process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
    (   Command == close
    ->  debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", [])
    ;   Command == quit
    ->  debug(mqi(protocol), "Command: quit.", []),
        fail
    ;   process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
    ).
597
% process_mqi_message(+Read_Stream, +Write_Stream, +Goal_Thread_ID, +Query_Timeout, -Command)
%
% Read one raw message, parse it and execute it. Fails when no message
% could be read. When parsing fails the message is skipped (the parser
% has already replied with an error) and Command stays unbound.
process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
    debug(mqi(protocol), "Waiting for next message ...", []),
    once(state_receive_raw_message(Read_Stream, Message_String)),
    (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
    ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
    ;   true
    ).
624
625
% state_receive_raw_message(+Read, -Command_String)
%
% Read the next length-prefixed message from Read with read_message/2
% and log it.
state_receive_raw_message(Read, Command_String) :-
    read_message(Read, Command_String),
    debug(mqi(protocol), "Valid message: ~w", [Command_String]).
633
634
% state_parse_command(+Write_Stream, +Command_String, -Parsed_Command, -Binding_List)
%
% Parse Command_String as a term in module user; Binding_List receives
% the variable_names bindings. When parsing raises an exception that
% exception is sent to the client; when parsing fails a
% couldNotParseCommand error is sent. In both cases the predicate
% fails.
state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
    Read_Options = [variable_names(Binding_List), module(user)],
    (   catch(read_term_from_atom(Command_String, Parsed_Command, Read_Options),
              Parse_Error,
              true)
    ->  (   nonvar(Parse_Error)
        ->  reply_error(Write_Stream, Parse_Error),
            fail
        ;   debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
        )
    ;   reply_error(Write_Stream, error(couldNotParseCommand, _)),
        fail
    ).
656
657
% state_process_command(+Stream, +Goal_Thread_ID, +Query_Timeout, +Command, +Binding_List)
%
% Command run(Goal, Timeout): synchronous query. Results of an earlier
% query that were never fetched are drained and discarded first, then
% Goal is sent to the goal thread in find-all mode and the client gets
% heartbeats until the result arrives, followed by the result.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
    repeat_until_false(
        (   query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Stale_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Stale_Answer])
        )),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
    heartbeat_until_result(Goal_Thread_ID, Stream, Result),
    reply_with_result(Goal_Thread_ID, Stream, Result).
683
684
% Command run_async(Goal, Timeout, Find_All): asynchronous query.
% Unfetched results of an earlier query are drained and discarded,
% Goal is sent to the goal thread and the client is answered
% immediately with true([[]]); results are fetched later with
% async_result.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run_async/1.", []),
    debug(mqi(query), "   Goal: ~w", [Goal]),
    repeat_until_false(
        (   query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Stale_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Stale_Answer])
        )),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
    reply(Stream, true([[]])).
701
702
% Command async_result(Timeout): fetch the next result of the running
% asynchronous query. An unbound Timeout or -1 waits without a limit.
% Replies no_query when no query is in progress and
% result_not_available when no result arrived within Timeout.
state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
    !,
    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
    (   var(Timeout)
    ->  Wait_Options = []
    ;   Timeout == -1
    ->  Wait_Options = []
    ;   Wait_Options = [timeout(Timeout)]
    ),
    (   \+ query_in_progress(Goal_Thread_ID)
    ->  debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
        reply_error(Stream, no_query)
    ;   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
        get_next_result(Goal_Thread_ID, Stream, Wait_Options, Result)
    ->  reply_with_result(Goal_Thread_ID, Stream, Result)
    ;   reply_error(Stream, result_not_available)
    ).
722
723
% Command cancel_async: cancel the running asynchronous query. The
% check and the signal happen while holding the goal thread's mutex.
% When the goal thread is marked safe_to_cancel it is signalled with
% throw(cancel_goal); when the query already finished but has
% unfetched results the client is just acknowledged; otherwise
% no_query is replied.
state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
    !,
    debug(mqi(protocol), "Command: cancel_async/0.", []),
    with_mutex(Goal_Thread_ID,
        (   safe_to_cancel(Goal_Thread_ID)
        ->  thread_signal(Goal_Thread_ID, throw(cancel_goal)),
            reply(Stream, true([[]]))
        ;   query_in_progress(Goal_Thread_ID)
        ->  debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
            reply(Stream, true([[]]))
        ;   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
            reply_error(Stream, no_query)
        )).
748
749
% Command testThrowGoalThread(Exception): test hook. Queues a
% testThrow(Exception) message for the goal thread (picked up by
% throw_if_testing/1) and then behaves like run(true, -1).
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
    !,
    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
    state_process_command(Stream, Goal_Thread_ID, Query_Timeout,
                          run(true, -1), Binding_List).
758
759
% Command close: acknowledge; process_mqi_messages/4 ends the session.
state_process_command(Stream, _, _, close, _) :-
    !,
    reply(Stream, true([[]])).

% Command quit: acknowledge; process_mqi_messages/4 then fails, which
% leads to quit/1 being called.
state_process_command(Stream, _, _, quit, _) :-
    !,
    reply(Stream, true([[]])).

% Anything else: log it and reply with an unknownCommand error.
state_process_command(Stream, _, _, Unknown, _) :-
    debug(mqi(protocol), "Unknown command ~w", [Unknown]),
    reply_error(Stream, unknownCommand).
774
775
% heartbeat_until_result(+Goal_Thread_ID, +Stream, -Answers)
%
% Wait for the next result of the goal thread. Every time 2 seconds
% pass without a result a heartbeat character is written to Stream and
% the wait is repeated.
heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
    ;   debug(mqi(protocol), "heartbeat...", []),
        write_heartbeat(Stream),
        heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
    ).
789
790
% write_heartbeat(+Stream)
%
% Write a single '.' to Stream and flush it immediately.
write_heartbeat(Out) :-
    put_char(Out, '.'),
    flush_output(Out).
797
798
% send_goal_to_thread(+Stream, +Goal_Thread_ID, +Default_Timeout, ?Timeout,
%                     +Goal, ?Binding_List, +Find_All)
%
% Queue Goal for execution on the goal thread. An unbound Timeout is
% replaced by Default_Timeout and an unbound Binding_List by [].
% query_in_progress(Goal_Thread_ID) is asserted before the message is
% sent.
%
% When sending the message raises an exception the query never
% started, so the query_in_progress/1 fact is removed again (it would
% otherwise stay in the database as a stale entry), connection_failed
% is reported to the client on Stream and the exception is re-thrown.
send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
    (   var(Timeout)
    ->  Timeout = Default_Timeout
    ;   true
    ),
    (   var(Binding_List)
    ->  Binding_List = []
    ;   true
    ),
    debug(mqi(query), "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
    assert(query_in_progress(Goal_Thread_ID)),
    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)),
          Send_Message_Exception,
          true),
    (   var(Send_Message_Exception)
    ->  true
    ;   % Undo the bookkeeping: no result will ever arrive for this query.
        retractall(query_in_progress(Goal_Thread_ID)),
        reply_error(Stream, connection_failed),
        throw(Send_Message_Exception)
    ).
825
826
% send_next_result(+Respond_To_Thread_ID, +Answer, ?Exception_In_Goal, +Find_All)
%
% Post a result(Result, Find_All) message to Respond_To_Thread_ID.
% Result is error(Exception_In_Goal) when an exception is given,
% `false` when Answer is the empty list, and true(Final_Answer)
% otherwise, where Final_Answer is Answer after handle_constraints/2.
send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
    (   nonvar(Exception_In_Goal)
    ->  debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
        thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
    ;   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
        (   Answer == []
        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
        ;   handle_constraints(Answer, Final_Answer),
            thread_send_message(Respond_To_Thread_ID, result(true(Final_Answer), Find_All))
        )
    ).
841
842
% handle_constraints(+Answer, -Final_Answer)
%
% Answer is a list of solutions, each itself a list. When Answer
% contains no attributed variables it is returned unchanged. Otherwise
% every solution is copied with copy_term/3 and the resulting
% attribute goals are put in front of the copy as
% '$residuals' = Goals.
handle_constraints(Answer, Final_Answer) :-
    (   term_attvars(Answer, [])
    ->  Final_Answer = Answer
    ;   findall([ '$residuals' = Residual_Goals | Solution_Copy ],
                (   member(Solution, Answer),
                    copy_term(Solution, Solution_Copy, Residual_Goals)
                ),
                Final_Answer),
        debug(mqi(query), "Constraints detected, converted: ~w to ~w", [Answer, Final_Answer])
    ).
855
856
% get_next_result(+Goal_Thread_ID, +Stream, +Options, -Answers)
%
% Take the next result(Answers, Find_All) message from the calling
% thread's queue; Options is passed to thread_get_message/3 (e.g. a
% timeout). When the goal thread is not running, connection_failed is
% reported on Stream and thrown. The query_in_progress/1 fact is
% removed when the result belongs to a find-all query or when it is an
% error(_) result.
get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
    (   \+ thread_property(Goal_Thread_ID, status(running))
    ->  reply_error(Stream, connection_failed),
        throw(connection_failed)
    ;   true
    ),
    thread_self(Me),
    thread_get_message(Me, result(Answers, Find_All), Options),
    (   Find_All
    ->  debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
        retractall(query_in_progress(Goal_Thread_ID))
    ;   Answers = error(_)
    ->  debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
        retractall(query_in_progress(Goal_Thread_ID))
    ;   true
    ).
883
884
% reply_with_result(+Goal_Thread_ID, +Stream, +Result)
%
% Send Result to the client. error(Error) results go through
% reply_error/2; anything else is sent with reply/2, and an error(E, _)
% exception raised while replying is reported to the client as error E
% instead. The first argument is not used.
reply_with_result(_, Stream, Result) :-
    (   Result = error(Error)
    ->  reply_error(Stream, Error)
    ;   catch(reply(Stream, Result),
              error(Reply_Error, _),
              reply_error(Stream, Reply_Error))
    ).
895
896
% reply(+Stream, +Term)
%
% Convert Term to its JSON text and send it to Stream as one
% length-prefixed message.
reply(Out, Term) :-
    debug(mqi(query), "Responding with Term: ~w", [Term]),
    term_to_json_string(Term, Payload),
    write_message(Out, Payload).
903
904
% reply_error(+Stream, +Error_Term)
%
% Report an error to the client as exception(Value):
%
%   * error(Formal, _)    -> exception(Formal)
%   * any atomic term     -> exception(Error_Term)
%   * other compound term -> exception(Name), the term's functor name
%
% Atomic terms include numbers and strings, not only atoms: a query
% may throw e.g. 42 or "text", and passing such a term to
% compound_name_arity/3 would raise a type error and end the session
% instead of reporting the exception to the client.
reply_error(Stream, Error_Term) :-
    debug(mqi(query), "Responding with exception: ~w", [Error_Term]),
    (   error(Error_Value, _) = Error_Term
    ->  Response = exception(Error_Value)
    ;   atomic(Error_Term)
    ->  Response = exception(Error_Term)
    ;   compound_name_arity(Error_Term, Name, _),
        Response = exception(Name)
    ),
    reply(Stream, Response).
920
921
925
926
% write_message(+Stream, +String)
%
% Send String as one message: first the length header written by
% write_string_length/2, then the text itself, then flush.
write_message(Out, Text) :-
    write_string_length(Out, Text),
    write(Out, Text),
    flush_output(Out).
933
934
% read_message(+Stream, -String)
%
% Read one length-prefixed message. The length header is read first;
% the body is then read through a range stream limited to that size,
% using the same encoding as Stream. The range stream is always
% closed.
read_message(In, Text) :-
    read_string_length(In, Size),
    stream_property(In, encoding(Enc)),
    setup_call_cleanup(
        stream_range_open(In, Range, [size(Size)]),
        (   set_stream(Range, encoding(Enc)),
            read_string(Range, _, Text)
        ),
        close(Range)).
948
949
% write_string_length(+Stream, +String)
%
% Write the message header: the length of String, as computed by
% string_encoding_length/3 for the encoding of Stream, followed by
% ".\n".
write_string_length(Out, Text) :-
    stream_property(Out, encoding(Enc)),
    string_encoding_length(Text, Enc, Byte_Length),
    format(Out, "~d.\n", [Byte_Length]).
955
956
% read_string_length(+Stream, -Length)
%
% Read the message header as a Prolog term (the sender writes it as
% "<Length>.\n") and skip the single character that follows the
% term's end.
read_string_length(In, Length) :-
    read_term(In, Length, []),
    get_char(In, _Separator).
962
963
% string_encoding_length(+String, +Encoding, -Length)
%
% Length is the number of bytes String occupies when written in
% Encoding. Measured by writing String to a null stream and reading
% that stream's byte count.
string_encoding_length(Text, Encoding, Length) :-
    setup_call_cleanup(
        open_null_stream(Sink),
        (   set_stream(Sink, encoding(Encoding)),
            write(Sink, Text),
            byte_count(Sink, Length)
        ),
        close(Sink)).
973
974
% term_to_json_string(+Term, -Json_String)
%
% Json_String is the JSON text for Term (via term_to_json/2 and
% json_write/2), terminated by a newline character.
term_to_json_string(Term, Json_String) :-
    term_to_json(Term, Json_Term),
    with_output_to(
        string(Json_String),
        (   current_output(Out),
            json_write(Out, Json_Term),
            put_char(Out, '\n')
        )).
984
985
% repeat_until_false(:Goal)
%
% Call Goal again and again until it fails, then succeed. Goal is run
% under double negation, so bindings it makes are undone after every
% round; only its side effects remain.
repeat_until_false(Goal) :-
    (   \+ \+ Goal
    ->  repeat_until_false(Goal)
    ;   true
    ).
991
992
% abortSilentExit(+Thread_ID, -Exception)
%
% Signal Thread_ID to abort. An error(Exception, _) raised by
% thread_signal/2 itself is caught and returned in Exception;
% Exception stays unbound when the signal was delivered.
abortSilentExit(Thread_ID, Exception) :-
    catch(thread_signal(Thread_ID, abort),
          error(Exception, _),
          true),
    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
1002
1003
% detach_if_expected(+Thread_ID)
%
% Used as at_exit hook of the MQI threads. Detaches Thread_ID when its
% status is `true` or `false`; a thread with any other status is left
% as it is.
detach_if_expected(Thread_ID) :-
    thread_property(Thread_ID, status(Status)),
    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
    (   memberchk(Status, [true, false])
    ->  debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
        thread_detach(Thread_ID)
    ;   true
    ).
1019
1020
% write_output_to_file(+File)
%
% Open File for unbuffered writing and install the stream as both
% standard output and standard error with set_prolog_IO/3; standard
% input is left as user_input.
write_output_to_file(File) :-
    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
    open(File, write, Log_Stream, [buffer(false)]),
    set_prolog_IO(user_input, Log_Stream, Log_Stream).
1025
1026
1045unix_domain_socket_path(Created_Directory, File_Path) :-
1046 tmp_file(udsock, Created_Directory),
1047 make_directory(Created_Directory),
1048 catch( chmod(Created_Directory, urwx),
1049 Exception,
1050 ( catch(delete_directory(Created_Directory), error(_, _), true),
1051 throw(Exception)
1052 )
1053 ),
1054 setup_call_cleanup( ( current_prolog_flag(tmp_dir, Save_Tmp_Dir),
1055 set_prolog_flag(tmp_dir, Created_Directory)
1056 ),
1057 tmp_file_stream(File_Path, Stream, []),
1058 set_prolog_flag(tmp_dir, Save_Tmp_Dir)
1059 ),
1060 close(Stream)