File 5743-pg-Implement-single-group-monitoring.patch of Package erlang (project home:Ledest:erlang:23)
From af9b3d19d834e6bdc1da21eaaf7e43d74220004b Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <maximfca@gmail.com>
Date: Tue, 7 Jun 2022 15:15:11 -0700
Subject: [PATCH 3/4] [pg] Implement single group monitoring

Implement pg:monitor/1,2 that subscribe to changes for a single
process group (similar to monitor_scope/0,1).
---
 lib/kernel/doc/src/pg.xml    |  15 ++-
 lib/kernel/src/pg.erl        | 185 ++++++++++++++++++++++++-----------
 lib/kernel/test/pg_SUITE.erl |  97 ++++++++++++------
 3 files changed, 210 insertions(+), 87 deletions(-)

diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml
index 85b1529f44..4a308dca73 100644
--- a/lib/kernel/doc/src/pg.xml
+++ b/lib/kernel/doc/src/pg.xml
@@ -165,12 +165,25 @@
       </desc>
     </func>
 
+    <func>
+      <name name="monitor" arity="1" since="OTP 25.1"/>
+      <name name="monitor" arity="2" since="OTP 25.1"/>
+      <fsummary>Starts membership monitoring for a specified group.</fsummary>
+      <desc>
+        <p>Subscribes the caller to updates for the specified group. Returns
+          list of processes currently in the group, and a reference to match
+          the upcoming notifications.</p>
+        <p>See <seemfa marker="#monitor_scope/0"><c>monitor_scope/0</c></seemfa>
+          for the update message structure.</p>
+      </desc>
+    </func>
+
     <func>
       <name name="demonitor" arity="1" since="OTP 25.1"/>
       <name name="demonitor" arity="2" since="OTP 25.1"/>
       <fsummary>Stops group membership monitoring.</fsummary>
       <desc>
-        <p>Unsubscribes the caller from updates off the specified scope.
+        <p>Unsubscribes the caller from updates (scope or group).
           Flushes all outstanding updates that were already in the message queue
           of the calling process.</p>
       </desc>
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 845021c7fa..a44794e352 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -62,6 +62,8 @@
          leave/2,
          monitor_scope/0,
          monitor_scope/1,
+         monitor/1,
+         monitor/2,
          demonitor/1,
          demonitor/2,
          get_members/1,
@@ -160,6 +162,26 @@ monitor_scope() ->
 monitor_scope(Scope) ->
     gen_server:call(Scope, monitor, infinity).
 
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns list of processes in the requested group, and begins monitoring
+%% group changes. Calling process will receive {Ref, join, Group, Pids}
+%% message when new Pids join the Group, and {Ref, leave, Group, Pids} when
+%% Pids leave the group.
+-spec monitor(Group :: group()) -> {reference(), [pid()]}.
+monitor(Group) ->
+    ?MODULE:monitor(?DEFAULT_SCOPE, Group).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns list of processes in the requested group, and begins monitoring
+%% group changes. Calling process will receive {Ref, join, Group, Pids}
+%% message when new Pids join the Group, and {Ref, leave, Group, Pids} when
+%% Pids leave the group.
+-spec monitor(Scope :: atom(), Group :: group()) -> {reference(), [pid()]}.
+monitor(Scope, Group) ->
+    gen_server:call(Scope, {monitor, Group}, infinity).
+
 %%--------------------------------------------------------------------
 %% @doc
 %% Stops monitoring Scope for groups changes. Flushes all
@@ -238,7 +260,10 @@ which_local_groups(Scope) when is_atom(Scope) ->
     %% remote node: scope process monitor and map of groups to pids for fast sync routine
     remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}},
     %% processes monitoring group membership
-    scope_monitors = #{} :: #{reference() => pid()}
+    scope_monitors = #{} :: #{reference() => pid()},
+    %% processes monitoring specific groups (forward and reverse map)
+    group_monitors = #{} :: #{reference() => group()},
+    monitored_groups = #{} :: #{group() => [{pid(), reference()}]}
 }).
 
 -type state() :: #state{}.
@@ -254,25 +279,26 @@ init([Scope]) ->
 -spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
                         | {leave_local, Group :: group(), Pid :: pid()}
                         | monitor
+                        | {monitor, Group :: group()}
                         | {demonitor, Ref :: reference()},
                   From :: {pid(), Tag :: any()},
                   State :: state()) ->
     {reply, ok | not_joined | {reference(), #{group() => [pid()]}} | false, state()}.
 
 handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
-    remote = Remote, scope_monitors = ScopeMon} = State) ->
+    remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
     NewLocal = join_local(PidOrPids, Group, Local),
-    join_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
+    join_local_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
     broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
     {reply, ok, State#state{local = NewLocal}};
 
 handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
-    remote = Remote, scope_monitors = ScopeMon} = State) ->
+    remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
     case leave_local(PidOrPids, Group, Local) of
         Local ->
             {reply, not_joined, State};
         NewLocal ->
-            leave_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
+            leave_local_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
             broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
             {reply, ok, State#state{local = NewLocal}}
     end;
@@ -283,24 +309,45 @@ handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMo
     MRef = erlang:monitor(process, Pid),
     %% monitor the monitor, to discard it upon termination, and generate MRef
     {reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
-handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon} = State) ->
+handle_call({monitor, Group}, {Pid, _Tag}, #state{scope = Scope, group_monitors = GM, monitored_groups = MG} = State) ->
+    %% ETS cache is writable only from this process - so get_members is safe to use
+    Members = get_members(Scope, Group),
+    MRef = erlang:monitor(process, Pid),
+    NewMG = maps:update_with(Group, fun (Ex) -> [{Pid, MRef} | Ex] end, [{Pid, MRef}], MG),
+    {reply, {MRef, Members}, State#state{group_monitors = GM#{MRef => {Pid, Group}}, monitored_groups = NewMG}};
+
+handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon, group_monitors = GM,
+    monitored_groups = MG} = State) ->
+    %% not using maybe_drop_monitor here as it does not demonitor, and can not return 'false'
     case maps:take(Ref, ScopeMon) of
         {_, NewMons} ->
             erlang:demonitor(Ref),
             {reply, ok, State#state{scope_monitors = NewMons}};
         error ->
-            {reply, false, State}
+            %% group monitor
+            case maps:take(Ref, GM) of
+                {{Pid, Group}, NewGM} ->
+                    erlang:demonitor(Ref),
+                    {reply, ok, State#state{group_monitors = NewGM,
+                        monitored_groups = demonitor_group({Pid, Ref}, Group, MG)}};
+                error ->
+                    {reply, false, State}
+            end
     end;
 
 handle_call(_Request, _From, _S) ->
     erlang:error(badarg).
 
 -spec handle_cast(
-    {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
+    {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]} |
+    {discover, Peer :: pid()} |
+    {join, Peer :: pid(), group(), pid() | [pid()]} |
+    {leave, Peer :: pid(), pid() | [pid()], [group()]},
     State :: state()) -> {noreply, state()}.
 
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
-    {noreply, State#state{remote = handle_sync(Scope, ScopeMon, Peer, Remote, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+    monitored_groups = MG} = State) ->
+    {noreply, State#state{remote = handle_sync(Scope, ScopeMon, MG, Peer, Remote, Groups)}};
 
 handle_cast(_, _State) ->
     erlang:error(badarg).
@@ -313,10 +360,11 @@ handle_cast(_, _State) ->
     {nodedown, node()} | {nodeup, node()},
     State :: state()) -> {noreply, state()}.
 %% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+    monitored_groups = MG} = State) ->
     case maps:get(Peer, Remote, []) of
         {MRef, RemoteGroups} ->
-            join_remote_update_ets(Scope, ScopeMon, Group, PidOrPids),
+            join_remote_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
             %% store remote group => pids map for fast sync operation
             NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
             {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
@@ -329,10 +377,11 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remot
     end;
 
 %% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+    monitored_groups = MG} = State) ->
     case maps:get(Peer, Remote, []) of
         {MRef, RemoteMap} ->
-            _ = leave_remote_update_ets(Scope, ScopeMon, PidOrPids, Groups),
+            _ = leave_remote_update_ets(Scope, ScopeMon, MG, PidOrPids, Groups),
             NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
             {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
         [] ->
@@ -358,28 +407,29 @@ handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
             {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
     end;
 
-%% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote,
-    scope_monitors = ScopeMon} = State) when node(Pid) =:= node() ->
+%% handle local process exit, or a local monitor exit
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
+    remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) when node(Pid) =:= node() ->
     case maps:take(Pid, Local) of
         error ->
-            maybe_drop_monitor(MRef, State);
+            {noreply, maybe_drop_monitor(MRef, State)};
         {{MRef, Groups}, NewLocal} ->
-            [leave_local_update_ets(Scope, ScopeMon, Group, Pid) || Group <- Groups],
-            %% send update to all scope processes on remote nodes
+            [leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) || Group <- Groups],
+            %% send update to all remote peers
             broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
            {noreply, State#state{local = NewLocal}}
    end;
 
 %% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
 handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
-    scope_monitors = ScopeMon} = State) ->
+    scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
     case maps:take(Pid, Remote) of
         {{MRef, RemoteMap}, NewRemote} ->
-            maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) end, RemoteMap),
+            maps:foreach(fun (Group, Pids) ->
+                leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) end, RemoteMap),
             {noreply, State#state{remote = NewRemote}};
         error ->
-            maybe_drop_monitor(MRef, State)
+            {noreply, maybe_drop_monitor(MRef, State)}
     end;
 
 %% nodedown: ignore, and wait for 'DOWN' signal for monitored process
@@ -420,7 +470,7 @@ ensure_local(Bad) ->
 %% Override all knowledge of the remote node with information it sends
 %% to local node. Current implementation must do the full table scan
 %% to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
+handle_sync(Scope, ScopeMon, MG, Peer, Remote, Groups) ->
     %% can't use maps:get() because it evaluates 'default' value first,
     %% and in this case monitor() call has side effect.
     {MRef, RemoteGroups} =
@@ -431,25 +481,25 @@ handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
                 MRef0
         end,
     %% sync RemoteMap and transform ETS table
-    _ = sync_groups(Scope, ScopeMon, RemoteGroups, Groups),
+    _ = sync_groups(Scope, ScopeMon, MG, RemoteGroups, Groups),
     Remote#{Peer => {MRef, maps:from_list(Groups)}}.
 
-sync_groups(Scope, ScopeMon, RemoteGroups, []) ->
+sync_groups(Scope, ScopeMon, MG, RemoteGroups, []) ->
     %% leave all missing groups
-    [leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
-sync_groups(Scope, ScopeMon, RemoteGroups, [{Group, Pids} | Tail]) ->
+    [leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+sync_groups(Scope, ScopeMon, MG, RemoteGroups, [{Group, Pids} | Tail]) ->
     case maps:take(Group, RemoteGroups) of
         {Pids, NewRemoteGroups} ->
-            sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
+            sync_groups(Scope, ScopeMon, MG, NewRemoteGroups, Tail);
         {OldPids, NewRemoteGroups} ->
             [{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group),
             %% should be really rare...
             AllNewPids = Pids ++ AllOldPids -- OldPids,
             true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
-            sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
+            sync_groups(Scope, ScopeMon, MG, NewRemoteGroups, Tail);
         error ->
-            join_remote_update_ets(Scope, ScopeMon, Group, Pids),
-            sync_groups(Scope, ScopeMon, RemoteGroups, Tail)
+            join_remote_update_ets(Scope, ScopeMon, MG, Group, Pids),
+            sync_groups(Scope, ScopeMon, MG, RemoteGroups, Tail)
     end.
 
 join_local(Pid, Group, Local) when is_pid(Pid) ->
@@ -465,39 +515,39 @@ join_local([], _Group, Local) ->
 join_local([Pid | Tail], Group, Local) ->
     join_local(Tail, Group, join_local(Pid, Group, Local)).
 
-join_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
         [] ->
            ets:insert(Scope, {Group, [Pid], [Pid]})
    end,
-    notify_group(ScopeMon, join, Group, [Pid]);
-join_local_update_ets(Scope, ScopeMon, Group, Pids) ->
+    notify_group(ScopeMon, MG, join, Group, [Pid]);
+join_local_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
         [] ->
            ets:insert(Scope, {Group, Pids, Pids})
    end,
-    notify_group(ScopeMon, join, Group, Pids).
+    notify_group(ScopeMon, MG, join, Group, Pids).
 
-join_remote_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], Local});
         [] ->
            ets:insert(Scope, {Group, [Pid], []})
    end,
-    notify_group(ScopeMon, join, Group, [Pid]);
-join_remote_update_ets(Scope, ScopeMon, Group, Pids) ->
+    notify_group(ScopeMon, MG, join, Group, [Pid]);
+join_remote_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Local});
         [] ->
            ets:insert(Scope, {Group, Pids, []})
    end,
-    notify_group(ScopeMon, join, Group, Pids).
+    notify_group(ScopeMon, MG, join, Group, Pids).
 
 join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
     maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
@@ -524,19 +574,19 @@ leave_local([], _Group, Local) ->
 leave_local([Pid | Tail], Group, Local) ->
     leave_local(Tail, Group, leave_local(Pid, Group, Local)).
 
-leave_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, [Pid], [Pid]}] ->
             ets:delete(Scope, Group),
-            notify_group(ScopeMon, leave, Group, [Pid]);
+            notify_group(ScopeMon, MG, leave, Group, [Pid]);
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}),
-            notify_group(ScopeMon, leave, Group, [Pid]);
+            notify_group(ScopeMon, MG, leave, Group, [Pid]);
         [] ->
             %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
             true
     end;
-leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
+leave_local_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             case All -- Pids of
@@ -545,25 +595,25 @@ leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
                 NewAll ->
                     ets:insert(Scope, {Group, NewAll, Local -- Pids})
             end,
-            notify_group(ScopeMon, leave, Group, Pids);
+            notify_group(ScopeMon, MG, leave, Group, Pids);
         [] ->
            true
    end.
 
-leave_remote_update_ets(Scope, ScopeMon, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, ScopeMon, MG, Pid, Groups) when is_pid(Pid) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, [Pid], []}] ->
                 ets:delete(Scope, Group),
-                notify_group(ScopeMon, leave, Group, [Pid]);
+                notify_group(ScopeMon, MG, leave, Group, [Pid]);
             [{Group, All, Local}] ->
                 ets:insert(Scope, {Group, lists:delete(Pid, All), Local}),
-                notify_group(ScopeMon, leave, Group, [Pid]);
+                notify_group(ScopeMon, MG, leave, Group, [Pid]);
             [] ->
                 true
         end ||
            Group <- Groups];
-leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
+leave_remote_update_ets(Scope, ScopeMon, MG, Pids, Groups) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, All, Local}] ->
@@ -573,7 +623,7 @@ leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
                 NewAll ->
                     ets:insert(Scope, {Group, NewAll, Local})
                 end,
-                notify_group(ScopeMon, leave, Group, Pids);
+                notify_group(ScopeMon, MG, leave, Group, Pids);
             [] ->
                 true
         end ||
@@ -611,25 +661,44 @@ broadcast([Dest | Tail], Msg) ->
     erlang:send(Dest, Msg, [noconnect]),
     broadcast(Tail, Msg).
 
-
 %% drops a monitor if DOWN was received
-maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon} = State) ->
+maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon, group_monitors = GMs, monitored_groups = MG} = State) ->
     %% could be a local monitor going DOWN. Since it's a rare event, check should
     %% not stay in front of any other, more frequent events
     case maps:take(MRef, ScopeMon) of
         error ->
-            %% this can only happen when leave request and 'DOWN' are in pg queue
-            {noreply, State};
+            case maps:take(MRef, GMs) of
+                error ->
+                    State;
+                {{Pid, Group}, NewGM} ->
+                    %% clean up the inverse map
+                    State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}
+            end;
         {_Pid, NewScopeMon} ->
-            {noreply, State#state{scope_monitors = NewScopeMon}}
+            State#state{scope_monitors = NewScopeMon}
+    end.
+
+demonitor_group(Tag, Group, MG) ->
+    case maps:find(Group, MG) of
+        {ok, [Tag]} ->
+            maps:remove(Group, MG);
+        {ok, Tags} ->
+            maps:put(Group, Tags -- [Tag], MG)
     end.
 
-%% notify all scope monitors about an Action in Groups for Pids
-notify_group(ScopeMon, Action, Group, Pids) ->
+%% notify all monitors about an Action in Groups for Pids
+notify_group(ScopeMonitors, MG, Action, Group, Pids) ->
     maps:foreach(
         fun (Ref, Pid) ->
             erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect])
-        end, ScopeMon).
+        end, ScopeMonitors),
+    case maps:find(Group, MG) of
+        error ->
+            ok;
+        {ok, Monitors} ->
+            [erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect]) || {Pid, Ref} <- Monitors],
+            ok
+    end.
 
 %% remove all messages that were send to monitor groups
 flush(Ref) ->
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index 5d56b751e0..130b54a5f8 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -55,7 +55,8 @@
     disconnected_start/1,
     forced_sync/0, forced_sync/1,
     group_leave/1,
-    monitor_scope/0, monitor_scope/1
+    monitor_scope/0, monitor_scope/1,
+    monitor/1
 ]).
 
 -export([
@@ -82,7 +83,7 @@ groups() ->
     {cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
         exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
         disconnected_start, forced_sync, group_leave]},
-    {monitor, [parallel], [monitor_scope]}
+    {monitor, [parallel], [monitor_scope, monitor]}
     ].
 
 %%--------------------------------------------------------------------
@@ -111,7 +112,7 @@ errors(_Config) ->
     ?assertException(error, badarg, pg:handle_cast(garbage, garbage)),
     %% kill with call
     {ok, _Pid} = pg:start(second),
-    ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage, 100)).
+    ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage)).
 
 leave_exit_race() ->
     [{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}].
@@ -228,7 +229,9 @@ two(Config) when is_list(Config) ->
     Pid = erlang:spawn(forever()),
     ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
     ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
-    %% first RPC must be serialised
+    %% first RPC must be serialised 3 times
+    sync({?FUNCTION_NAME, TwoPeer}),
+    sync(?FUNCTION_NAME),
     sync({?FUNCTION_NAME, TwoPeer}),
     ?assertEqual([Pid], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
     ?assertEqual([], rpc:call(TwoPeer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
@@ -243,7 +246,7 @@ two(Config) when is_list(Config) ->
     ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid2])),
     ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid3])),
     %% serialise through the *other* node
-    sync({?FUNCTION_NAME, TwoPeer}),
+    sync_via({?FUNCTION_NAME, TwoPeer}, ?FUNCTION_NAME),
     ?assertEqual(lists:sort([Pid2, Pid3]),
         lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
     %% stop the peer
@@ -311,7 +314,11 @@ initial(Config) when is_list(Config) ->
     ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
     ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
     {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
-    %% first RPC must be serialised
+    %% first sync makes the peer node to process 'nodeup' (and send discover)
+    sync({?FUNCTION_NAME, Peer}),
+    %% second sync makes origin node pg to reply to discover'
+    sync(?FUNCTION_NAME),
+    %% third sync makes peer node to finish processing 'exchange'
     sync({?FUNCTION_NAME, Peer}),
     ?assertEqual([Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
 
@@ -564,24 +571,44 @@ monitor_scope() ->
     [{doc, "Tests monitor_scope/1 and demonitor/2"}].
 
 monitor_scope(Config) when is_list(Config) ->
-    Self = self(),
-    Scope = ?FUNCTION_NAME,
-    Group = ?FUNCTION_ARITY,
     %% ensure that demonitoring returns 'false' when monitor is not installed
-    ?assertEqual(false, pg:demonitor(Scope, erlang:make_ref())),
-    %% start the actual test case
-    {Ref, #{}} = pg:monitor_scope(Scope),
+    ?assertEqual(false, pg:demonitor(?FUNCTION_NAME, erlang:make_ref())),
+    InitialMonitor = fun (Scope) -> {Ref, #{}} = pg:monitor_scope(Scope), Ref end,
+    SecondMonitor = fun (Scope, Group, Control) -> {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope), Ref end,
+    %% WHITE BOX: knowing pg state internals - only the original monitor should stay
+    DownMonitor = fun (Scope, Ref, Self) ->
+        {state, _, _, _, ScopeMonitors, _, _} = sys:get_state(Scope),
+        ?assertEqual(#{Ref => Self}, ScopeMonitors, "pg did not remove DOWNed scope monitor")
+    end,
+    monitor_test_impl(?FUNCTION_NAME, ?FUNCTION_ARITY, InitialMonitor, SecondMonitor, DownMonitor).
+
+monitor(Config) when is_list(Config) ->
+    ExpectedGroup = {?FUNCTION_NAME, ?FUNCTION_ARITY},
+    InitialMonitor = fun (Scope) -> {Ref, []} = pg:monitor(Scope, ExpectedGroup), Ref end,
+    SecondMonitor = fun (Scope, Group, Control) ->
+        {Ref, [Control]} = pg:monitor(Scope, Group), Ref end,
+    DownMonitor = fun (Scope, Ref, Self) ->
+        {state, _, _, _, _, GM, MG} = sys:get_state(Scope),
+        ?assertEqual(#{Ref => {Self, ExpectedGroup}}, GM, "pg did not remove DOWNed group monitor"),
+        ?assertEqual(#{ExpectedGroup => [{Self, Ref}]}, MG, "pg did not remove DOWNed group")
+    end,
+    monitor_test_impl(?FUNCTION_NAME, ExpectedGroup, InitialMonitor, SecondMonitor, DownMonitor).
+
+monitor_test_impl(Scope, Group, InitialMonitor, SecondMonitor, DownMonitor) ->
+    Self = self(),
+    Ref = InitialMonitor(Scope),
     %% local join
     ?assertEqual(ok, pg:join(Scope, Group, Self)),
     wait_message(Ref, join, Group, [Self], "Local"),
     %% start second monitor (which has 1 local pid at the start)
-    SecondMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self) end),
-    Ref2 = receive {second, SecondRef} -> SecondRef end,
+    ExtraMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self, SecondMonitor) end),
+    Ref2 = receive {ExtraMonitor, SecondRef} -> SecondRef end,
     %% start a remote node, and a remote monitor
     {Peer, Node} = spawn_node(Scope),
     ScopePid = whereis(Scope),
     %% do not care about the remote monitor, it is started only to check DOWN handling
-    _ThirdMonitor = spawn(Node, fun() -> second_monitor(ScopePid, Group, Self) end),
+    ThirdMonitor = spawn_link(Node, fun() -> second_monitor(ScopePid, Group, Self, SecondMonitor) end),
+    Ref3 = receive {ThirdMonitor, ThirdRef} -> ThirdRef end,
     %% remote join
     RemotePid = erlang:spawn(Node, forever()),
     ?assertEqual(ok, rpc:call(Node, pg, join, [Scope, Group, [RemotePid, RemotePid]])),
@@ -592,20 +619,27 @@ monitor_scope(Config) when is_list(Config) ->
     wait_message(Ref, leave, Group, [Self], "Local"),
     %% remote leave
     ?assertEqual(ok, rpc:call(Node, pg, leave, [Scope, Group, RemotePid])),
+    %% flush the local pg scope via remote pg (to ensure local pg finished sending notifications)
+    sync_via({?FUNCTION_NAME, Node}, ?FUNCTION_NAME),
     wait_message(Ref, leave, Group, [RemotePid], "Remote"),
-    %% drop the SecondMonitor - this keeps original and remote monitors
-    SecondMonMsgs = gen_server:call(SecondMonitor, flush),
+    %% drop the ExtraMonitor - this keeps original and remote monitors
+    SecondMonMsgs = gen_server:call(ExtraMonitor, flush),
     %% inspect the queue, it should contain double remote join, then single local and single remove leave
-    ?assertEqual([
+    ExpectedLocalMessages = [
         {Ref2, join, Group, [RemotePid, RemotePid]},
         {Ref2, leave, Group, [Self]},
         {Ref2, leave, Group, [RemotePid]}],
-        SecondMonMsgs),
+    ?assertEqual(ExpectedLocalMessages, SecondMonMsgs, "Local monitor failed"),
+    %% inspect remote monitor queue
+    ThirdMonMsgs = gen_server:call(ThirdMonitor, flush),
+    ExpectedRemoteMessages = [
+        {Ref3, join, Group, [RemotePid, RemotePid]},
+        {Ref3, leave, Group, [Self]},
+        {Ref3, leave, Group, [RemotePid]}],
+    ?assertEqual(ExpectedRemoteMessages, ThirdMonMsgs, "Remote monitor failed"),
     %% remote leave via stop (causes remote monitor to go DOWN)
     wait_message(Ref, leave, Group, [RemotePid], "Remote stop"),
-    %% WHITE BOX: knowing pg state internals - only the original monitor should stay
-    {state, _, _, _, InternalMonitors} = sys:get_state(?FUNCTION_NAME),
-    ?assertEqual(#{Ref => Self}, InternalMonitors, "pg did not remove DOWNed monitor"),
+    DownMonitor(Scope, Ref, Self),
     %% demonitor
     ?assertEqual(ok, pg:demonitor(Scope, Ref)),
     ?assertEqual(false, pg:demonitor(Scope, Ref)),
@@ -615,7 +649,7 @@ monitor_scope(Config) when is_list(Config) ->
     sync(Scope),
     %% join should not be here
     receive {Ref, Action, Group, [Self]} -> ?assert(false, lists:concat(["Unexpected ", Action, "event"]))
-    after 0 -> ok end.
+    after 0 -> ok end.
 
 wait_message(Ref, Action, Group, Pids, Msg) ->
     receive
@@ -624,12 +658,12 @@ wait_message(Ref, Action, Group, Pids, Msg) ->
     after 1000 ->
         {messages, Msgs} = process_info(self(), messages),
         ct:pal("Message queue: ~0p", [Msgs]),
-        ?assert(false, Msg ++ " " ++ atom_to_list(Action) ++ " event failed to occur")
+        ?assert(false, lists:flatten(io_lib:format("Expected ~s ~s for ~p", [Msg, Action, Group])))
     end.
 
-second_monitor(Scope, Group, Control) ->
-    {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope),
-    Control ! {second, Ref},
+second_monitor(Scope, Group, Control, SecondMonitor) ->
+    Ref = SecondMonitor(Scope, Group, Control),
+    Control ! {self(), Ref},
     second_monitor([]).
 
 second_monitor(Msgs) ->
@@ -643,9 +677,16 @@ second_monitor(Msgs) ->
 %%--------------------------------------------------------------------
 %% Test Helpers - start/stop additional Erlang nodes
 
+%% flushes GS (GenServer) queue, ensuring that all prior
+%% messages have been processed
 sync(GS) ->
     _ = sys:log(GS, get).
 
+%% flushes GS queue from the point of view of a registered process RegName
+%% running on the Node.
+sync_via({RegName, Node}, GS) ->
+    rpc:call(Node, sys, replace_state, [RegName, fun (S) -> (catch sys:get_state(GS)), S end]).
+
 ensure_peers_info(Scope, Peers) ->
     %% Ensures that pg server on local node has gotten info from
     %% pg servers on all Peer nodes passed as argument (assuming
@@ -659,7 +700,7 @@ ensure_peers_info(Scope, Nodes) ->
     %%
     sync(Scope),
 
-    %% Known: nodup handled and discover sent to Peer
+    %% Known: nodeup handled and discover sent to Peer
     lists:foreach(fun (Peer) -> sync({Scope, Peer}) end, Peers),
 
     %% Known: nodeup handled by Peers and discover sent to local
-- 
2.35.3
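
Usage note (not part of the patch): the sketch below illustrates the single-group monitoring API this patch adds, based on the documentation and doc-comments above. It assumes OTP with this patch applied; the scope name my_scope, the group name my_group, the spawned worker, and the timeout are illustrative only.

-module(pg_monitor_example).
-export([run/0]).

run() ->
    {ok, _Pg} = pg:start_link(my_scope),
    %% Subscribe to a single group; returns a reference plus the current members.
    {Ref, InitialMembers} = pg:monitor(my_scope, my_group),
    io:format("members at subscription time: ~p~n", [InitialMembers]),
    %% A join by any process now produces a {Ref, join, Group, Pids} message;
    %% leaves arrive as {Ref, leave, Group, Pids}.
    Worker = spawn(fun() -> receive stop -> ok end end),
    ok = pg:join(my_scope, my_group, Worker),
    receive
        {Ref, join, my_group, Pids} -> io:format("joined: ~p~n", [Pids])
    after 1000 -> io:format("no notification received~n")
    end,
    %% Unsubscribe; per the documentation this also flushes pending updates.
    ok = pg:demonitor(my_scope, Ref),
    Worker ! stop,
    ok.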