Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:26
erlang
2456-Rewrite-buffer-handling-and-packet-decodin...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 2456-Rewrite-buffer-handling-and-packet-decoding.patch of Package erlang
From d68df4c1ae45b91e94a3b48317036f58422db87e Mon Sep 17 00:00:00 2001 From: Raimo Niskanen <raimo@erlang.org> Date: Thu, 11 Jan 2024 17:55:01 +0100 Subject: [PATCH 06/14] Rewrite buffer handling and packet decoding Use `socket:recv(S, 0)` and buffer then decode from buffer. This gives much better throughput than limiting the receive to the content of the current packet. --- lib/kernel/src/gen_tcp_socket.erl | 602 ++++++------------------------ 1 file changed, 111 insertions(+), 491 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index e93ddaf0a8..2e9e8dcac4 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -682,7 +682,9 @@ sendfile( %% ------------------------------------------------------------------------- -recv(?MODULE_socket(Server, _Socket), Length, Timeout) -> +recv(?MODULE_socket(Server, _Socket), Length, Timeout) + when is_integer(Length), 0 =< Length, is_integer(Timeout), 0 =< Timeout; + is_integer(Length), 0 =< Length, Timeout =:= infinity -> ?badarg_exit(call(Server, {recv, Length, Timeout})). %% ------------------------------------------------------------------------- @@ -949,13 +951,6 @@ socket_send(Socket, Data, Timeout) -> ok end. --compile({inline, [socket_recv_peek/2]}). -socket_recv_peek(Socket, Length) -> - Options = [peek], - Result = socket:recv(Socket, Length, Options, nowait), - %% ?DBG({Socket, Length, Options, Result}), - Result. - -compile({inline, [socket_recv/2]}). socket_recv(Socket, Length) -> Result = socket:recv(Socket, Length, nowait), @@ -2076,7 +2071,9 @@ handle_event( %% ?DBG([recv, {length, Length}, {timeout, Timeout}, {state, State}]), case State of 'connected' -> - handle_recv_start(P, D, From, Length, Timeout); + handle_recv( + P, D#{recv_length => Length, recv_from => From}, + [{{timeout, recv}, Timeout, recv}]); #recv{} -> %% Receive in progress {keep_state_and_data, @@ -2091,7 +2088,7 @@ handle_event( #recv{info = ?select_info(SelectRef)} = _State, {#params{socket = Socket} = P, D}) -> %% ?DBG([info, {socket, Socket}, {ref, SelectRef}]), - handle_recv(P, D, [], recv); + handle_recv(P, D, []); %% handle_event( info, ?socket_abort(Socket, SelectRef, Reason), @@ -2107,7 +2104,16 @@ handle_event( #recv{info = ?completion_info(CompletionRef)} = _State, {#params{socket = Socket} = P, D}) -> %% ?DBG(['completion msg', {socket, Socket}, {ref, CompletionRef}]), - handle_recv(P, D, [], CompletionStatus); + case CompletionStatus of + {ok, <<Data/binary>>} -> + D_1 = D#{buffer := buffer(Data, maps:get(buffer, D))}, + handle_recv(P, D_1, []); + {error, {Reason, <<Data/binary>>}} -> + D_1 = D#{buffer := buffer(Data, maps:get(buffer, D))}, + handle_recv_error(P, D_1, [], Reason); + {error, Reason} -> + handle_recv_error(P, D, [], Reason) + end; %% handle_event( info, ?socket_abort(Socket, CompletionRef, Reason), @@ -2366,7 +2372,7 @@ handle_active(P, D, State, Actions) -> %% => put data in buffer and enter state 'connected'). case State of 'connected' -> - handle_connected(P, handle_buffered(P, D), Actions); + handle_connected(P, D, Actions); _ -> {keep_state, {P, D}, Actions} end. @@ -2383,503 +2389,127 @@ handle_connected(P, D, ActionsR) -> {P, D}, reverse(ActionsR)}; #{active := _} -> - handle_recv(P, recv_start(D), ActionsR, recv) + handle_recv(P, recv_start(D), ActionsR) end. -handle_recv_start( - P, #{packet := Packet, buffer := Buffer} = D, From, Length, Timeout) - when Packet =:= raw, 0 < Length; - Packet =:= 0, 0 < Length -> + +handle_recv( + P, #{packet := Packet, recv_length := Length, buffer := Buffer} = D, + ActionsR) + when Packet =:= raw; + Packet =:= 0 -> Size = iolist_size(Buffer), - %% ?DBG([{packet, Packet}, {length, Length}, {buf_sz, Size}]), if - Length =< Size -> + 0 < Length, Length =< Size -> + %% Deliver part of buffered data {Data, NewBuffer} = split_binary(condense_buffer(Buffer), Length), handle_recv_deliver( - P, - D#{recv_length => Length, % Redundant - recv_from => From, - buffer := NewBuffer}, - [], Data); - true -> - N = Length - Size, - handle_recv( - P, D#{recv_length => N, recv_from => From}, - [{{timeout, recv}, Timeout, recv}], - recv) - end; -handle_recv_start(P, D, From, _Length, Timeout) -> - %% ?DBG([{p, P}, {d, D}]), - handle_recv( - P, D#{recv_length => 0, recv_from => From}, - [{{timeout, recv}, Timeout, recv}], - recv). - -handle_recv(P, #{packet := Packet, recv_length := Length} = D, ActionsR, CS) -> - %% ?DBG([{packet, Packet}, {recv_length, Length}]), - if + P, D#{buffer := NewBuffer}, [], Data); 0 < Length -> - handle_recv_length(P, D, ActionsR, Length, CS); - Packet =:= raw; - Packet =:= 0 -> - handle_recv_length(P, D, ActionsR, Length, CS); - Packet =:= 1; - Packet =:= 2; - Packet =:= 4 -> - handle_recv_peek(P, D, ActionsR, Packet, CS); - true -> - handle_recv_packet(P, D, ActionsR, CS) - end. - -handle_recv_peek(P, D, ActionsR, Packet, CS) -> - %% Peek Packet bytes - %% ?DBG({packet, Packet}), - case D of - #{buffer := Buffer} when is_list(Buffer) -> - %% ?DBG('buffer is list - condence'), - Data = condense_buffer(Buffer), - handle_recv_peek(P, D#{buffer := Data}, ActionsR, Packet, CS); - #{buffer := <<Data:Packet/binary, _Rest/binary>>} -> - %% ?DBG('buffer contains header'), - handle_recv_peek2(P, D, ActionsR, Packet, Data); - #{buffer := <<ShortData/binary>>} when (CS =:= recv) -> - N = Packet - byte_size(ShortData), - %% ?DBG(['buffer does not contain complete header', - %% {cs, CS}, - %% {packet, Packet}, {n, N}, - %% {short_data, byte_size(ShortData)}]), - case socket_recv_peek(P#params.socket, N) of - {ok, <<FinalData/binary>>} -> - handle_recv_peek2( - P, D, ActionsR, Packet, - <<ShortData/binary, FinalData/binary>>); - - {select, Select} -> - {next_state, - #recv{ - info = - case Select of - {?select_info(_) = SelectInfo, _Data} -> - SelectInfo; - ?select_info(_) = SelectInfo -> - SelectInfo - end}, - {P, D}, - reverse(ActionsR)}; - - {completion, Completion} -> - {next_state, - #recv{info = Completion}, - {P, D}, - reverse(ActionsR)}; - - {error, {Reason, <<_Data/binary>>}} -> - handle_recv_error(P, D, ActionsR, Reason); - {error, Reason} -> - handle_recv_error(P, D, ActionsR, Reason) - end; - #{buffer := <<ShortData/binary>>} -> - %% ?DBG(['buffer did not contain complete header', - %% {cs, CS}, - %% {packet, Packet}, - %% {short_data, byte_size(ShortData)}]), - case CS of - {ok, <<FinalData/binary>>} -> - handle_recv_peek2( - P, D, ActionsR, Packet, - <<ShortData/binary, FinalData/binary>>); - {error, {Reason, <<_Data/binary>>}} -> - handle_recv_error(P, D, ActionsR, Reason); - {error, Reason} -> - handle_recv_error(P, D, ActionsR, Reason) - end - end. - -handle_recv_peek2(P, D, ActionsR, Packet, Data) -> - <<?header(Packet, N)>> = Data, - #{packet_size := PacketSize} = D, - %% ?DBG([{'packet size', Packet, N, PacketSize}]), - if - 0 < PacketSize, PacketSize < N -> - %% ?DBG({emsgsize}), - handle_recv_error(P, D, ActionsR, emsgsize); - true -> - %% ?DBG({'read a message'}), - handle_recv_length(P, D, ActionsR, Packet + N, recv) - end. - - -handle_buffered(_P, #{recv_from := _From} = D) -> - D; -handle_buffered(P, #{active := Active} = D) when (Active =/= false) -> - case D of - #{buffer := Buffer} when is_list(Buffer) andalso (Buffer =/= []) -> + %% Need to receive more data + handle_recv_more(P, D, ActionsR); + 0 < Size -> % Length == 0 + %% Deliver all buffered data Data = condense_buffer(Buffer), - handle_buffered(P, D, Data); - #{buffer := Data} when is_binary(Data) andalso (byte_size(Data) > 0) -> - handle_buffered(P, D, Data); - _ -> - D + handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, Data); + true -> % Length == 0, Size == 0 + %% Need to receive more data + handle_recv_more(P, D, ActionsR) end; -handle_buffered(_P, D) -> - D. - -handle_buffered(P, - #{packet := line, - line_delimiter := LineDelimiter, - packet_size := PacketSize} = D, - Data) -> - DecodeOpts = [{line_delimiter, LineDelimiter}, - {line_length, PacketSize}], - handle_buffered(P, D, Data, DecodeOpts); -handle_buffered(P, D, Data) -> - handle_buffered(P, D, Data, []). - -handle_buffered(P, #{packet_size := PacketSize} = D, - Data, DecocdeOpts0) -> - DecodeOpts = [{packet_size, PacketSize}|DecocdeOpts0], - Type = decode_packet(D), - case erlang:decode_packet(Type, Data, DecodeOpts) of - {ok, Decoded, Rest} -> - D2 = deliver_buffered_data(P, D, Decoded), - %% Prepare the rest - %% is_list(Buffer) -> try to decode first - %% is_binary(Buffer) -> get more data first - Buffer = - case Rest of - <<>> -> Rest; - <<_/binary>> -> [Rest] - end, - D2#{buffer := Buffer}; - {more, _} -> - D; - {error, Reason} -> - %% What do we do here? - %% Keep the buffer and hope that it will go better with more data? - %% Or discard it and continue as if nothing happened? - warning_msg("Failed decoding message" - "~n Socket: ~p" - "~n Socket server: ~p" - "~n Packet type: ~p" - "~n byte_size(Data): ~p" - "~n Reason: ~p", - [P#params.socket, self(), - Type, byte_size(Data), Reason]), - D - end. - -%% If we get this far, we *know* that the socket is 'active'. -deliver_buffered_data(#params{owner = Owner} = P, - #{active := Active, - mode := Mode, - header := Header, - deliver := Deliver, - packet := Packet} = D, Data) -> - DeliverData = deliver_data(Data, Mode, Header, Packet), - ModuleSocket = module_socket(P), - Owner ! - case Deliver of - term -> - {tag(Packet), ModuleSocket, DeliverData}; - port -> - {ModuleSocket, {data, DeliverData}} - end, - case Active of - true -> - recv_start(next_packet(D, Packet, Data)); - once -> - recv_stop(next_packet(D, Packet, Data, false)); - 1 -> - Owner ! {tcp_passive, ModuleSocket}, - recv_stop(next_packet(D, Packet, Data, false)); - N when is_integer(N) -> - recv_start(next_packet(D, Packet, Data, Active - 1)) - end. - - -handle_recv_packet(P, D, ActionsR, CS) -> - case D of - #{buffer := Buffer} when is_list(Buffer) -> - Data = condense_buffer(Buffer), - handle_recv_decode(P, D, ActionsR, Data, CS); - #{buffer := Data} when is_binary(Data) -> - handle_recv_more(P, D, ActionsR, Data, CS) - end. +handle_recv(P, D, ActionsR) -> + handle_recv_packet(P, D, ActionsR). -handle_recv_length(P, #{buffer := Buffer} = D, ActionsR, Length, CS) -> - handle_recv_length(P, D, ActionsR, Length, Buffer, CS). -%% -%% Here and downwards until handle_recv_deliver() all buffered data -%% is the last argument binary and D#{buffer} is not updated -%% -handle_recv_length(P, D, ActionsR, Length, Buffer, CS) - when (0 < Length) andalso (CS =:= recv) -> - %% ?DBG(['try socket recv', {length, Length}, {cs, CS}]), - case socket_recv(P#params.socket, Length) of +handle_recv_more(P, #{buffer := Buffer} = D, ActionsR) -> + case socket_recv(P#params.socket, 0) of {ok, <<Data/binary>>} -> - handle_recv_deliver( - P, D#{buffer := <<>>}, ActionsR, - condense_buffer([Data | Buffer])); - - {select, {?select_info(_) = SelectInfo, Data}} -> - N = Length - byte_size(Data), - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := [Data | Buffer], recv_length := N}}, - reverse(ActionsR)}; + handle_recv(P, D#{buffer := [Data | Buffer]}, ActionsR); {select, ?select_info(_) = SelectInfo} -> - %% ?DBG(['recv select']), + %% ?DBG([{select_info, SelectInfo}]), {next_state, #recv{info = SelectInfo}, - {P, D#{buffer := Buffer}}, + {P, D}, reverse(ActionsR)}; - {completion, ?completion_info(_) = CompletionInfo} -> - %% ?DBG(['recv completion']), + %% ?DBG([{completion_info, CompletionInfo}]), {next_state, #recv{info = CompletionInfo}, - {P, D#{buffer := Buffer}}, + {P, D}, reverse(ActionsR)}; - - {error, {Reason, <<Data/binary>>}} -> - %% Error before all data - %% ?DBG({'recv error w rest-data', Reason, byte_size(Data)}), - handle_recv_error( - P, D#{buffer := [Data | Buffer]}, ActionsR, Reason); - {error, Reason} -> - %% ?DBG({'recv error wo rest-data', Reason}), - handle_recv_error(P, D#{buffer := Buffer}, ActionsR, Reason) - end; -handle_recv_length(P, D, ActionsR, Length, Buffer, CS) - when (0 < Length) -> - %% ?DBG(['socket recv result', {cs_result, element(1, CS)}]), - case CS of - {ok, <<Data/binary>>} -> - %% ?DBG([{received, byte_size(Data)}]), - handle_recv_deliver( - P, D#{buffer := <<>>}, ActionsR, - condense_buffer([Data | Buffer])); - {error, {Reason, <<Data/binary>>}} -> - %% Error before all data - %% ?DBG({'recv error w rest-data', Reason, byte_size(Data)}), - handle_recv_error( + handle_recv_error_packet( P, D#{buffer := [Data | Buffer]}, ActionsR, Reason); - {error, Reason} -> - %% ?DBG({'recv error wo rest-data', Reason}), - handle_recv_error(P, D#{buffer := Buffer}, ActionsR, Reason) - end; -handle_recv_length(P, D, ActionsR, _0, Buffer, CS) when (CS =:= recv) -> - %% ?DBG([{buffer_size, byte_size(Buffer)}, {cs, CS}]), - case Buffer of - <<>> -> - %% We should not need to update the buffer field here - %% since the only way to get here with empty Buffer - %% is when Buffer comes from the buffer field - Socket = P#params.socket, - %% ?DBG(['try read some more', {buffer_size, byte_size(Buffer)}]), - case socket_recv(Socket, 0) of - {ok, <<Data/binary>>} -> - %% ?DBG(['got some data', {data_size, byte_size(Data)}]), - handle_recv_deliver(P, D, ActionsR, Data); - - {select, {?select_info(_) = SelectInfo, Data}} -> - %% ?DBG({'select with data', byte_size(Data)}), - case socket:cancel(Socket, SelectInfo) of - ok -> - handle_recv_deliver(P, D, ActionsR, Data); - {error, Reason} -> - handle_recv_error(P, D, ActionsR, Reason, Data) - end; - {select, ?select_info(_) = SelectInfo} -> - %% ?DBG({'select', SelectInfo}), - {next_state, - #recv{info = SelectInfo}, - {P, D}, - reverse(ActionsR)}; - - {completion, ?completion_info(_) = CompletionInfo} -> - %% ?DBG(['completion', - %% {completion_info, CompletionInfo}]), - {next_state, - #recv{info = CompletionInfo}, - {P, D}, - reverse(ActionsR)}; - - {error, {Reason, <<Data/binary>>}} -> - %% ?DBG(['error with data', - %% {reason, Reason}, {data_size, byte_size(Data)}]), - handle_recv_error(P, D, ActionsR, Reason, Data); - {error, Reason} -> - %% ?DBG(['error', {reason, Reason}]), - handle_recv_error(P, D, ActionsR, Reason) - end; - <<Data/binary>> -> - handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, Data); - _ when is_list(Buffer) -> - Data = condense_buffer(Buffer), - handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, Data) - end; -handle_recv_length(P, D, ActionsR, _0, Buffer, CS) -> - %% ?DBG([{buffer, byte_size(Buffer)}, {cs_result, element(1, CS)}]), - case Buffer of - <<>> -> - %% We should not need to update the buffer field here - %% since the only way to get here with empty Buffer - %% is when Buffer comes from the buffer field - case CS of - {ok, <<Data/binary>>} -> - %% ?DBG({'got some', byte_size(Data)}), - handle_recv_deliver(P, D, ActionsR, Data); - - {error, Reason} -> - %% ?DBG(['error', {reason, Reason}]), - handle_recv_error(P, D, ActionsR, Reason) - end; - <<_/binary>> -> - case CS of - {ok, <<Data/binary>>} -> - %% ?DBG(['got some data', {data_size, byte_size(Data)}]), - handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, - condense_buffer([Data, Buffer])); - - {error, Reason} -> - %% ?DBG(['error', {reason, Reason}]), - handle_recv_error(P, D, ActionsR, Reason) - end; - _ when is_list(Buffer) -> - case CS of - {ok, <<Data/binary>>} -> - %% ?DBG(['got some data', {data_size, byte_size(Data)}]), - handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, - condense_buffer([Data | Buffer])); - - {error, Reason} -> - %% ?DBG(['error', {reason, Reason}]), - handle_recv_error(P, D, ActionsR, Reason) - end + %% ?DBG({P#params.socket, error, Reason}), + handle_recv_error(P, D, ActionsR, Reason) end. -handle_recv_decode(P, - #{packet := line, - line_delimiter := LineDelimiter, - packet_size := PacketSize} = D, - ActionsR, Data, CS) -> - DecodeOpts = [{line_delimiter, LineDelimiter}, - {line_length, PacketSize}], - handle_recv_decode(P, D, - ActionsR, Data, DecodeOpts, CS); -handle_recv_decode(P, D, ActionsR, Data, CS) -> - handle_recv_decode(P, D, ActionsR, Data, [], CS). - -handle_recv_decode(P, #{packet_size := PacketSize} = D, - ActionsR, Data, DecocdeOpts0, CS) -> - %% ?DBG([{packet_sz, PacketSize}, {decode_opts0, DecocdeOpts0}, {cs, CS}]), - DecodeOpts = [{packet_size, PacketSize}|DecocdeOpts0], - case erlang:decode_packet(decode_packet(D), Data, DecodeOpts) of +handle_recv_packet(P, D, ActionsR) -> + case decode_packet(D) of {ok, Decoded, Rest} -> - %% ?DBG(['packet decoded', {decoded, Decoded}, {rest, Rest}]), - %% is_list(Buffer) -> try to decode first - %% is_binary(Buffer) -> get more data first - Buffer = - case Rest of - <<>> -> Rest; - <<_/binary>> -> [Rest] - end, - handle_recv_deliver(P, D#{buffer := Buffer}, ActionsR, Decoded); - {more, undefined} -> - %% ?DBG(['more undef']), - handle_recv_more(P, D, ActionsR, Data, CS); - {more, Length} -> - %% ?DBG(['more', {length, Length}]), - N = Length - byte_size(Data), - handle_recv_length(P, D, ActionsR, N, Data, CS); + handle_recv_deliver(P, D#{buffer := Rest}, ActionsR, Decoded); + {more, _} -> + handle_recv_more(P, D, ActionsR); {error, Reason} -> - %% ?DBG(['error', {reason, Reason}]), handle_recv_error( - P, D#{buffer := Data}, ActionsR, + P, D, ActionsR, case Reason of invalid -> emsgsize; _ -> Reason end) end. -handle_recv_error_decode( - P, #{packet_size := PacketSize} = D, ActionsR, Reason, Data) -> - %% - case - erlang:decode_packet( - decode_packet(D), Data, - [{packet_size, PacketSize}, - {line_length, PacketSize}]) - of +handle_recv_error_packet(P, D, ActionsR, Reason) -> + case decode_packet(D) of {ok, Decoded, Rest} -> - %% is_list(Buffer) -> try to decode first - %% is_binary(Buffer) -> get more data first - Buffer = - case Rest of - <<>> -> Rest; - <<_/binary>> -> [Rest] - end, handle_recv_error( - P, D#{buffer := Buffer}, ActionsR, Reason, Decoded); + P, D#{buffer := Rest}, ActionsR, Reason, Decoded); {more, _} -> - handle_recv_error(P, D#{buffer := Data}, ActionsR, Reason); + handle_recv_error(P, D, ActionsR, Reason); {error, Reason} -> handle_recv_error( - P, D#{buffer := Data}, ActionsR, + P, D, ActionsR, case Reason of invalid -> emsgsize; _ -> Reason end) end. -handle_recv_more(P, D, ActionsR, BufferedData, CS) when (CS =:= recv) -> - case socket_recv(P#params.socket, 0) of - {ok, <<MoreData/binary>>} -> - %% ?DBG([{more_data_sz, byte_size(MoreData)}]), - Data = catbin(BufferedData, MoreData), - handle_recv_decode(P, D, ActionsR, Data, recv); - - {select, ?select_info(_) = SelectInfo} -> - %% ?DBG([{select_info, SelectInfo}]), - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := BufferedData}}, - reverse(ActionsR)}; - - {completion, ?completion_info(_) = CompletionInfo} -> - %% ?DBG([{completion_info, CompletionInfo}]), - {next_state, - #recv{info = CompletionInfo}, - {P, D#{buffer := BufferedData}}, - reverse(ActionsR)}; - - {error, {Reason, <<MoreData/binary>>}} -> - %% ?DBG({P#params.socket, error, Reason, byte_size(MoreData)}), - Data = catbin(BufferedData, MoreData), - handle_recv_error_decode(P, D, ActionsR, Reason, Data); - {error, Reason} -> - %% ?DBG({P#params.socket, error, Reason}), - handle_recv_error( - P, D#{buffer := BufferedData}, ActionsR, Reason) - end; -handle_recv_more(P, D, ActionsR, BufferedData, CS) -> - case CS of - {ok, <<MoreData/binary>>} -> - %% ?DBG([{more_data_sz, byte_size(MoreData)}]), - Data = catbin(BufferedData, MoreData), - handle_recv_decode(P, D, ActionsR, Data, recv); - - {error, Reason} -> - %% ?DBG({P#params.socket, error, Reason}), - handle_recv_error( - P, D#{buffer := BufferedData}, ActionsR, Reason) - end. +decode_packet( + #{packet := (PacketType = line), + line_delimiter := LineDelimiter, + packet_size := PacketSize, + buffer := Buffer}) -> + %% + erlang:decode_packet( + PacketType, condense_buffer(Buffer), + [{packet_size, PacketSize}, + {line_delimiter, LineDelimiter}, + {line_length, PacketSize}]); +decode_packet( + #{packet := http, + recv_httph := true, + packet_size := PacketSize, + buffer := Buffer}) -> + %% + erlang:decode_packet( + httph, condense_buffer(Buffer), [{packet_size, PacketSize}]); +decode_packet( + #{packet := http_bin, + recv_httph := true, + packet_size := PacketSize, + buffer := Buffer}) -> + %% + erlang:decode_packet( + httph_bin, condense_buffer(Buffer), [{packet_size, PacketSize}]); +decode_packet( + #{packet := PacketType, + packet_size := PacketSize, + buffer := Buffer}) -> + %% + erlang:decode_packet( + PacketType, condense_buffer(Buffer), [{packet_size, PacketSize}]). -%% Here D#{buffer} is supposed to be updated again handle_recv_deliver(P, D, ActionsR, Data) -> handle_connected(P, recv_data_deliver(P, D, ActionsR, Data)). @@ -2996,13 +2626,6 @@ recv_start(D) -> recv_stop(D) -> maps:without([recv_from, recv_length], D). -decode_packet(#{packet := Packet} = D) -> - case D of - #{packet := http, recv_httph := true} -> httph; - #{packet := http_bin, recv_httph := true} -> httph_bin; - #{packet := Packet} -> Packet - end. - %% Deliver data and update the active state %% -> {NewD, NewActionsR} recv_data_deliver( @@ -3022,7 +2645,7 @@ recv_data_deliver( {{timeout, recv}, cancel} | ActionsR]}; #{active := false} -> - D_1 = D#{buffer := unrecv_buffer(Data, maps:get(buffer, D))}, + D_1 = D#{buffer := buffer(Data, maps:get(buffer, D))}, {recv_stop(next_packet(D_1, Packet, Data)), ActionsR}; #{active := Active} -> @@ -3090,36 +2713,31 @@ next_packet(D, Packet, Data, Active) -> D#{active => Active} end. -catbin(<<>>, Bin) when is_binary(Bin) -> Bin; -catbin(Bin, <<>>) when is_binary(Bin) -> Bin; -catbin(Bin1, Bin2) when is_binary(Bin1), is_binary(Bin2) -> - <<Bin1/binary, Bin2/binary>>. - -unrecv_buffer(Data, Buffer) -> - case Buffer of - <<>> -> - Data; - _ when is_binary(Buffer) -> - [Data, Buffer]; - _ -> - [Data | Buffer] - end. +-compile({inline, [buffer/2]}). +buffer(Data, <<>>) -> + Data; +buffer(Data, Buffer) -> + [Data | Buffer]. +-compile({inline, [condense_buffer/1]}). +condense_buffer(Bin) when is_binary(Bin) -> Bin; condense_buffer([Bin]) when is_binary(Bin) -> Bin; condense_buffer(Buffer) -> iolist_to_binary(reverse_improper(Buffer, [])). deliver_data(Data, Mode, Header, Packet) -> if - Packet =:= 1; - Packet =:= 2; - Packet =:= 4 -> - <<?header(Packet, _Size), Payload/binary>> = Data, - deliver_data(Payload, Mode, Header); +%%% Packet =:= 1; +%%% Packet =:= 2; +%%% Packet =:= 4 -> +%%% %% Strip {packet,N} header +%%% <<?header(Packet, _Size), Payload/binary>> = Data, +%%% deliver_data(Payload, Mode, Header); Packet =:= http; Packet =:= http_bin; Packet =:= httph; Packet =:= httph_bin -> + %% These haven't got mixed mode header delivery Data; true -> deliver_data(Data, Mode, Header) @@ -3128,8 +2746,10 @@ deliver_data(Data, Mode, Header, Packet) -> deliver_data(Data, list, _N) -> binary_to_list(Data); deliver_data(Data, binary, 0) -> Data; deliver_data(Data, binary, N) -> + %% Mixed mode with header bytes as list and payload as binary in tail case Data of - <<_:N/binary>> -> binary_to_list(Data); + <<_:N/binary>> -> + binary_to_list(Data); <<Header:N/binary, Payload/binary>> -> binary_to_list(Header) ++ Payload end. -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor