Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:26
erlang
3102-erts-esock-Add-nif-support-for-sendv-funct...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 3102-erts-esock-Add-nif-support-for-sendv-function.patch of Package erlang
From 023b2c7588e5c63243140d6e4a494cc46f8669a6 Mon Sep 17 00:00:00 2001 From: Micael Karlberg <bmk@erlang.org> Date: Wed, 27 Sep 2023 18:25:27 +0200 Subject: [PATCH 02/11] [erts|esock] Add (nif) support for sendv function OTP-18845 --- erts/emulator/nifs/common/prim_socket_int.h | 2 + erts/emulator/nifs/common/prim_socket_nif.c | 121 ++++++++- erts/emulator/nifs/common/socket_asyncio.h | 6 + erts/emulator/nifs/common/socket_int.h | 24 +- erts/emulator/nifs/common/socket_io.h | 7 + erts/emulator/nifs/common/socket_syncio.h | 6 + erts/emulator/nifs/unix/unix_socket_syncio.c | 170 ++++++++++++ erts/emulator/nifs/win32/win_socket_asyncio.c | 247 +++++++++++++++++- 8 files changed, 559 insertions(+), 24 deletions(-) diff --git a/erts/emulator/nifs/common/prim_socket_int.h b/erts/emulator/nifs/common/prim_socket_int.h index dcd38666e1..fede46d7b8 100644 --- a/erts/emulator/nifs/common/prim_socket_int.h +++ b/erts/emulator/nifs/common/prim_socket_int.h @@ -816,6 +816,8 @@ extern ESockCmsgSpec* esock_lookup_cmsg_spec(ESockCmsgSpec* table, size_t num, ERL_NIF_TERM eType); + + /* *** Sendfile 'stuff' *** */ #ifdef HAVE_SENDFILE diff --git a/erts/emulator/nifs/common/prim_socket_nif.c b/erts/emulator/nifs/common/prim_socket_nif.c index 457718799f..a41e3b0cd2 100644 --- a/erts/emulator/nifs/common/prim_socket_nif.c +++ b/erts/emulator/nifs/common/prim_socket_nif.c @@ -912,6 +912,7 @@ const int esock_ioctl_flags_length = NUM(esock_ioctl_flags); /* #define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) */ /* #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) */ +/* #define sock_sendv(s,iov,iovlen) writev((s),(iov),(iovlen)) */ #define sock_setopt(s,l,o,v,ln) setsockopt((s),(l),(o),(v),(ln)) #define sock_shutdown(s, how) shutdown((s), (how)) @@ -1024,6 +1025,7 @@ ESockSendfileCounters initESockSendfileCounters = * nif_send * nif_sendto * nif_sendmsg + * nif_sendv * nif_sendfile * nif_recv * nif_recvfrom @@ -1104,6 +1106,7 @@ typedef struct { ESockIOSend send; ESockIOSendTo sendto; ESockIOSendMsg sendmsg; + ESockIOSendv sendv; ESockIOSendFileStart sendfile_start; ESockIOSendFileContinue sendfile_cont; ESockIOSendFileDeferredClose sendfile_dc; @@ -2241,6 +2244,7 @@ static const struct in6_addr in6addr_loopback = GLOBAL_ATOM_DECL(sendmsg); \ GLOBAL_ATOM_DECL(sendsrcaddr); \ GLOBAL_ATOM_DECL(sendto); \ + GLOBAL_ATOM_DECL(sendv); \ GLOBAL_ATOM_DECL(seqpacket); \ GLOBAL_ATOM_DECL(setfib); \ GLOBAL_ATOM_DECL(set_peer_primary_addr); \ @@ -2572,12 +2576,19 @@ static ESockIoBackend io_backend = {0}; (SOCKR), (SENDR), \ (DP), (F), (TAP), (TAL)) : \ enif_raise_exception((ENV), MKA((ENV), "notsup"))) -#define ESOCK_IO_SENDMSG(ENV, D, \ - SOCKR, SENDR, EM, F, EIOV) \ - ((io_backend.sendmsg != NULL) ? \ - io_backend.sendmsg((ENV), (D), \ - (SOCKR), (SENDR), \ - (EM), (F), (EIOV), &data) : \ +#define ESOCK_IO_SENDMSG(ENV, D, \ + SOCKR, SENDR, EM, F, EIOV) \ + ((io_backend.sendmsg != NULL) ? \ + io_backend.sendmsg((ENV), (D), \ + (SOCKR), (SENDR), \ + (EM), (F), (EIOV), &data) : \ + enif_raise_exception((ENV), MKA((ENV), "notsup"))) +#define ESOCK_IO_SENDV(ENV, D, \ + SOCKR, SENDR, EIOV) \ + ((io_backend.sendv != NULL) ? \ + io_backend.sendv((ENV), (D), \ + (SOCKR), (SENDR), \ + (EIOV), &data) : \ enif_raise_exception((ENV), MKA((ENV), "notsup"))) #define ESOCK_IO_SENDFILE_START(ENV, D, \ SOR, SNR, \ @@ -3894,9 +3905,10 @@ extern ErlNifEnv* esock_alloc_env(const char* slogan) * nif_connect(Sock, SockAddr) * nif_listen(Sock, Backlog) * nif_accept(LSock, Ref) - * nif_send(Sock, SendRef, Data, Flags) - * nif_sendto(Sock, SendRef, Data, Dest, Flags) - * nif_sendmsg(Sock, SendRef, Msg, Flags) + * nif_send(Sock, Data, Flags, SendRef) + * nif_sendto(Sock, Data, Dest, Flags, SendRef) + * nif_sendmsg(Sock, Msg, Flags, SendRef, IOV) + * nif_sendv(Sock, IOV, SendRef) * nif_sendfile(Sock, SendRef, Offset, Count, InFileRef) * nif_sendfile(Sock, SendRef, Offset, Count) * nif_sendfile(Sock) @@ -5768,6 +5780,69 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, +/* ---------------------------------------------------------------------- + * nif_sendv + * + * Description: + * Send a message (in the form of a list of binaries = I/O vector) on a + * socket. + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * IOV - List of binaries + * SendRef - A unique id reference() for this (sendv) request. + */ + +static +ERL_NIF_TERM nif_sendv(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM res, sockRef, sendRef, eIOV; + ESockDescriptor* descP; + + ESOCK_ASSERT( argc == 3 ); + + SGDBG( ("SOCKET", "nif_sendv -> entry with argc: %d\r\n", argc) ); + + sockRef = argv[0]; // We need this in case we send abort (to the caller) + eIOV = argv[1]; + sendRef = argv[2]; + + if (! ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) { + SGDBG( ("SOCKET", "nif_sendv -> get resource failed\r\n") ); + return enif_make_badarg(env); + } + + /* Extract arguments and perform preliminary validation */ + + if (! enif_is_ref(env, sendRef)) { + SSDBG( descP, ("SOCKET", "nif_sendv -> argv decode failed\r\n") ); + return enif_make_badarg(env); + } + + MLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "nif_sendv(%T), {%d,0x%X} ->" + "\r\n sendRef: %T" + "\r\n", + sockRef, descP->sock, descP->writeState, sendRef) ); + + res = ESOCK_IO_SENDV(env, descP, sockRef, sendRef, eIOV); + + MUNLOCK(descP->writeMtx); + + SSDBG( descP, ("SOCKET", "nif_sendv(%T) -> done with" + "\r\n res: %T" + "\r\n", sockRef, res) ); + + return res; + +} + + + #ifdef FOOBAR /* ---------------------------------------------------------------------- @@ -10462,6 +10537,12 @@ ERL_NIF_TERM esock_cancel(ErlNifEnv* env, MUNLOCK(descP->writeMtx); return result; } + if (COMPARE(op, esock_atom_sendv) == 0) { + MLOCK(descP->writeMtx); + result = ESOCK_IO_CANCEL_SEND(env, descP, sockRef, opRef); + MUNLOCK(descP->writeMtx); + return result; + } } } @@ -13492,6 +13573,7 @@ ErlNifFunc esock_funcs[] = {"nif_send", 4, nif_send, 0}, {"nif_sendto", 5, nif_sendto, 0}, {"nif_sendmsg", 5, nif_sendmsg, 0}, + {"nif_sendv", 3, nif_sendv, 0}, {"nif_sendfile", 5, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND}, {"nif_sendfile", 4, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND}, {"nif_sendfile", 1, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND}, @@ -13555,12 +13637,14 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) unsigned int ioNumThreads, ioNumThreadsDef; /* +++ Local atoms and error reason atoms +++ */ + // ESOCK_EPRINTF("\r\n[ESOCK] create local atoms\r\n"); #define LOCAL_ATOM_DECL(A) atom_##A = MKA(env, #A) LOCAL_ATOMS; // LOCAL_ERROR_REASON_ATOMS; #undef LOCAL_ATOM_DECL /* Global atom(s) and error reason atom(s) */ + // ESOCK_EPRINTF("\r\n[ESOCK] create global atoms\r\n"); #define GLOBAL_ATOM_DECL(A) esock_atom_##A = MKA(env, #A) GLOBAL_ATOMS; GLOBAL_ERROR_REASON_ATOMS; @@ -13568,6 +13652,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_socket_tag = MKA(env, "$socket"); + // ESOCK_EPRINTF("\r\n[ESOCK] get registry pid\r\n"); if (! esock_extract_pid_from_map(env, load_info, atom_registry, &data.regPid)) { @@ -13576,18 +13661,21 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) } /* --esock-disable-registry */ + // ESOCK_EPRINTF("\r\n[ESOCK] get use-registry\r\n"); data.useReg = esock_get_bool_from_map(env, load_info, esock_atom_use_registry, ESOCK_USE_SOCKET_REGISTRY); /* --esock-enable-iow */ + // ESOCK_EPRINTF("\r\n[ESOCK] get enable-iow\r\n"); data.iow = esock_get_bool_from_map(env, load_info, atom_iow, ESOCK_NIF_IOW_DEFAULT); /* --enable-extended-error-info */ + // ESOCK_EPRINTF("\r\n[ESOCK] maybe enable eei\r\n"); #if defined(ESOCK_USE_EXTENDED_ERROR_INFO) data.eei = TRUE; #else @@ -13595,6 +13683,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) #endif /* --esock-debug-file=<filename> */ + // ESOCK_EPRINTF("\r\n[ESOCK] debug filename\r\n"); { char *debug_filename; @@ -13617,9 +13706,11 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) FREE(debug_filename); } + // ESOCK_EPRINTF("\r\n[ESOCK] create protocols mutex\r\n"); data.protocolsMtx = MCREATE("esock.protocols"); /* +++ Global Counters +++ */ + // ESOCK_EPRINTF("\r\n[ESOCK] create global counters mutex (and init counters)\r\n"); data.cntMtx = MCREATE("esock.gcnt"); data.numSockets = 0; data.numTypeDGrams = 0; @@ -13634,6 +13725,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) data.numProtoSCTP = 0; + // ESOCK_EPRINTF("\r\n[ESOCK] init opts and cmsg tables\r\n"); initOpts(); initCmsgTables(); @@ -13664,6 +13756,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) #endif + // ESOCK_EPRINTF("\r\n[ESOCK] init IOV max\r\n"); data.iov_max = #if defined(NO_SYSCONF) || (! defined(_SC_IOV_MAX)) # ifdef IOV_MAX @@ -13679,6 +13772,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) /* This is (currently) intended for Windows use */ + // ESOCK_EPRINTF("\r\n[ESOCK] (win) system info\r\n"); enif_system_info(&sysInfo, sizeof(ErlNifSysInfo)); /* We should have a config options for this: @@ -13686,6 +13780,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) * * ESOCK_IO_NUM_THREADS */ + // ESOCK_EPRINTF("\r\n[ESOCK] (win) number of schedulers\r\n"); ioNumThreadsDef = (unsigned int) (sysInfo.scheduler_threads > 0) ? 2*sysInfo.scheduler_threads : 2; @@ -13694,6 +13789,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_io_num_threads, ioNumThreadsDef); + // ESOCK_EPRINTF("\r\n[ESOCK] init I/O backend callbacks\r\n"); #ifdef __WIN32__ io_backend.init = esaio_init; @@ -13713,6 +13809,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) io_backend.send = esaio_send; io_backend.sendto = esaio_sendto; io_backend.sendmsg = esaio_sendmsg; + io_backend.sendv = esaio_sendv; io_backend.sendfile_start = NULL; io_backend.sendfile_cont = NULL; io_backend.sendfile_dc = NULL; @@ -13763,6 +13860,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) io_backend.send = essio_send; io_backend.sendto = essio_sendto; io_backend.sendmsg = essio_sendmsg; + io_backend.sendv = essio_sendv; io_backend.sendfile_start = essio_sendfile_start; io_backend.sendfile_cont = essio_sendfile_cont; io_backend.sendfile_dc = essio_sendfile_deferred_close; @@ -13796,17 +13894,22 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) #endif + // ESOCK_EPRINTF("\r\n[ESOCK] init I/O backend\r\n"); if (ESOCK_IO_INIT(ioNumThreads) != ESOCK_IO_OK) { esock_error_msg("Failed initiating I/O backend"); return 1; // Failure } + // ESOCK_EPRINTF("\r\n[ESOCK] open socket (nif) resource\r\n"); esocks = enif_open_resource_type_x(env, "sockets", &esockInit, ERL_NIF_RT_CREATE, NULL); + /* ESOCK_EPRINTF("\r\n[ESOCK] open socket (nif) resource res: 0x%lX\r\n", */ + /* esocks); */ + if (esocks != NULL) { int ores; diff --git a/erts/emulator/nifs/common/socket_asyncio.h b/erts/emulator/nifs/common/socket_asyncio.h index ac3eff8f7e..672f724dc2 100644 --- a/erts/emulator/nifs/common/socket_asyncio.h +++ b/erts/emulator/nifs/common/socket_asyncio.h @@ -88,6 +88,12 @@ extern ERL_NIF_TERM esaio_sendmsg(ErlNifEnv* env, int flags, ERL_NIF_TERM eIOV, const ESockData* dataP); +extern ERL_NIF_TERM esaio_sendv(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM eIOV, + const ESockData* dataP); /* extern ERL_NIF_TERM esaio_sendfile_start(ErlNifEnv* env, diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index a9df0d311d..1a5818d7cf 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -518,6 +518,7 @@ typedef long ssize_t; GLOBAL_ATOM_DEF(sendmsg); \ GLOBAL_ATOM_DEF(sendsrcaddr); \ GLOBAL_ATOM_DEF(sendto); \ + GLOBAL_ATOM_DEF(sendv); \ GLOBAL_ATOM_DEF(seqpacket); \ GLOBAL_ATOM_DEF(setfib); \ GLOBAL_ATOM_DEF(set_peer_primary_addr); \ @@ -656,24 +657,27 @@ GLOBAL_ERROR_REASON_ATOM_DEFS; #define IS_UNDEFINED(T) IS_IDENTICAL((T), esock_atom_undefined) #define IS_OK(T) IS_IDENTICAL((T), esock_atom_ok) -#define IS_ATOM(E, TE) enif_is_atom((E), (TE)) -#define IS_BIN(E, TE) enif_is_binary((E), (TE)) -#define IS_LIST(E, TE) enif_is_list((E), (TE)) -#define IS_MAP(E, TE) enif_is_map((E), (TE)) -#define IS_NUM(E, TE) enif_is_number((E), (TE)) -#define IS_TUPLE(E, TE) enif_is_tuple((E), (TE)) -#define IS_INTEGER(E, TE) esock_is_integer((E), (TE)) +#define IS_ATOM(E, TE) enif_is_atom((E), (TE)) +#define IS_BIN(E, TE) enif_is_binary((E), (TE)) +#define IS_LIST(E, TE) enif_is_list((E), (TE)) +#define IS_MAP(E, TE) enif_is_map((E), (TE)) +#define IS_NUM(E, TE) enif_is_number((E), (TE)) +#define IS_TUPLE(E, TE) enif_is_tuple((E), (TE)) +#define IS_INTEGER(E, TE) esock_is_integer((E), (TE)) -#define IS_PID_UNDEF(P) enif_is_pid_undefined((P)) -#define SET_PID_UNDEF(P) enif_set_pid_undefined((P)) +#define IS_PID_UNDEF(P) enif_is_pid_undefined((P)) +#define SET_PID_UNDEF(P) enif_set_pid_undefined((P)) #define GET_ATOM_LEN(E, TE, LP) \ enif_get_atom_length((E), (TE), (LP), ERL_NIF_LATIN1) #define GET_ATOM(E, TE, BP, MAX) \ enif_get_atom((E), (TE), (BP), (MAX), ERL_NIF_LATIN1) -#define GET_BIN(E, TE, BP) enif_inspect_iolist_as_binary((E), (TE), (BP)) +#define GET_BIN(E, TE, BP) \ + enif_inspect_iolist_as_binary((E), (TE), (BP)) #define GET_INT(E, TE, IP) enif_get_int((E), (TE), (IP)) #define GET_INT64(E, TE, IP) enif_get_int64((E), (TE), (IP)) +#define GET_IOV(ME, EIOV, T, IOV) \ + enif_inspect_iovec(NULL, (ME), (EIOV), (T), (IOV)) #define GET_LIST_ELEM(E, L, HP, TP) enif_get_list_cell((E), (L), (HP), (TP)) #define GET_LIST_LEN(E, L, LP) enif_get_list_length((E), (L), (LP)) #define GET_LONG(E, TE, LP) enif_get_long((E), (TE), (LP)) diff --git a/erts/emulator/nifs/common/socket_io.h b/erts/emulator/nifs/common/socket_io.h index 100ca372be..9f84e24807 100644 --- a/erts/emulator/nifs/common/socket_io.h +++ b/erts/emulator/nifs/common/socket_io.h @@ -106,6 +106,13 @@ typedef ERL_NIF_TERM (*ESockIOSendMsg)(ErlNifEnv* env, ERL_NIF_TERM eIOV, const ESockData* dataP); +typedef ERL_NIF_TERM (*ESockIOSendv)(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM eIOV, + const ESockData* dataP); + typedef ERL_NIF_TERM (*ESockIOSendFileStart)(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, diff --git a/erts/emulator/nifs/common/socket_syncio.h b/erts/emulator/nifs/common/socket_syncio.h index 2d548b8645..4e1e9ef33e 100644 --- a/erts/emulator/nifs/common/socket_syncio.h +++ b/erts/emulator/nifs/common/socket_syncio.h @@ -86,6 +86,12 @@ extern ERL_NIF_TERM essio_sendmsg(ErlNifEnv* env, int flags, ERL_NIF_TERM eIOV, const ESockData* dataP); +extern ERL_NIF_TERM essio_sendv(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM eIOV, + const ESockData* dataP); extern ERL_NIF_TERM essio_sendfile_start(ErlNifEnv* env, ESockDescriptor* descP, diff --git a/erts/emulator/nifs/unix/unix_socket_syncio.c b/erts/emulator/nifs/unix/unix_socket_syncio.c index 6e1411974c..639c82aaf6 100644 --- a/erts/emulator/nifs/unix/unix_socket_syncio.c +++ b/erts/emulator/nifs/unix/unix_socket_syncio.c @@ -104,6 +104,7 @@ #define sock_sendmsg(s,msghdr,flag) sendmsg((s),(msghdr),(flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) +#define sock_sendv(s,iov,iovcnt) writev((s), (iov), (iovcnt)) #define sock_shutdown(s, how) shutdown((s), (how)) @@ -2269,6 +2270,175 @@ ERL_NIF_TERM essio_sendmsg(ErlNifEnv* env, } +/* ======================================================================== + */ +extern +ERL_NIF_TERM essio_sendv(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM eIOV, + const ESockData* dataP) +{ + ERL_NIF_TERM eres; + ErlNifIOVec *iovec = NULL; + ssize_t dataSize, sendv_result; + ERL_NIF_TERM writerCheck, tail; + BOOLEAN_T dataInTail; + + if (! IS_OPEN(descP->writeState)) + return esock_make_error_closed(env); + + /* Connect and Write uses the same select flag + * so they can not be simultaneous + */ + if (descP->connectorP != NULL) + return esock_make_error_invalid(env, esock_atom_state); + + /* Ensure that we either have no current writer or we are it, + * or enqueue this process if there is a current writer */ + if (! send_check_writer(env, descP, sendRef, &writerCheck)) { + SSDBG( descP, + ("UNIX-ESSIO", "essio_sendv {%d} -> writer check failed: " + "\r\n %T\r\n", descP->sock, writerCheck) ); + return writerCheck; + } + + + /* Extract the 'iov', which must be an erlang:iovec(), + * from which we take at most IOV_MAX binaries + */ + if (! enif_inspect_iovec(NULL, dataP->iov_max, eIOV, &tail, &iovec)) { + SSDBG( descP, ("UNIX-ESSIO", + "essio_sendv {%d} -> iov inspection failed\r\n", + descP->sock) ); + + return esock_make_invalid(env, esock_atom_iov); + } + + if (iovec == NULL) { + SSDBG( descP, ("UNIX-ESSIO", + "essio_sendv {%d} -> not an iov\r\n", + descP->sock) ); + + return esock_make_invalid(env, esock_atom_iov); + } + + // ESOCK_ASSERT( iovec != NULL ); + + dataInTail = (! enif_is_empty_list(env, tail)); + + SSDBG( descP, ("UNIX-ESSIO", "essio_sendv {%d} ->" + "\r\n iovcnt: %lu" + "\r\n tail: %s" + "\r\n", descP->sock, + (unsigned long) iovec->iovcnt, B2S(dataInTail)) ); + + /* We now have an allocated iovec - verify vector size */ + + if (iovec->iovcnt > dataP->iov_max) { + if (descP->type == SOCK_STREAM) { + iovec->iovcnt = dataP->iov_max; + } else { + /* We can not send the whole packet in one sendv() call */ + SSDBG( descP, ("UNIX-ESSIO", + "essio_sendv {%d} -> iovcnt > iov_max\r\n", + descP->sock) ); + + FREE_IOVEC( iovec ); + + return esock_make_invalid(env, esock_atom_iov); + } + } + + dataSize = 0; + { + ERL_NIF_TERM h, t; + ErlNifBinary bin; + size_t i; + + /* Find out if there is remaining data in the tail. + * Skip empty binaries otherwise break. + * If 'tail' after loop exit is the empty list + * there was no more data. Otherwise there is more + * data or the 'iov' is invalid. + */ + for (;;) { + if (enif_get_list_cell(env, tail, &h, &t) && + enif_inspect_binary(env, h, &bin) && + (bin.size == 0)) { + tail = t; + continue; + } else + break; + } + + if (dataInTail && + (descP->type != SOCK_STREAM)) { + /* We can not send the whole packet in one sendmsg() call */ + SSDBG( descP, ("UNIX-ESSIO", + "essio_sendv {%d} -> invalid tail\r\n", + descP->sock) ); + + FREE_IOVEC( iovec ); + + return esock_make_invalid(env, esock_atom_iov); + + } + + /* Calculate the data size */ + + for (i = 0; i < iovec->iovcnt; i++) { + size_t len = iovec->iov[i].iov_len; + dataSize += len; + if (dataSize < len) { + /* Overflow */ + SSDBG( descP, ("UNIX-ESSIO", "essio_sendv {%d} -> Overflow" + "\r\n i: %lu" + "\r\n len: %lu" + "\r\n dataSize: %ld" + "\r\n", descP->sock, (unsigned long) i, + (unsigned long) len, (long) dataSize) ); + + FREE_IOVEC( iovec ); + + return esock_make_invalid(env, esock_atom_iov); + + } + } + } + + SSDBG( descP, + ("UNIX-ESSIO", + "essio_sendv {%d} -> iovec size verified" + "\r\n iov length: %lu" + "\r\n data size: %u" + "\r\n", + descP->sock, + (unsigned long) iovec->iovcnt, (long) dataSize) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_write_tries, &descP->writeTries, 1); + + /* And now, try to send the message */ + sendv_result = sock_sendv(descP->sock, iovec->iov, iovec->iovcnt); + + eres = send_check_result(env, descP, sendv_result, + dataSize, dataInTail, + sockRef, sendRef); + + FREE_IOVEC( iovec ); + + SSDBG( descP, ("UNIX-ESSIO", "essio_sendv {%d} -> done" + "\r\n data size: %lu" + "\r\n data in tail: %s" + "\r\n", descP->sock, + dataSize, B2S(dataInTail)) ); + + return eres; +} + + /* ======================================================================== * Start a sendfile() operation */ diff --git a/erts/emulator/nifs/win32/win_socket_asyncio.c b/erts/emulator/nifs/win32/win_socket_asyncio.c index 82db98651e..2eaedaf7e0 100644 --- a/erts/emulator/nifs/win32/win_socket_asyncio.c +++ b/erts/emulator/nifs/win32/win_socket_asyncio.c @@ -160,6 +160,8 @@ ctrl.sendmsg((s), (buf), (flag), NULL, (ol), NULL) #define sock_sendto_O(s,buf,flag,ta,tal,o) \ WSASendTo((s), (buf), 1, NULL, (flag), (ta), (tal), (o), NULL) +#define sock_sendv_O(s,iov,iovcnt,o) \ + WSASend((s), (iov), iovcnt, NULL, 0, (o), NULL) #define sock_setopt(s,l,o,v,ln) setsockopt((s),(l),(o),(v),(ln)) @@ -339,6 +341,15 @@ typedef struct __ESAIOOpDataSendMsg { * of the send request */ } ESAIOOpDataSendMsg; +typedef struct __ESAIOOpDataSendv { + /* WSASend (used with an io-vector) */ + ErlNifIOVec* iovec; + + ERL_NIF_TERM sockRef; /* The socket */ + ERL_NIF_TERM sendRef; /* The (unique) reference (ID) + * of the send request */ +} ESAIOOpDataSendv; + typedef struct __ESAIOOpDataRecv { /* WSARecv */ DWORD toRead; /* Can be 0 (= zero) @@ -404,6 +415,7 @@ typedef struct __ESAIOOperation { #define ESAIO_OP_SEND 0x0021 // WSASend #define ESAIO_OP_SENDTO 0x0022 // WSASendTo #define ESAIO_OP_SENDMSG 0x0023 // WSASendMsg +#define ESAIO_OP_SENDV 0x0024 // WSASend (with vector) /* Commands for receiving */ #define ESAIO_OP_RECV 0x0031 // WSARecv #define ESAIO_OP_RECVFROM 0x0032 // WSARecvFrom @@ -437,6 +449,9 @@ typedef struct __ESAIOOperation { /* +++ sendmsg +++ */ ESAIOOpDataSendMsg sendmsg; + /* +++ sendv +++ */ + ESAIOOpDataSendv sendv; + /* +++ recv +++ */ ESAIOOpDataRecv recv; @@ -2936,7 +2951,7 @@ ERL_NIF_TERM esaio_sendmsg(ErlNifEnv* env, esock_atom_write_tries, &descP->writeTries, 1); wres = sock_sendmsg_O(descP->sock, &opP->data.sendmsg.msg, flags, - (OVERLAPPED*) opP); + (OVERLAPPED*) opP); eres = send_check_result(env, descP, opP, caller, wres, dataSize, @@ -2953,8 +2968,8 @@ ERL_NIF_TERM esaio_sendmsg(ErlNifEnv* env, /* The i/o vector belongs to the op env, * so it goes when the env goes. */ - esock_clear_env("esaio_sendto - cleanup", opP->env); - esock_free_env("esaio_sendto - cleanup", opP->env); + esock_clear_env("esaio_sendmsg - cleanup", opP->env); + esock_free_env("esaio_sendmsg - cleanup", opP->env); FREE( opP ); @@ -3065,7 +3080,7 @@ BOOLEAN_T verify_sendmsg_iovec_tail(ErlNifEnv* env, /* We can not send the whole packet in one sendmsg() call */ SSDBG( descP, ("WIN-ESAIO", - "essio_sendmsg {%d} -> invalid tail\r\n", + "verify_sendmsg_iovec_tail {%d} -> invalid tail\r\n", descP->sock) ); return FALSE; @@ -3093,7 +3108,7 @@ BOOLEAN_T check_sendmsg_iovec_overflow(ESockDescriptor* descP, /* Overflow */ SSDBG( descP, ("WIN-ESAIO", - "verify_sendmsg_iovec_size {%d} -> Overflow" + "check_sendmsg_iovec_overflow {%d} -> Overflow" "\r\n i: %lu" "\r\n len: %lu" "\r\n dataSize: %ld" @@ -3115,6 +3130,228 @@ BOOLEAN_T check_sendmsg_iovec_overflow(ESockDescriptor* descP, +/* ======================================================================== + * Do the actual sendv. + * Do some initial writer checks, do the actual send and then + * analyze the result. + */ +extern +ERL_NIF_TERM esaio_sendv(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM eIOV, + const ESockData* dataP) +{ + ErlNifPid caller; + ERL_NIF_TERM eres; + ESockAddress addr; + ERL_NIF_TERM tail; + ssize_t dataSize, sendv_result; + BOOLEAN_T dataInTail, cleanup; + ESAIOOperation* opP = NULL; + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> get caller\r\n", + descP->sock) ); + + ESOCK_ASSERT( enif_self(env, &caller) != NULL ); + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> check state\r\n", + descP->sock) ); + + if (! IS_OPEN(descP->writeState)) + return esock_make_error_closed(env); + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> check if connecting\r\n", + descP->sock) ); + + /* Connect and Write uses the same select flag + * so they can not be simultaneous + */ + if (descP->connectorP != NULL) + return esock_make_error_invalid(env, esock_atom_state); + + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> check if already writing\r\n", + descP->sock) ); + + /* Ensure that this caller does not *already* have a + * (send) request waiting */ + if (esock_writer_search4pid(env, descP, &caller)) { + /* Sender already in queue */ + ESOCK_EPRINTF("esaio_send(%T, %d) -> ALREADY SENDING\r\n", + sockRef, descP->sock); + return esock_raise_invalid(env, esock_atom_state); + } + + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> allocations\r\n", + descP->sock) ); + + /* Allocate the operation */ + opP = MALLOC( sizeof(ESAIOOperation) ); + ESOCK_ASSERT( opP != NULL); + sys_memzero((char*) opP, sizeof(ESAIOOperation)); + + opP->tag = ESAIO_OP_SENDV; + + /* Its a bit annoying that we have to alloc an env and then + * copy the ref *before* we know that we actually need it. + * How much does this cost? + */ + opP->env = esock_alloc_env("esaio_sendv - operation"); + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> extract I/O vector\r\n", + descP->sock) ); + + /* Extract the 'iov', which must be an erlang:iovec(), + * from which we take at most IOV_MAX binaries. + * The env *cannot* be NULL because we don't actually know if + * the send succeeds *now*. It could be sceduled! + */ + if (! enif_inspect_iovec(opP->env, + dataP->iov_max, eIOV, &tail, + &opP->data.sendv.iovec)) { + + SSDBG( descP, ("WIN-ESAIO", + "esaio_sendv {%d} -> iov inspection failed\r\n", + descP->sock) ); + + esock_free_env("esaio-sendv - iovec inspection failure", opP->env); + FREE( opP ); + + return esock_make_error_invalid(env, esock_atom_iov); + } + + if (opP->data.sendv.iovec == NULL) { + + SSDBG( descP, ("UNIX-ESSIO", + "esaio_sendv {%d} -> not an iov\r\n", + descP->sock) ); + + esock_free_env("esaio-sendv - iovec failure", opP->env); + FREE( opP ); + + return esock_make_invalid(env, esock_atom_iov); + } + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> check if data in tail\r\n", + descP->sock) ); + + dataInTail = (! enif_is_empty_list(opP->env, tail)); + + SSDBG( descP, ("WIN-ESAIO", "esaio_sendv(%d) -> verify iovec size when" + "\r\n iovcnt: %lu" + "\r\n tail: %s" + "\r\n", descP->sock, + (unsigned long) opP->data.sendv.iovec->iovcnt, + B2S(dataInTail)) ); + + /* We now have an allocated iovec - verify vector size */ + + if (! verify_sendmsg_iovec_size(dataP, descP, opP->data.sendv.iovec)) { + + /* We can not send the whole packet in one sendv() call */ + SSDBG( descP, ("WIN-ESAIO", + "esaio_sendv {%d} -> iovcnt > iov_max\r\n", + descP->sock) ); + + // No need - belongs to op env: FREE_IOVEC( opP->data.send.iovec ); + esock_free_env("esaio-sendv - iovec failure", opP->env); + FREE( opP ); + + return esock_make_error_invalid(env, esock_atom_iov); + } + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> verify iovec tail\r\n", + descP->sock) ); + + /* Verify that we can send the entire message. + * On DGRAM the tail must be "empty" (= everything must fit in one message). + */ + if (! verify_sendmsg_iovec_tail(opP->env, descP, &tail)) { + + // No need - belongs to op env: FREE_IOVEC( opP->data.send.iovec ); + esock_free_env("esaio-sendv - iovec tail failure", opP->env); + FREE( opP ); + + return esock_make_error_invalid(env, esock_atom_iov); + + } + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> check iovec overflow\r\n", + descP->sock) ); + + if (! check_sendmsg_iovec_overflow(descP, + opP->data.sendv.iovec, &dataSize)) { + + // No need - belongs to op env: FREE_IOVEC( opP->data.sendv.iovec ); + esock_free_env("esaio-sendv - iovec size failure", opP->env); + FREE( opP ); + + return esock_make_error_invalid(env, esock_atom_iov); + + } + + SSDBG( descP, + ("WIN-ESAIO", + "esaio_sendv {%d} -> iovec size verified" + "\r\n iov length: %lu" + "\r\n data size: %u" + "\r\n", + descP->sock, + (unsigned long) opP->data.sendv.iovec->iovcnt, + (long) dataSize) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_write_tries, &descP->writeTries, 1); + + /* And now, try to send the message */ + sendv_result = sock_sendv_O(descP->sock, + (LPWSABUF) opP->data.sendv.iovec->iov, + opP->data.sendv.iovec->iovcnt, + (OVERLAPPED*) opP); + + eres = send_check_result(env, descP, opP, caller, + sendv_result, dataSize, dataInTail, + sockRef, sendRef, &cleanup); + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv(%d) -> sent and analyzed: %d, %T\r\n", + descP->sock, sendv_result, eres) ); + + if (cleanup) { + + /* The i/o vector belongs to the op env, + * so it goes when the env goes. + */ + esock_clear_env("esaio_sendv - cleanup", opP->env); + esock_free_env("esaio_sendv - cleanup", opP->env); + + FREE( opP ); + + } + + SSDBG( descP, + ("WIN-ESAIO", "esaio_sendv {%d} -> done (%s)" + "\r\n %T" + "\r\n", descP->sock, B2S(cleanup), eres) ); + + return eres; + +} + + + /* *** Control message utility functions *** */ /* +++ decode_cmsghdrs +++ -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor