Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:26
erlang
3042-Rewrite-handling-of-wrap-counters.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 3042-Rewrite-handling-of-wrap-counters.patch of Package erlang
From 9b58966d62caa1ea115b4706204545c80fd4001c Mon Sep 17 00:00:00 2001 From: Raimo Niskanen <raimo@erlang.org> Date: Fri, 1 Mar 2024 17:31:59 +0100 Subject: [PATCH 12/22] Rewrite handling of wrap counters When reading the wrap counters we cannot use a receive loop with selective receive that doesn't use the ref recv optimization. Rewrite the code to use an intermediate temporary state that only collects wrap counters and postpones all other events. --- lib/kernel/src/gen_tcp_socket.erl | 571 ++++++++++++++++-------------- 1 file changed, 301 insertions(+), 270 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 407f567bb3..d8a0883631 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -775,7 +775,7 @@ getstat(?MODULE_socket(Server, _Socket), What) when is_list(What) -> %% ------------------------------------------------------------------------- info(?MODULE_socket(Server, _Socket)) -> - case call(Server, info) of + case call(Server, {info, inet:stats()}) of {error, closed} -> ?CLOSED_SOCKET; Other -> @@ -1434,7 +1434,7 @@ call(Server, Msg) -> gen_statem:call(Server, Msg) catch exit:{noproc, {gen_statem, call, _Args}} -> - {error, closed}; + {error, einval}; exit:{{shutdown, _}, _} -> {error, closed}; C:E:S -> @@ -1482,6 +1482,11 @@ callback_mode() -> handle_event_function. listen_socket :: socket:socket()}). %% Socket is not created +-record(wrap_counters, + {ref :: reference(), + call :: info | {getstat, list()}, + state :: term()}). + %% 'connect' % A listen socket stays here -record(connect, @@ -1578,6 +1583,8 @@ terminate(_Reason, State, P_D) -> case State of #controlling_process{state = OldState} -> terminate(OldState, P_D); + #wrap_counters{state = OldState} -> + terminate(OldState, P_D); _ -> terminate(State, P_D) end. @@ -1597,39 +1604,165 @@ module_socket(#params{socket = Socket}) -> %% ------------------------------------------------------------------------- %% Event Handler (callback) -%% Any state: - -%% Call: get_server_opts/0 -handle_event({call, From}, get_server_opts, _State, {_P, D}) -> - ServerData = maps:with(maps:keys(server_opts()), D), - {keep_state_and_data, - [{reply, From, {ok, ServerData}}]}; - -%% Info: Owner 'DOWN' handle_event( info, {'DOWN', OwnerMon, _, _, Reason}, _State, {#params{owner_mon = OwnerMon} = _P, _D} = P_D) -> %% {stop, {shutdown, Reason}, P_D}; -%% Info: ?socket_counter_wrap/2 +%% ------- +%% State: 'accept' | #accept{} + +handle_event( + {call, From}, {accept, ListenSocket, Timeout}, + 'accept' = _State, {P, D}) -> + handle_accept(P, D, From, ListenSocket, Timeout, accept); +handle_event(Type, Content, 'accept' = State, P_D) -> + handle_unexpected(Type, Content, State, P_D); + +handle_event( + info, ?socket_select(ListenSocket, SelectRef), + #accept{ + info = ?select_info(SelectRef), from = From, + listen_socket = ListenSocket}, + {P, D}) -> + handle_accept(P, D, From, ListenSocket, update, select); +handle_event( + info, ?socket_completion(ListenSocket, CompletionRef, CompletionStatus), + #accept{ + info = ?completion_info(CompletionRef), from = From, + listen_socket = ListenSocket}, + {P, D}) -> + handle_accept(P, D, From, ListenSocket, update, CompletionStatus); handle_event( - info, ?socket_counter_wrap(Socket, Counter), - 'connected' = _State, {#params{socket = Socket} = P, D}) -> - %% ?DBG([{state, _State}, {counter, Counter}]), - {keep_state, {P, wrap_counter(Counter, D)}}; + info, ?socket_abort(ListenSocket, SelectRef, Reason), + #accept{ + info = ?select_info(SelectRef), from = From, + listen_socket = ListenSocket}, + P_D) -> + {next_state,'accept', P_D, + [{{timeout, accept}, cancel}, + {reply, From, {error, Reason}}]}; handle_event( - info, ?socket_counter_wrap(Socket, Counter), - #recv{} = _State, {#params{socket = Socket} = P, D}) -> - %% ?DBG([{state, _State}, {counter, Counter}]), - {keep_state, {P, wrap_counter(Counter, D)}}; + info, ?socket_abort(ListenSocket, CompletionRef, Reason), + #accept{ + info = ?completion_info(CompletionRef), from = From, + listen_socket = ListenSocket}, + P_D) -> + {next_state, 'accept', P_D, + [{{timeout, accept}, cancel}, + {reply, From, {error, Reason}}]}; handle_event( - info, ?socket_counter_wrap(_Socket, _Counter), _State, _P_D) -> - %% ?DBG([{state, _State}, {counter, _Counter}]), + {timeout, accept}, Info, + #accept{ + info = Info, from = From, + listen_socket = ListenSocket}, + P_D) -> + socket_cancel(ListenSocket, Info), + {next_state, 'accept', P_D, + [{reply, From, {error, timeout}}]}; +handle_event(Type, Content, #accept{} = State, P_D) -> + handle_unexpected(Type, Content, State, P_D); + +%% State: 'accept' | #accept{} +%% ------- +%% When an accept socket has cleared 'accept' | #accept{} +%% #params.socket is a socket:socket(), not undefined. +%% +%% For listen and connect sockets it is so from state machine start. +%% ------- #params.socket is defined from here on --------------------------- + + +handle_event({call, From}, get_server_opts, _State, {_P, D}) -> + ServerData = maps:with(maps:keys(server_opts()), D), {keep_state_and_data, - [postpone]}; + [{reply, From, {ok, ServerData}}]}; + + +%% How to handle counter wrap messages, +%% i.e how to keep counter wraps in sync with counter values +%% a.k.a how to get consistent counter values... +%% +%% Wrap messages are sent from NIF calls, a call made by +%% this server, or by another process which would be a send operation. +%% +%% A process calls info(Socket), we get the message, there may +%% be a wrap message induced by a NIF call we just made that still +%% is in transit. So we read the counters, send them plus the current +%% wrap value in a special message to ourselves and enter +%% an intermediate state where we postpone all (almost) events +%% except wrap message events. When we receive the special +%% message, the wrap counters we have have been updated. +%% But after we did read the counters a counter may wrap (e.g due to some +%% other process calling a NIF send operation). This wrap message may +%% arrive before *or* after we get the special message when we read +%% the counters a second time. +%% +%% So: when we read the counters the second time we almost trust them +%% together with the current wrap value. But if the wrap value +%% hasn't changed since we sent the special message we need to check +%% if the counter value has wrapped (jumped down); then there is +%% a wrap message in transit from after our first counter read, +%% and we add 1 to the current wrap count when calculating +%% the counter value. The actual wrap counter will be updated when +%% the wrap message arrives. +%% +%% That all assumes that we are fast enough to not have to handle +%% two wraps of the same counter in this procedure... +handle_event( + info, ?socket_counter_wrap(Socket, CounterName), _State, + {#params{socket = Socket} = P, D}) -> + {keep_state, {P, count_wrap(CounterName, D)}}; +%% +handle_event({call, From}, {Tag, What} = Call, State, {P, D} = P_D) + when info =:= Tag; + getstat =:= Tag -> + case State of + 'closed' -> + {keep_state_and_data, + [{reply, From, ?CLOSED_SOCKET}]}; + _ -> + Socket = P#params.socket, + #{counters := Counters_0} = socket:info(Socket), + CounterKeys = counter_keys(What), + Counters = maps:with(CounterKeys, Counters_0), + Wraps = maps:with(CounterKeys, D), + Ref = make_ref(), + %% We could send D as Wraps, but we filter out + %% the wrap counter keys to keep the message size down + self() ! {wrap_counters, Ref, From, Counters, Wraps}, + {next_state, + #wrap_counters{ref = Ref, call = Call, state = State}, + P_D} + end; +%% ------- +%% State: #wrap_counters{} +handle_event( + info, {wrap_counters, Ref, From, Counters1, Wraps1}, + #wrap_counters{ref = Ref, call = {Tag, What}, state = NewState}, + {#params{socket = Socket} = P, D} = P_D) -> + Info = #{counters := Counters2} = socket:info(Socket), + Counters = wrap_counters(Counters1, Counters2, Wraps1, D), + GetstatCounters = getstat_what(What, Counters), + Reply = + case Tag of + info -> + Owner = P#params.owner, + Active = maps:get(active, D), + Info + #{counters := maps:from_list(GetstatCounters), + owner => Owner, + active => Active}; + getstat -> + GetstatCounters + end, + {next_state, NewState, P_D, + [{reply, From, Reply}]}; +handle_event(_Type, _Content, #wrap_counters{}, _P_D) -> + {keep_state_and_data, [postpone]}; +%% State: #info{} +%% ------- -%% Call: controlling_process/1 handle_event( {call, {Caller, _} = From}, {controlling_process, NewOwner}, State, {P, _D} = P_D) -> @@ -1647,10 +1780,8 @@ handle_event( {keep_state_and_data, [{reply, From, {error, not_owner}}]} end; -%% +%% ------- %% State: #controlling_process{} -%% -%% Call: controlling_process/0 handle_event( {call, {Owner, _} = From}, controlling_process, #controlling_process{owner = NewOwner, state = State}, @@ -1670,13 +1801,15 @@ handle_event( _StateData) -> %% {keep_state_and_data, [postpone]}; -%% Handled state: #controlling_process{} +%% State: #controlling_process{} +%% ------- -%% Call: close/0 handle_event({call, From}, close, State, {P, D}) -> handle_close(P, D, State, [{reply, From, ok}]); -%% Call: getopts/1 +%%% handle_event({call, From}, {send_error, Reason}, State, {P, D}) -> +%%% handle_send_error(P, D, State, From, Reason); + handle_event({call, From}, {getopts, Opts}, State, {P, D}) -> %% ?DBG([{opts, Opts}, {state, State}, {d, D}]), Result = case state_getopts(P, D, State, Opts) of @@ -1690,7 +1823,6 @@ handle_event({call, From}, {getopts, Opts}, State, {P, D}) -> {keep_state_and_data, [{reply, From, Result}]}; -%% Call: setopts/1 handle_event({call, From}, {setopts, Opts}, State, {P, D}) -> %% ?DBG([{setopts, Opts}, {state, State}, {d, D}]), %% @@ -1733,7 +1865,6 @@ handle_event({call, From}, {setopts, Opts}, State, {P, D}) -> Reply = {reply, From, Result}, handle_active(P, D_2, State, [Reply]); -%% Call: setopt_active/1 handle_event({call, From}, {setopt_active, Active}, State, {P, D}) -> %% {active, _} is in server_read_opts(), so State needs to be checked %% like in state_setopts/4. @@ -1747,31 +1878,8 @@ handle_event({call, From}, {setopt_active, Active}, State, {P, D}) -> handle_active(P, D_1, State, [Reply]) end; -%% Call: getstat/2 -handle_event({call, From}, {getstat, What}, State, {P, D}) -> - case State of - 'closed' -> - {keep_state_and_data, - [{reply, From, {error, closed}}]}; - _ -> - {D_1, Result} = getstat(P#params.socket, D, What), - {keep_state, {P, D_1}, - [{reply, From, {ok, Result}}]} - end; - -%% Call: info/1 -handle_event({call, From}, info, State, {P, D}) -> - case State of - 'closed' -> - {keep_state_and_data, - [{reply, From, ?CLOSED_SOCKET}]}; - _ -> - {D_1, Result} = handle_info(P#params.socket, P#params.owner, D), - {keep_state, {P, D_1}, - [{reply, From, Result}]} - end; - -%% State: 'closed_read' - what is not handled above +%% ------- +%% State: 'closed_read' handle_event(Type, Content, 'closed_read' = State, {P, _D}) -> case Type of {call, From} -> @@ -1786,71 +1894,14 @@ handle_event(Type, Content, 'closed_read' = State, {P, _D}) -> [P#params.socket, State, Type, Content]), keep_state_and_data end; - -%% State: 'closed' - what is not handled above +%% State: 'closed_read' +%% ------- +%% State: 'closed' handle_event(Type, Content, 'closed' = State, P_D) -> handle_unexpected(Type, Content, State, P_D); +%% State: 'closed' +%% ------- -%% Handled states: 'closed_read', 'closed' - - -%% State: 'accept' -handle_event( - {call, From}, {accept, ListenSocket, Timeout}, - 'accept' = _State, {P, D}) -> - handle_accept(P, D, From, ListenSocket, Timeout, accept); -handle_event(Type, Content, 'accept' = State, P_D) -> - handle_unexpected(Type, Content, State, P_D); -%% -%% State: #accept{} -handle_event( - info, ?socket_select(ListenSocket, SelectRef), - #accept{ - info = ?select_info(SelectRef), from = From, - listen_socket = ListenSocket}, - {P, D}) -> - handle_accept(P, D, From, ListenSocket, update, select); -handle_event( - info, ?socket_completion(ListenSocket, CompletionRef, CompletionStatus), - #accept{ - info = ?completion_info(CompletionRef), from = From, - listen_socket = ListenSocket}, - {P, D}) -> - handle_accept(P, D, From, ListenSocket, update, CompletionStatus); -handle_event( - info, ?socket_abort(ListenSocket, SelectRef, Reason), - #accept{ - info = ?select_info(SelectRef), from = From, - listen_socket = ListenSocket}, - P_D) -> - {next_state,'accept', P_D, - [{{timeout, accept}, cancel}, - {reply, From, {error, Reason}}]}; -handle_event( - info, ?socket_abort(ListenSocket, CompletionRef, Reason), - #accept{ - info = ?completion_info(CompletionRef), from = From, - listen_socket = ListenSocket}, - P_D) -> - {next_state, 'accept', P_D, - [{{timeout, accept}, cancel}, - {reply, From, {error, Reason}}]}; -handle_event( - {timeout, accept}, Info, - #accept{ - info = Info, from = From, - listen_socket = ListenSocket}, - P_D) -> - socket_cancel(ListenSocket, Info), - {next_state, 'accept', P_D, - [{reply, From, {error, timeout}}]}; -handle_event(Type, Content, #accept{} = State, P_D) -> - handle_unexpected(Type, Content, State, P_D); -%% Handled states: 'accept' | #accept{} - -%% ------- Socket is defined from here on ----------------------------------- - -%% Call: bind/1 handle_event({call, From}, {bind, BindAddr} = _BIND, _State, {P, _D}) -> %% ?DBG([_BIND, {state, _State}, {p, P}]), %% _ = socket:setopt(P#params.socket, otp, debug, true), @@ -1867,7 +1918,6 @@ handle_event({call, From}, {bind, BindAddr} = _BIND, _State, {P, _D}) -> %% on {listen, _} is convenient. %% It also reflects the API behaviour (gen_tcp:listen(...) -> {ok, Socket}) -%% Call: listen/1 handle_event( {call, From}, {listen, Backlog} = _LISTEN, _State, {#params{socket = Socket} = P, D}) -> @@ -1881,51 +1931,46 @@ handle_event( {keep_state, {P, D#{type => listen}}, [{reply, From, Result}]}; -%% Call: recv/2 - active socket -handle_event( - {call, From}, {recv, _Length, _Timeout}, - _State, {_P, #{active := Active} = _D}) - when Active =/= false -> - {keep_state_and_data, - [{reply, From, {error, einval}}]}; - +%% ------- %% State: 'connect' -%% -%% Call: connect/2 + handle_event( {call, From}, {connect, Addr, Timeout}, 'connect' = _State, {P, D}) -> handle_connect(P, D, From, Addr, Timeout, connect); -%% -%% Call: recv/2 - not connected + handle_event( {call, From}, {recv, _Length, _Timeout}, 'connect' = _State, _P_D) -> {keep_state_and_data, [{reply, From, {error, enotconn}}]}; -%% Call: fdopen/2 + handle_event( {call, From}, fdopen, 'connect' = _State, {#params{socket = Socket} = P, D}) -> handle_connected( P, D#{type => fdopen}, [{reply, From, {ok, Socket}}]); + handle_event(Type, Content, 'connect' = State, P_D) -> handle_unexpected(Type, Content, State, P_D); -%% +%% State: 'connect' +%% ------- %% State: #connect{} + handle_event( info, ?socket_select(Socket, SelectRef), #connect{info = ?select_info(SelectRef), from = From, addr = Addr} = _State, {#params{socket = Socket} = P, D}) -> %% ?DBG(['select message', {ref, SelectRef}]), handle_connect(P, D, From, Addr, update, select); + handle_event( info, ?socket_abort(Socket, SelectRef, Reason), #connect{info = ?select_info(SelectRef), from = From} = _State, {#params{socket = Socket}, _D} = P_D) -> {next_state, 'connect', P_D, [{{timeout, connect}, cancel}, {reply, From, {error, Reason}}]}; -%% + handle_event( info, ?socket_completion(Socket, CompletionRef, CompletionStatus), #connect{info = ?completion_info(CompletionRef), @@ -1935,6 +1980,7 @@ handle_event( %% ?DBG(['completion message', %% {ref, CompletionRef}, {status, CompletionStatus}]), handle_connect(P, D, From, Addr, update, CompletionStatus); + handle_event( info, ?socket_abort(Socket, CompletionRef, Reason), #connect{info = ?completion_info(CompletionRef), from = From} = _State, @@ -1950,21 +1996,18 @@ handle_event( socket_cancel(Socket, Info), {next_state, 'connect', P_D, [{reply, From, {error, timeout}}]}; -%% -%% Call: recv/2 - not connected + handle_event( {call, From}, {recv, _Length, _Timeout}, #connect{} = _State, _P_D) -> {keep_state_and_data, [{reply, From, {error, enotconn}}]}; handle_event(Type, Content, #connect{} = State, P_D) -> handle_unexpected(Type, Content, State, P_D); -%% Handled states: 'connect' | #connect{} - - -%% Remaining states: 'connected' | #recv{} +%% State: #connect{} +%% ------- +%% State: 'connected' -%% Call: recv/2 - last part handle_event( {call, From}, {recv, Length, Timeout}, State, {P, D}) -> %% ?DBG([recv, {length, Length}, {timeout, Timeout}, {state, State}]), @@ -1974,21 +2017,28 @@ handle_event( P, D#{recv_length => Length, recv_from => From}, [{{timeout, recv}, Timeout, recv}]); #recv{} -> - %% Receive in progress - {keep_state_and_data, - [postpone]} + case maps:get(active, D) of + false -> + %% Receive in progress + {keep_state_and_data, + [postpone]}; + _ -> + {keep_state_and_data, + [{reply, From, {error, einval}}]} + end end; +%% State: 'connected' +%% ------- %% State: #recv{} -%% -%% Handle select done - try recv again + handle_event( info, ?socket_select(Socket, SelectRef), #recv{info = ?select_info(SelectRef)} = _State, {#params{socket = Socket} = P, D}) -> %% ?DBG([info, {socket, Socket}, {ref, SelectRef}]), handle_recv(P, D, []); -%% + handle_event( info, ?socket_abort(Socket, SelectRef, Reason), #recv{info = ?select_info(SelectRef)} = _State, @@ -1996,8 +2046,6 @@ handle_event( %% ?DBG({abort, Reason}), handle_recv_error(P, D, [], Reason); -%% -%% Handle completion done handle_event( info, ?socket_completion(Socket, CompletionRef, CompletionStatus), #recv{info = ?completion_info(CompletionRef)} = _State, @@ -2013,7 +2061,7 @@ handle_event( {error, Reason} -> handle_recv_error(P, D, [], Reason) end; -%% + handle_event( info, ?socket_abort(Socket, CompletionRef, Reason), #recv{info = ?completion_info(CompletionRef)} = _State, @@ -2021,14 +2069,15 @@ handle_event( %% ?DBG(['abort msg', {reason, Reason}]), handle_recv_error(P, D, [], completion_status_reason(Reason)); -%% -%% Timeout on recv in passive mode handle_event( {timeout, recv}, recv, #recv{info = Info}, {#params{socket = Socket} = P, D}) -> socket_cancel(Socket, Info), handle_recv_error(P, D, [], timeout); +%% State: #recv{} +%% ------- + %% Catch-all handle_event(Type, Content, State, P_D) -> handle_unexpected(Type, Content, State, P_D). @@ -2417,8 +2466,7 @@ handle_recv_deliver(P, D, ActionsR, Data) -> handle_recv_error(P, {D, ActionsR}, Reason) -> handle_recv_error(P, D, ActionsR, Reason). %% -handle_recv_error( - P, #{active := Active, show_econnreset := ShowEconnreset} = D, +handle_recv_error(P, #{active := Active} = D, ActionsR, Reason) -> %% %% Send active socket messages @@ -2436,32 +2484,15 @@ handle_recv_error( Owner ! {tcp_error, ModuleSocket, Reason}, D#{active := false} end, - handle_recv_error( - P, D_1, ActionsR, Reason, - 'connected'); + handle_recv_error(P, D_1, ActionsR, Reason, 'connected'); true -> - ShowEconnreset = maps:get(show_econnreset, D), - ReplyReason = - case ShowEconnreset of - true when Reason =:= closed -> econnreset; - false when Reason =:= econnreset -> closed; - _ -> Reason - end, case Active of false -> handle_recv_error( - P, D, ActionsR, ReplyReason, 'closed_read'); + P, D, ActionsR, Reason, 'closed_read'); _ -> - if - ShowEconnreset =:= true; - Reason =/= closed, Reason =/= econnreset -> - Owner ! - {tcp_error, ModuleSocket, ReplyReason}, - ok; - true -> ok - end, + Owner ! {tcp_error, ModuleSocket, Reason}, Owner ! {tcp_closed, ModuleSocket}, - D_1 = D#{active := false, tcp_closed := true}, NextState = case maps:get(exit_on_close, D) of true -> @@ -2470,12 +2501,12 @@ handle_recv_error( false -> 'closed_read' end, - handle_recv_error( - P, D_1, ActionsR, ReplyReason, NextState) + D_1 = D#{active := false, tcp_closed := true}, + handle_recv_error(P, D_1, ActionsR, Reason, NextState) end end. %% -handle_recv_error(P, D, ActionsR, ReplyReason, NextState) -> +handle_recv_error(P, D, ActionsR, Reason, NextState) -> %% %% Create state machine actions; reply and cancel timeout %% @@ -2484,11 +2515,32 @@ handle_recv_error(P, D, ActionsR, ReplyReason, NextState) -> {next_state, NextState, {P, recv_stop(D)}, reverse(ActionsR, [{{timeout, recv}, cancel}, - {reply, From, {error, ReplyReason}}])}; + {reply, From, {error, Reason}}])}; #{} -> {next_state, NextState, {P, D}, reverse(ActionsR)} end. +-ifdef(undefined). +handle_send_error(P, D, State, From, Reason) -> + case Active of + false -> + case (Reason =:= econnreset) andalso maps:get(show_econnreset) of + true -> + case State of + #recv{info = Info} -> + socket_cancel(P#params.socket, Info), + socket_close(Socket), + {next_state, _, P_D_1, Actions_1} = + handle_recv_error(P, D, ActionsR, closed), + {P_D_1, Actions_1}; + + #recv{} -> + + ShowEconnreset = maps:get(show_econnreset), + + {keep_state_and_data, [{reply, From, Reason}]}. +-endif. % -ifdef(undefined). + %% ------------------------------------------------------------------------- %% Callback Helpers @@ -2925,106 +2977,85 @@ state_getopts(P, D, State, [Tag | Tags], Acc) -> %% %% ------- -handle_info(Socket, Owner, #{active := Active} = D) -> - %% Read counters - Counters_1 = socket_info_counters(Socket), - %% Check for recent wraps - {D_1, Wrapped} = receive_counter_wrap(Socket, D, []), - %% - %% Assumption: a counter that we just now got a wrap message from - %% will not wrap again before we read the updated value - %% - %% Update wrapped counters - Info = #{counters := Counters_2} = socket:info(Socket), - Counters_3 = maps:merge(Counters_1, maps:with(Wrapped, Counters_2)), - %% Go ahead with wrap updated counters - Counters_4 = maps:from_list(getstat_what(D_1, Counters_3)), - {D_1, Info#{counters => Counters_4, owner => Owner, active => Active}}. - -getstat(Socket, D, What) -> - %% Read counters - Counters_1 = socket_info_counters(Socket), - %% Check for recent wraps - {D_1, Wrapped} = receive_counter_wrap(Socket, D, []), - %% - %% Assumption: a counter that we just now got a wrap message from - %% will not wrap again before we read the updated value - %% - %% Update wrapped counters - Counters_2 = socket_info_counters(Socket), - Counters_3 = maps:merge(Counters_1, maps:with(Wrapped, Counters_2)), - %% Go ahead with wrap updated counters - {D_1, getstat_what(What, D_1, Counters_3)}. - -getstat_what(D, C) -> - getstat_what(inet:stats(), D, C). - -getstat_what([], _D, _C) -> []; -getstat_what([Tag | What], D, C) -> - Val = - case Tag of - recv_oct -> - counter_value(read_byte, D, C); - recv_cnt -> - counter_value(read_pkg, D, C); - recv_max -> - getstat_avg(read_byte, D, C, read_pkg); - recv_avg -> - getstat_avg(read_byte, D, C, read_pkg); - recv_dvi -> 0; - %% - send_oct -> - counter_value(write_byte, D, C); - send_cnt -> - counter_value(write_pkg, D, C); - send_max -> - getstat_avg(write_byte, D, C, write_pkg); - send_avg -> - getstat_avg(write_byte, D, C, write_pkg); - send_pend -> 0 - end, - [{Tag, Val} | getstat_what(What, D, C)]. +counter_keys(What) -> + lists:usort(counter_keys_1(What)). +%% +counter_keys_1([]) -> []; +counter_keys_1([Tag | Tags]) -> + counter_key(Tag) ++ counter_keys_1(Tags). -getstat_avg(SumTag, D, C, CntTag) -> - Cnt = counter_value(CntTag, D, C), +counter_key(Tag) -> + case Tag of + recv_oct -> [read_byte]; + recv_cnt -> [read_pkg]; + recv_max -> [read_pkg_max]; + recv_avg -> [read_byte, read_pkg]; + send_oct -> [write_byte]; + send_cnt -> [write_pkg]; + send_max -> [write_pkg_max]; + send_avg -> [write_byte, write_pkg]; + _ -> [] + end. + +getstat_what([], _Counters) -> []; +getstat_what([Tag | Tags], Counters) -> + case counter_key(Tag) of + [SocketTag] -> + [{Tag, maps:get(SocketTag, Counters)} + | getstat_what(Tags, Counters)]; + [NumTag, DenomTag] -> + [{Tag, + getstat_div( + maps:get(NumTag, Counters), + maps:get(DenomTag, Counters))} + | getstat_what(Tags, Counters)]; + [] -> + getstat_what(Tags, Counters) + end. + +getstat_div(N, 0) when is_integer(N), 0 =< N -> N; +getstat_div(N, D) + when is_integer(N), 0 =< N, is_integer(D), 0 < D -> + %% Integer division with rounding + Q = N div D, + R = N rem D, if - Cnt =:= 0 -> - counter_value(SumTag, D, C); - true -> - round(counter_value(SumTag, D, C) / Cnt) - end. - -socket_info_counters(Socket) -> - #{counters := Counters} = socket:info(Socket), - Counters. - -receive_counter_wrap(Socket, D, Wrapped) -> - receive - ?socket_counter_wrap(Socket, Counter) -> - %% ?DBG([{counter, Counter}]), - receive_counter_wrap( - Socket, wrap_counter(Counter, D), [Counter | Wrapped]) - after 0 -> - {D, Wrapped} + D =< R bsl 1 -> Q + 1; + true -> Q end. -wrap_counter(Counter, D) -> - case D of - #{Counter := N} -> - D#{Counter := N + 1}; +count_wrap(CounterName, Wraps) -> + case Wraps of + #{CounterName := N} -> + Wraps#{CounterName := N + 1}; #{} -> - D#{Counter => 1} + Wraps#{CounterName => 1} end. --define(COUNTER_BITS, 32). -counter_value(Counter, D, Counters) -> - case D of - #{Counter := Wraps} -> - (Wraps bsl ?COUNTER_BITS) + maps:get(Counter, Counters); - #{} -> - maps:get(Counter, Counters) +wrap_counters(Counters1, Counters2, Wraps1, Wraps2) -> + #{Nm => counter_value(C1, Counters2, Wraps1, Wraps2, Nm) + || Nm := C1 <- Counters1}. + +counter_value(C1, Counters2, Wraps1, Wraps2, Nm) -> + W1 = maps:get(Nm, Wraps1, 0), + W2 = maps:get(Nm, Wraps2, 0), + C2 = maps:get(Nm, Counters2), + if + W1 < W2 -> + counter_value(W2, C2); + true -> + if + C2 < C1 -> + counter_value(W2 + 1, C2); + true -> + counter_value(W2, C2) + end end. +-compile({inline, [counter_value/2]}). +-define(COUNTER_BITS, 32). +counter_value(W, C) -> + (W bsl ?COUNTER_BITS) + C. -compile({inline, [reverse/1]}). -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor