Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:23
erlang
2961-Rewrite-erpc-multicall-to-utilize-new-rece...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 2961-Rewrite-erpc-multicall-to-utilize-new-receive-optimi.patch of Package erlang
From 3cff897b83decdd82e2eebc60d98dfa57d89a467 Mon Sep 17 00:00:00 2001 From: Rickard Green <rickard@erlang.org> Date: Mon, 15 Feb 2021 19:53:11 +0100 Subject: [PATCH] Rewrite erpc:multicall() to utilize new receive optimization --- lib/kernel/src/erpc.erl | 194 +++++++++++++++++++++------------ lib/kernel/test/erpc_SUITE.erl | 112 +++++++++++++++++++ 2 files changed, 235 insertions(+), 71 deletions(-) diff --git a/lib/kernel/src/erpc.erl b/lib/kernel/src/erpc.erl index 89b286f479..a6b691754b 100644 --- a/lib/kernel/src/erpc.erl +++ b/lib/kernel/src/erpc.erl @@ -319,33 +319,9 @@ multicall(Ns, M, F, A, T) -> true = is_atom(M), true = is_atom(F), true = is_list(A), - Deadline = deadline(T), - {ReqIds, LC} = mcall_send_requests(Ns, M, F, A, [], T, false), - LRes = case LC of - false -> - undefined; - true -> - %% Timeout infinity and call on local node wanted; - %% execute local call in this process... - try - {return, Return} = execute_call(M, F, A), - {ok, Return} - catch - throw:Thrown -> - {throw, Thrown}; - exit:Reason -> - {exit, {exception, Reason}}; - error:Reason:Stack -> - case is_arg_error(Reason, M, F, A) of - true -> - {error, {?MODULE, Reason}}; - false -> - ErpcStack = trim_stack(Stack, M, F, A), - {error, {exception, Reason, ErpcStack}} - end - end - end, - mcall_receive_replies(ReqIds, [], LRes, Deadline) + Tag = make_ref(), + SendState = mcall_send_requests(Tag, Ns, M, F, A, T), + mcall_receive_replies(Tag, SendState) catch error:NotIErr when NotIErr /= internal_error -> error({?MODULE, badarg}) @@ -569,6 +545,8 @@ deadline(T) when ?IS_VALID_TMO_INT(T) -> time_left(infinity) -> infinity; +time_left(expired) -> + 0; time_left(Deadline) -> case Deadline - erlang:monotonic_time() of TimeLeft when TimeLeft =< 0 -> @@ -577,50 +555,124 @@ time_left(Deadline) -> erlang:convert_time_unit(TimeLeft-1, native, millisecond) + 1 end. -mcall_send_requests([], _M, _F, _A, RIDs, _T, LC) -> - {RIDs, LC}; -mcall_send_requests([N|Ns], M, F, A, RIDs, - infinity, false) when N == node() -> - mcall_send_requests(Ns, M, F, A, [local_call|RIDs], infinity, true); -mcall_send_requests([N|Ns], M, F, A, RIDs, T, LC) -> - RID = try - send_request(N, M, F, A) - catch - _:_ -> - %% Bad arguments... Abandon - %% requests we've already sent - %% and then fail... - mcall_failure_abandon(RIDs) - end, - mcall_send_requests(Ns, M, F, A, [RID|RIDs], T, LC); -mcall_send_requests(_, _, _, _, RIDs, _T, _LC) -> - %% Bad nodes list... Abandon requests we've - %% already sent and then fail... - mcall_failure_abandon(RIDs). - -mcall_failure_abandon([]) -> - error(badarg); -mcall_failure_abandon([local_call|RIDs]) -> - mcall_failure_abandon(RIDs); -mcall_failure_abandon([RID|RIDs]) -> +mcall_local_call(M, F, A) -> try - _ = receive_response(RID, 0), - ok + {return, Return} = execute_call(M, F, A), + {ok, Return} + catch + throw:Thrown -> + {throw, Thrown}; + exit:Reason -> + {exit, {exception, Reason}}; + error:Reason:Stack -> + case is_arg_error(Reason, M, F, A) of + true -> + {error, {?MODULE, Reason}}; + false -> + ErpcStack = trim_stack(Stack, M, F, A), + {error, {exception, Reason, ErpcStack}} + end + end. + +mcall_send_request(T, N, M, F, A) when is_reference(T), + is_atom(N), + is_atom(M), + is_atom(F), + is_list(A) -> + spawn_request(N, ?MODULE, execute_call, [T, M, F, A], + [{reply, error_only}, + {reply_tag, T}, + {monitor, [{tag, T}]}]). + +mcall_send_requests(Tag, Ns, M, F, A, Tmo) -> + DL = deadline(Tmo), + mcall_send_requests(Tag, Ns, M, F, A, [], DL, undefined, 0). + +mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) -> + %% Timeout infinity and call on local node wanted; + %% excecute local call in this process... + LRes = mcall_local_call(M, F, A), + {ok, RIDs, #{local_call => LRes}, NRs, DL}; +mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) -> + {ok, RIDs, #{}, NRs, DL}; +mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs, + infinity, undefined, NRs) when N == node() -> + mcall_send_requests(Tag, Ns, M, F, A, [local_call|RIDs], + infinity, local_call, NRs); +mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs, DL, LC, NRs) -> + try mcall_send_request(Tag, N, M, F, A) of + RID -> + mcall_send_requests(Tag, Ns, M, F, A, [RID|RIDs], + DL, LC, NRs+1) catch _:_ -> - ok - end, - mcall_failure_abandon(RIDs). - -mcall_receive_replies([], Replies, undefined, _Deadline) -> - Replies; -mcall_receive_replies([local_call|RIDs], Replies, LRes, Deadline) -> - mcall_receive_replies(RIDs, [LRes|Replies], undefined, Deadline); -mcall_receive_replies([RID|RIDs], Replies, LRes, Deadline) -> - Reply = try - {ok, receive_response(RID, time_left(Deadline))} - catch - Class:Reason -> - {Class, Reason} - end, - mcall_receive_replies(RIDs, [Reply|Replies], LRes, Deadline). + %% Bad argument... Abandon requests and cleanup + %% any responses by receiving replies with a zero + %% timeout and then fail... + {badarg, RIDs, #{}, NRs, expired} + end; +mcall_send_requests(_Tag, _Ns, _M, _F, _A, RIDs, _DL, _LC, NRs) -> + %% Bad nodes list... Abandon requests and cleanup any responses + %% by receiving replies with a zero timeout and then fail... + {badarg, RIDs, #{}, NRs, expired}. + +mcall_receive_replies(Tag, {SendRes, RIDs, Rpls, NRs, DL}) -> + ResRpls = mcall_receive_replies(Tag, RIDs, Rpls, NRs, DL), + if SendRes /= ok -> + error(SendRes); %% Cleanup done; fail... + true -> + mcall_map_replies(RIDs, ResRpls, []) + end. + +mcall_receive_replies(_Tag, _ReqIds, Rpls, 0, _DL) -> + Rpls; +mcall_receive_replies(Tag, ReqIDs, Rpls, NRs, DL) -> + Tmo = time_left(DL), + receive + {Tag, ReqId, error, Reason} -> + Res = mcall_result(spawn_reply, ReqId, Tag, Reason), + mcall_receive_replies(Tag, ReqIDs, Rpls#{ReqId => Res}, + NRs-1, DL); + {Tag, ReqId, process, _Pid, Reason} -> + Res = mcall_result(down, ReqId, Tag, Reason), + mcall_receive_replies(Tag, ReqIDs, Rpls#{ReqId => Res}, + NRs-1, DL) + after Tmo -> + if ReqIDs == [] -> + Rpls; + true -> + NewNRs = mcall_abandon(Tag, ReqIDs, Rpls, NRs), + mcall_receive_replies(Tag, [], Rpls, NewNRs, expired) + end + end. + +mcall_result(ResType, ReqId, Tag, ResultReason) -> + try + {ok, result(ResType, ReqId, Tag, ResultReason)} + catch + Class:Reason -> + {Class, Reason} + end. + +mcall_abandon(_Tag, [], _Rpls, NRs) -> + NRs; +mcall_abandon(Tag, [local_call | RIDs], Rpls, NRs) -> + mcall_abandon(Tag, RIDs, Rpls, NRs); +mcall_abandon(Tag, [RID | RIDs], Rpls, NRs) -> + NewNRs = case maps:is_key(RID, Rpls) of + true -> + NRs; + false -> + case call_abandon(RID) of + true -> NRs-1; + false -> NRs + end + end, + mcall_abandon(Tag, RIDs, Rpls, NewNRs). + +mcall_map_replies([], _Rpls, Res) -> + Res; +mcall_map_replies([RID|RIDs], Rpls, Res) -> + Timeout = {error, {?MODULE, timeout}}, + mcall_map_replies(RIDs, Rpls, [maps:get(RID, Rpls, Timeout) | Res]). + diff --git a/lib/kernel/test/erpc_SUITE.erl b/lib/kernel/test/erpc_SUITE.erl index 1c969ad183..b1e8ab7c75 100644 --- a/lib/kernel/test/erpc_SUITE.erl +++ b/lib/kernel/test/erpc_SUITE.erl @@ -28,6 +28,9 @@ send_request_check_reqtmo/1, send_request_against_old_node/1, multicall/1, multicall_reqtmo/1, + multicall_recv_opt/1, + multicall_recv_opt2/1, + multicall_recv_opt3/1, multicast/1, timeout_limit/1]). -export([init_per_testcase/2, end_per_testcase/2]). @@ -55,6 +58,9 @@ all() -> send_request_against_old_node, multicall, multicall_reqtmo, + multicall_recv_opt, + multicall_recv_opt2, + multicall_recv_opt3, multicast, timeout_limit]. @@ -1088,6 +1094,102 @@ multicall_reqtmo(Config) when is_list(Config) -> stop_node(QuickNode2), Res. +multicall_recv_opt(Config) when is_list(Config) -> + Loops = 1000, + HugeMsgQ = 500000, + process_flag(message_queue_data, off_heap), + {ok, Node1} = start_node(Config), + {ok, Node2} = start_node(Config), + ExpRes = [{ok, node()}, {ok, Node1}, {ok, Node2}], + Nodes = [node(), Node1, Node2], + Fun = fun () -> erlang:node() end, + _Warmup = time_multicall(ExpRes, Nodes, Fun, infinity, Loops div 10), + Empty = time_multicall(ExpRes, Nodes, Fun, infinity, Loops), + io:format("Time with empty message queue: ~p microsecond~n", + [erlang:convert_time_unit(Empty, native, microsecond)]), + _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)], + Huge = time_multicall(ExpRes, Nodes, Fun, infinity, Loops), + io:format("Time with huge message queue: ~p microsecond~n", + [erlang:convert_time_unit(Huge, native, microsecond)]), + stop_node(Node1), + stop_node(Node2), + Q = Huge / Empty, + HugeMsgQ = flush_msgq(), + case Q > 10 of + true -> + ct:fail({ratio, Q}); + false -> + {comment, "Ratio: "++erlang:float_to_list(Q)} + end. + +multicall_recv_opt2(Config) when is_list(Config) -> + Loops = 1000, + HugeMsgQ = 500000, + process_flag(message_queue_data, off_heap), + {ok, Node1} = start_node(Config), + stop_node(Node1), + {ok, Node2} = start_node(Config), + ExpRes = [{ok, node()}, {error, {erpc, noconnection}}, {ok, Node2}], + Nodes = [node(), Node1, Node2], + Fun = fun () -> erlang:node() end, + _Warmup = time_multicall(ExpRes, Nodes, Fun, infinity, Loops div 10), + Empty = time_multicall(ExpRes, Nodes, Fun, infinity, Loops), + io:format("Time with empty message queue: ~p microsecond~n", + [erlang:convert_time_unit(Empty, native, microsecond)]), + _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)], + Huge = time_multicall(ExpRes, Nodes, Fun, infinity, Loops), + io:format("Time with huge message queue: ~p microsecond~n", + [erlang:convert_time_unit(Huge, native, microsecond)]), + stop_node(Node2), + Q = Huge / Empty, + HugeMsgQ = flush_msgq(), + case Q > 10 of + true -> + ct:fail({ratio, Q}); + false -> + {comment, "Ratio: "++erlang:float_to_list(Q)} + end. + +multicall_recv_opt3(Config) when is_list(Config) -> + Loops = 1000, + HugeMsgQ = 500000, + process_flag(message_queue_data, off_heap), + {ok, Node1} = start_node(Config), + stop_node(Node1), + {ok, Node2} = start_node(Config), + Nodes = [node(), Node1, Node2], + Fun = fun () -> erlang:node() end, + _Warmup = time_multicall(undefined, Nodes, Fun, infinity, Loops div 10), + Empty = time_multicall(undefined, Nodes, Fun, infinity, Loops), + io:format("Time with empty message queue: ~p microsecond~n", + [erlang:convert_time_unit(Empty, native, microsecond)]), + _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)], + Huge = time_multicall(undefined, Nodes, Fun, 0, Loops), + io:format("Time with huge message queue: ~p microsecond~n", + [erlang:convert_time_unit(Huge, native, microsecond)]), + stop_node(Node2), + Q = Huge / Empty, + HugeMsgQ = flush_msgq(), + case Q > 10 of + true -> + ct:fail({ratio, Q}); + false -> + {comment, "Ratio: "++erlang:float_to_list(Q)} + end. + +time_multicall(Expect, Nodes, Fun, Tmo, Times) -> + Start = erlang:monotonic_time(), + ok = do_time_multicall(Expect, Nodes, Fun, Tmo, Times), + erlang:monotonic_time() - Start. + +do_time_multicall(_Expect, _Nodes, _Fun, _Tmo, 0) -> + ok; +do_time_multicall(undefined, Nodes, Fun, Tmo, N) -> + _ = erpc:multicall(Nodes, Fun, Tmo), + do_time_multicall(undefined, Nodes, Fun, Tmo, N-1); +do_time_multicall(Expect, Nodes, Fun, Tmo, N) -> + Expect = erpc:multicall(Nodes, Fun, Tmo), + do_time_multicall(Expect, Nodes, Fun, Tmo, N-1). multicast(Config) when is_list(Config) -> {ok, Node} = start_node(Config), @@ -1280,3 +1382,13 @@ f() -> f2() -> timer:sleep(500), halt(). + +flush_msgq() -> + flush_msgq(0). +flush_msgq(N) -> + receive + _ -> + flush_msgq(N+1) + after 0 -> + N + end. -- 2.26.2
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor