Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:25
erlang
2621-Use-dedicated-ETS-table-for-clients.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 2621-Use-dedicated-ETS-table-for-clients.patch of Package erlang
From 1f53e96cad14d578e24df4a873b74800ca94b4ce Mon Sep 17 00:00:00 2001 From: Raimo Niskanen <raimo@erlang.org> Date: Tue, 17 Oct 2023 14:43:57 +0200 Subject: [PATCH 1/3] Use dedicated ETS table for clients --- lib/kernel/src/inet_gethost_native.erl | 304 ++++++++++++------------- 1 file changed, 147 insertions(+), 157 deletions(-) diff --git a/lib/kernel/src/inet_gethost_native.erl b/lib/kernel/src/inet_gethost_native.erl index 328cb01e05..3993450956 100644 --- a/lib/kernel/src/inet_gethost_native.erl +++ b/lib/kernel/src/inet_gethost_native.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1998-2022. All Rights Reserved. +%% Copyright Ericsson AB 1998-2023. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -75,16 +75,6 @@ %% meant to be used in guards, check one such octet. -define(VALID_V4(Part), is_integer(Part), Part < 256). -% Requests, one per unbique request to the PORT program, may be more than one client!!! --record(request, { - rid, % Request id as sent to port - op, - proto, - rdata, - clients = [] % Can be more than one client per request (Pid's). -}). - - % Statistics, not used yet. -record(statistics, { netdb_timeout = 0, @@ -98,15 +88,32 @@ }). % The main loopstate... --record(state, { - port = noport, % Port() connected to the port program - timeout = 8000, % Timeout value from inet_db:res_option - requests, % Table of request - req_index, % Table of {{op,proto,rdata},rid} - parent, % The supervisor bridge - pool_size = 4, % Number of C processes in pool. - statistics % Statistics record (records error causes). -}). +-record( + state, + { + port = noport, % Port() connected to the port program + timeout = 8000, % Timeout value from inet_db:res_option + %% + %% One per unique request to the PORT program. + %% Clients are registered in req_clients, multiple per RID. + %% ETS set of {RID,{Op,Proto,Data}=OPD} + requests, + %% + %% One per request as the above, + %% but for reverse lookup to find duplicate requests. + %% ETS set of {{Op,Proto,Data}=OPD,RID} + req_index, + %% + %% One per requesting client for RID. + %% When the request succeeds we can take all clients with key RID. + %% When a request times out we can remove just that object from the bag. + %% ETS bag of {RID,ClientPid,ClientRef,TimerRef} + req_clients, + %% + parent, % The supervisor bridge + pool_size = 4, % Number of C processes in pool. + statistics % Statistics record (records error causes). + }). -type state() :: #state{}. %% The supervisor bridge code @@ -168,7 +175,7 @@ run_once() -> %% Server API %%----------------------------------------------------------------------- server_init(Starter, Ref) -> - process_flag(trap_exit,true), + _ = process_flag(trap_exit,true), case whereis(?MODULE) of undefined -> case (catch register(?MODULE,self())) of @@ -180,15 +187,19 @@ server_init(Starter, Ref) -> Winner -> exit({already_started,Winner}) end, + _ = process_flag(message_queue_data, off_heap), Poolsize = get_poolsize(), Port = do_open_port(Poolsize, get_extra_args()), Timeout = ?REQUEST_TIMEOUT, put(rid,0), put(num_requests,0), - RequestTab = ets:new(ign_requests,[{keypos,#request.rid},set,protected]), + RequestTab = ets:new(ign_requests,[set,protected]), RequestIndex = ets:new(ign_req_index,[set,protected]), - State = #state{port = Port, timeout = Timeout, requests = RequestTab, - req_index = RequestIndex, + RequestClients = ets:new(ign_req_clients, [bag,protected]), + State = #state{port = Port, timeout = Timeout, + requests = RequestTab, + req_index = RequestIndex, + req_clients = RequestClients, pool_size = Poolsize, statistics = #statistics{}, parent = Starter}, @@ -196,27 +207,27 @@ server_init(Starter, Ref) -> main_loop(State) -> receive - Any -> + Any -> handle_message(Any,State) end. -handle_message({{Pid,_} = Client, {?OP_GETHOSTBYNAME, Proto, Name} = R}, - State) when is_pid(Pid) -> - NewState = do_handle_call(R,Client,State, - [<<?OP_GETHOSTBYNAME:8, Proto:8>>, Name,0]), - main_loop(NewState); +handle_message({{Pid,Ref}, {?OP_GETHOSTBYNAME, Proto, Name} = R}, State) + when is_pid(Pid) -> + do_handle_call( + R, Pid, Ref, [<<?OP_GETHOSTBYNAME:8, Proto:8>>, Name,0], State), + main_loop(State); -handle_message({{Pid,_} = Client, {?OP_GETHOSTBYADDR, Proto, Data} = R}, - State) when is_pid(Pid) -> - NewState = do_handle_call(R,Client,State, - <<?OP_GETHOSTBYADDR:8, Proto:8, Data/binary>>), - main_loop(NewState); +handle_message({{Pid,Ref}, {?OP_GETHOSTBYADDR, Proto, Data} = R}, State) + when is_pid(Pid) -> + do_handle_call( + R, Pid, Ref, <<?OP_GETHOSTBYADDR:8, Proto:8, Data/binary>>, State), + main_loop(State); handle_message({{Pid,Ref}, {?OP_CONTROL, Ctl, Data}}, State) when is_pid(Pid) -> - catch port_command(State#state.port, - <<?INVALID_SERIAL:32, ?OP_CONTROL:8, - Ctl:8, Data/binary>>), + _ = catch port_command( + State#state.port, + <<?INVALID_SERIAL:32, ?OP_CONTROL:8, Ctl:8, Data/binary>>), Pid ! {Ref, ok}, main_loop(State); @@ -227,37 +238,44 @@ handle_message({{Pid,Ref}, restart_port}, State) main_loop(State#state{port=NewPort}); handle_message({Port, {data, Data}}, State = #state{port = Port}) -> - NewState = case Data of - <<RID:32, BinReply/binary>> -> - case BinReply of - <<Unit, _/binary>> when Unit =:= ?UNIT_ERROR; - Unit =:= ?UNIT_IPV4; - Unit =:= ?UNIT_IPV6 -> - case pick_request(State,RID) of - false -> - State; - Req -> - lists:foreach(fun({P,R,TR}) -> - _= ?CANCEL_TIMER(TR), - P ! {R, - {ok, - BinReply}} - end, - Req#request.clients), - State - end; - _UnitError -> - %% Unexpected data, let's restart it, - %% it must be broken. - NewPort=restart_port(State), - State#state{port=NewPort} - end; - _BasicFormatError -> - NewPort=restart_port(State), - State#state{port=NewPort} - end, + NewState = + case Data of + <<RID:32, BinReply/binary>> -> + case BinReply of + <<Unit, _/binary>> + when Unit =:= ?UNIT_ERROR; + Unit =:= ?UNIT_IPV4; + Unit =:= ?UNIT_IPV6 -> + case ets:lookup(State#state.requests, RID) of + [] -> + %% We must have cancelled this request + State; + [{_,OPD}] -> + %% Clean up the request and reply to clients + ets:delete(State#state.requests, RID), + ets:delete(State#state.req_index, OPD), + lists:foreach( + fun ({_,ClientPid,ClientRef,TimerRef}) -> + _ = ?CANCEL_TIMER(TimerRef), + ClientPid ! + {ClientRef,{ok,BinReply}} + end, + ets:take(State#state.req_clients, RID)), + put(num_requests,get(num_requests) - 1), + State + end; + _UnitError -> + %% Unexpected data, let's restart it, + %% it must be broken. + NewPort = restart_port(State), + State#state{port=NewPort} + end; + _BasicFormatError -> + NewPort = restart_port(State), + State#state{port=NewPort} + end, main_loop(NewState); - + handle_message({'EXIT',Port,_Reason}, State = #state{port = Port}) -> ?dbg("Port died.~n",[]), NewPort=restart_port(State), @@ -268,92 +286,61 @@ handle_message({Port,eof}, State = #state{port = Port}) -> NewPort=restart_port(State), main_loop(State#state{port=NewPort}); -handle_message({timeout, Pid, RID}, State) -> - case pick_client(State,RID,Pid) of - false -> - false; - {more, {P,R,_}} -> - P ! {R,{error,timeout}}; - {last, {LP,LR,_}} -> - LP ! {LR, {error,timeout}}, - %% Remove the whole request structure... - _ = pick_request(State, RID), - %% Also cancel the request to the port program... - (catch port_command(State#state.port, - <<RID:32,?OP_CANCEL_REQUEST>>)) +handle_message({timeout,RID,ClientPid,ClientRef}, State) -> + ClientReqMS = {RID,ClientPid,ClientRef,'_'}, + case ets:match_object(State#state.req_clients, ClientReqMS) of + [ClientReq] -> + ets:delete_object(State#state.req_clients, ClientReq), + ClientPid ! {ClientRef,{error,timeout}}, + case ets:member(State#state.req_clients, RID) of + true -> + %% There are still waiting clients + ok; + false -> + %% The last client timed out - cancel the request + case ets:lookup(State#state.requests, RID) of + [{_,OPD}] -> + ets:delete(State#state.requests,RID), + ets:delete(State#state.req_index,OPD), + put(num_requests,get(num_requests) - 1), + %% Also cancel the request to the port program... + _ = catch port_command( + State#state.port, + <<RID:32,?OP_CANCEL_REQUEST>>), + ok; + [] -> + ok + end + end; + [] -> + ok end, main_loop(State); handle_message({system, From, Req}, State) -> - sys:handle_system_msg(Req, From, State#state.parent, ?MODULE, [], - State); + sys:handle_system_msg( + Req, From, State#state.parent, ?MODULE, [], State); handle_message(_, State) -> % Stray messages from dying ports etc. main_loop(State). -do_handle_call(R,Client0,State,RData) -> - Req = find_request(State,R), - Timeout = State#state.timeout, - {P,Ref} = Client0, - TR = ?SEND_AFTER(Timeout,self(),{timeout, P, Req#request.rid}), - Client = {P,Ref,TR}, - case Req#request.clients of - [] -> - RealRData = [<<(Req#request.rid):32>>|RData], - (catch port_command(State#state.port, RealRData)), - ets:insert(State#state.requests,Req#request{clients = [Client]}); - Tail -> - ets:insert(State#state.requests,Req#request{clients = [Client | Tail]}) +do_handle_call(OPD, ClientPid, ClientRef, RData, State) -> + case ets:lookup(State#state.req_index, OPD) of + [{_,RID}] -> + ok; + [] -> + RID = get_rid(), + _ = catch port_command(State#state.port, [<<RID:32>>|RData]), + ets:insert(State#state.requests, {RID,OPD}), + ets:insert(State#state.req_index, {OPD,RID}) end, - State. - -find_request(State, R = {Op, Proto, Data}) -> - case ets:lookup(State#state.req_index,R) of - [{R, Rid}] -> - [Ret] = ets:lookup(State#state.requests,Rid), - Ret; - [] -> - NRid = get_rid(), - Req = #request{rid = NRid, op = Op, proto = Proto, rdata = Data}, - ets:insert(State#state.requests, Req), - ets:insert(State#state.req_index,{R,NRid}), - put(num_requests,get(num_requests) + 1), - Req - end. - -pick_request(State, RID) -> - case ets:lookup(State#state.requests, RID) of - [] -> - false; - [#request{rid = RID, op = Op, proto = Proto, rdata = Data}=R] -> - ets:delete(State#state.requests,RID), - ets:delete(State#state.req_index,{Op,Proto,Data}), - put(num_requests,get(num_requests) - 1), - R - end. + TimerMsg = {timeout,RID,ClientPid,ClientRef}, + TimerRef = ?SEND_AFTER(State#state.timeout, self(), TimerMsg), + ClientReq = {RID,ClientPid,ClientRef,TimerRef}, + ets:insert(State#state.req_clients, ClientReq), + ok. -pick_client(State,RID,Clid) -> - case ets:lookup(State#state.requests, RID) of - [] -> - false; - [R] -> - case R#request.clients of - [SoleClient] -> - {last, SoleClient}; % Note, not removed, the caller - % should cleanup request data - CList -> - case lists:keyfind(Clid,1,CList) of - false -> - false; - Client -> - NCList = lists:keydelete(Clid,1,CList), - ets:insert(State#state.requests, - R#request{clients = NCList}), - {more, Client} - end - end - end. get_rid () -> New = (get(rid) + 1) rem 16#7FFFFFF, @@ -372,21 +359,24 @@ foreach(Fun,Table,Key) -> foreach(Fun,Table,ets:next(Table,Key)). restart_port(#state{port = Port, requests = Requests}) -> - (catch port_close(Port)), + _ = catch port_close(Port), NewPort = do_open_port(get_poolsize(), get_extra_args()), - foreach(fun(#request{rid = Rid, op = Op, proto = Proto, rdata = Rdata}) -> - case Op of - ?OP_GETHOSTBYNAME -> - port_command(NewPort,[<<Rid:32,?OP_GETHOSTBYNAME:8, - Proto:8>>, - Rdata,0]); - ?OP_GETHOSTBYADDR -> - port_command(NewPort, - <<Rid:32,?OP_GETHOSTBYADDR:8, Proto:8, - Rdata/binary>>) - end - end, - Requests), + %% + %% Redo all requests on the new port + foreach( + fun ({RID,{Op,Proto,Rdata}}) -> + case Op of + ?OP_GETHOSTBYNAME -> + port_command( + NewPort, + [<<RID:32,?OP_GETHOSTBYNAME:8,Proto:8>>, Rdata, 0]); + ?OP_GETHOSTBYADDR -> + port_command( + NewPort, + <<RID:32,?OP_GETHOSTBYADDR:8,Proto:8,Rdata/binary>>) + end + end, + Requests), NewPort. -dialyzer({no_improper_lists, do_open_port/2}). -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor