Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:24
erlang
3221-Optimize-signal-sending-to-processes-with-...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 3221-Optimize-signal-sending-to-processes-with-message_qu.patch of Package erlang
From 658f86466b25e3c68c5e4dbea5b68b657b82a611 Mon Sep 17 00:00:00 2001 From: Kjell Winblad <kjellwinblad@gmail.com> Date: Tue, 2 Feb 2021 11:48:23 +0100 Subject: [PATCH] Optimize signal sending to processes with message_queue_data=off_heap Erlang guarantees that signals (i.e., message signals and non-message signals) sent from a single process to another process are ordered in send order. However, there are no ordering guarantees for signals sent from different processes to a particular process. Therefore, several processes can send signals in parallel to a specific process without synchronizing with each other. However, such signal sending was previously always serialized as the senders had to acquire the lock for the outer signal queue of the receiving process. This commit makes it possible for several processes to send signals to a process with the message_queue_data=off_heap setting* activated in parallel and without interfering with each other. This parallel signal sending optimization yields much better scalability for signal sending than what was previously possible**. * Information about how to enable the message_queue_data=off_heap setting can be found in the documentation of the function erlang:process_flag/2. ** http://winsh.me/bench/erlang_sig_q/sigq_bench_result.html Implementation -------------- The parallel message sending optimization works only on processes with the message_queue_data=off_heap setting enabled. For processes with the message_queue_data=off_heap setting enabled, the new optimization is activated and deactivated on demand based on heuristics to give a small overhead when the optimization is unnecessary. The optimization is activated when the contention on the lock for the outer message queue is high. It is deactivated when the number of enqueued messages per fetch operation (that fetches messages from the outer message queue to the inner) is low. 
When the optimization is active, the outer message queue has an array of signal buffers where sending processes enqueue signals. When the receiving process needs to fetch messages from the outer message queue, the contents of the non-empty buffers are appended to the outer message queue. Each process is assigned a particular slot in the buffer array (the process ID is used to hash to a particular slot). That way, the system can preserve the send order between messages coming from the same process. --- erts/emulator/beam/erl_alloc.c | 2 + erts/emulator/beam/erl_alloc.types | 1 + erts/emulator/beam/erl_bif_info.c | 4 +- erts/emulator/beam/erl_lock_check.c | 3 +- erts/emulator/beam/erl_message.c | 50 +- erts/emulator/beam/erl_message.h | 62 ++ erts/emulator/beam/erl_proc_sig_queue.c | 599 +++++++++++++++++- erts/emulator/beam/erl_proc_sig_queue.h | 52 +- erts/emulator/beam/erl_process.c | 7 +- erts/emulator/beam/erl_process.h | 3 +- erts/emulator/beam/erl_process_dump.c | 2 +- erts/emulator/beam/erl_process_lock.h | 50 ++ erts/test/Makefile | 3 +- erts/test/parallel_messages_SUITE.erl | 465 ++++++++++++++ .../visualize_throughput.html | 302 +++++++++ 15 files changed, 1552 insertions(+), 53 deletions(-) create mode 100644 erts/test/parallel_messages_SUITE.erl create mode 100644 erts/test/parallel_messages_SUITE_data/visualize_throughput.html diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 2cea68a817..c1ce0225e7 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -639,6 +639,8 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop) = ERTS_MAGIC_BIN_UNALIGNED_SIZE(sizeof(ErtsMagicIndirectionWord)); fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_RECV_MARK_BLK)] = sizeof(ErtsRecvMarkerBlock); + fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_SIGQ_BUFFERS)] + = sizeof(ErtsSignalInQueueBufferArray); #ifdef HARD_DEBUG hdbg_init(); diff --git a/erts/emulator/beam/erl_alloc.types 
b/erts/emulator/beam/erl_alloc.types index bb6d00ab8c..fb4d4094e2 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -254,6 +254,7 @@ type BINARY_FIND SHORT_LIVED PROCESSES binary_find type CRASH_DUMP STANDARD SYSTEM crash_dump type DIST_TRANSCODE SHORT_LIVED SYSTEM dist_transcode_context type RLA_BLOCK_CNTRS LONG_LIVED SYSTEM release_literal_area_block_counters +type SIGQ_BUFFERS FIXED_SIZE PROCESSES process_signal_queue_buffers type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 3a1d79662e..4e9d1c4300 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -1110,7 +1110,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) sreds = reds_left; if (!local_only) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); } @@ -1218,7 +1218,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) } if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) { ASSERT(locks & ERTS_PROC_LOCK_MAIN); - erts_proc_lock(rp, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(rp); erts_proc_sig_fetch(rp); if (rp->sig_qs.cont) { erts_proc_unlock(rp, locks|ERTS_PROC_LOCK_MSGQ); diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index dd8a981353..3b71e1ccb8 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -168,6 +168,7 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "perf", NULL }, { "jit_debug_descriptor", NULL }, { "erts_mmap", NULL }, + { "proc_sig_queue_buffer", "address" }, #ifdef ERTS_ENSURE_OS_MONOTONIC_TIME { "ensure_os_monotonic_time", NULL } #endif diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index a6eecfabdc..86b1b6f6a4 
100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -343,7 +343,8 @@ erts_queue_dist_message(Process *rcvr, /* Add messages last in message queue */ static void -queue_messages(Process* receiver, +queue_messages(Process* sender, /* is NULL if the sender is not a local process */ + Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, @@ -367,8 +368,17 @@ queue_messages(Process* receiver, ERTS_LC_ASSERT((erts_proc_lc_my_proc_locks(receiver) & ERTS_PROC_LOCK_MSGQ) == (receiver_locks & ERTS_PROC_LOCK_MSGQ)); + /* + * Try to enqueue to an outer message queue buffer instead of + * directly to the outer message queue + */ + if (erts_proc_sig_queue_try_enqueue_to_buffer(sender, receiver, receiver_locks, + first, last, NULL, len, 0)) { + return; + } + if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) { - erts_proc_lock(receiver, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(receiver); locked_msgq = 1; } @@ -390,8 +400,23 @@ queue_messages(Process* receiver, return; } + /* + * Install buffers for the outer message if the heuristic + * indicates that this is beneficial. It is best to do this as + * soon as possible to avoid as much contention as possible. + */ + erts_proc_sig_queue_maybe_install_buffers(receiver, state); + if (last == &first->next) { ASSERT(len == 1); + if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { + /* + * Flush outer signal queue buffers, if such buffers are + * installed, to ensure that messages from the same + * process cannot be reordered. 
+ */ + erts_proc_sig_queue_flush_buffers(receiver); + } LINK_MESSAGE(receiver, first); } else { @@ -446,7 +471,7 @@ erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = from; ERL_MESSAGE_TOKEN(mp) = am_undefined; - queue_messages(receiver, receiver_locks, mp, &mp->next, 1); + queue_messages(NULL, receiver, receiver_locks, mp, &mp->next, 1); } /** @@ -463,7 +488,7 @@ erts_queue_message_token(Process* receiver, ErtsProcLocks receiver_locks, ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = from; ERL_MESSAGE_TOKEN(mp) = token; - queue_messages(receiver, receiver_locks, mp, &mp->next, 1); + queue_messages(NULL, receiver, receiver_locks, mp, &mp->next, 1); } @@ -484,7 +509,7 @@ erts_queue_proc_message(Process* sender, { ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = sender->common.id; - queue_messages(receiver, receiver_locks, + queue_messages(sender, receiver, receiver_locks, prepend_pending_sig_maybe(sender, receiver, mp), &mp->next, 1); } @@ -495,7 +520,7 @@ erts_queue_proc_messages(Process* sender, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, Uint len) { - queue_messages(receiver, receiver_locks, + queue_messages(sender, receiver, receiver_locks, prepend_pending_sig_maybe(sender, receiver, first), last, len); } @@ -988,8 +1013,19 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state) case am_off_heap: break; case am_on_heap: - c_p->sig_qs.flags |= FS_ON_HEAP_MSGQ; + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + /* + * The flags are changed while holding the + * ERTS_PROC_LOCK_MSGQ lock so that it is garanteed that + * there are no messages in buffers if (c_p->sig_qs.flags + * & FS_ON_HEAP_MSGQ) and the ERTS_PROC_LOCK_MSGQ is held. 
+ */ + erts_proc_sig_queue_flush_and_deinstall_buffers(c_p); + + c_p->sig_qs.flags |= FS_ON_HEAP_MSGQ; c_p->sig_qs.flags &= ~FS_OFF_HEAP_MSGQ; + + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ * if a off heap change is ongoing. It will be adjusted diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 89017a3998..a4a4ec29db 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -33,6 +33,28 @@ #define ERTS_MSG_COPY_WORDS_PER_REDUCTION 64 #endif +/* The number of buffers have to be 64 or less because we currenlty + use a single word to implement a bitset with information about + non-empty buffers */ +#ifdef DEBUG +#define ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS 64 +#define ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT 250 +#define ERTS_PROC_SIG_INQ_BUFFERED_ALWAYS_TURN_ON 1 +#define ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE 2 +#define ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP \ + (ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE + \ + ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE / 2) +#else +#define ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS 64 +#define ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT 50 +#define ERTS_PROC_SIG_INQ_BUFFERED_ALWAYS_TURN_ON 0 +#define ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE 8192 +/* At least 1.5 enqueues per flush all op */ +#define ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP \ + (ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE + \ + ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE / 2) +#endif + struct proc_bin; struct external_thing_; @@ -340,6 +362,46 @@ typedef struct { #endif } ErtsSignalInQueue; +typedef union { + struct ___ErtsSignalInQueueBufferFields { + erts_mtx_t lock; + /* + * Boolean value indicateing if the buffer is alive. 
An + * enqueue attempt to a dead buffer has to be canceled + */ + int alive; + /* + * The number of enqueues that has been performed to this + * buffer. This value is used to decide if we should adapt + * back to an unbuffered state + */ + Uint nr_of_enqueues; + ErtsSignalInQueue queue; + } b; + byte align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(struct ___ErtsSignalInQueueBufferFields))]; +} ErtsSignalInQueueBuffer; + +#if ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS > 64 +#error The data structure holding information about which slots that are non-empty (the nonempty_slots field in the struct below) needs to be changed (it currently only supports up to 64 slots) +#endif + +typedef struct { + ErtsSignalInQueueBuffer slots[ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS]; + ErtsThrPrgrLaterOp free_item; + erts_atomic64_t nonempty_slots; + erts_atomic64_t nonmsg_slots; + /* + * dirty_refc is incremented by dirty schedulers that access the + * buffer array to prevent deallocation while they are accessing + * the buffer array. This is needed since dirty schedulers are not + * part of the thread progress system. 
+ */ + erts_atomic64_t dirty_refc; + Uint nr_of_rounds; + Uint nr_of_enqueues; + int alive; +} ErtsSignalInQueueBufferArray; + typedef struct erl_trace_message_queue__ { struct erl_trace_message_queue__ *next; /* point to the next receiver */ Eterm receiver; diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 97709e7028..711e15bc92 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -252,6 +252,12 @@ static void Eterm req_id, Eterm result); static void handle_missing_spawn_reply(Process *c_p, ErtsMonitor *omon); +static Uint proc_sig_queue_flush_buffer(Process* proc, + Uint buffer_index, + ErtsSignalInQueueBufferArray* buffers); +static void proc_sig_queue_lock_buffer(ErtsSignalInQueueBuffer* slot); +static void proc_sig_queue_unlock_buffer(ErtsSignalInQueueBuffer* slot); + #ifdef ERTS_PROC_SIG_HARD_DEBUG #define ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN) \ do { \ @@ -526,46 +532,105 @@ static int dbg_count_nmsigs(ErtsMessage *first) } #endif +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_BUFFERS +static int dbg_count_all(ErtsMessage *first) +{ + ErtsMessage *sig; + int cnt = 0; + + for (sig = first; sig; sig = sig->next) { + ++cnt; + } + return cnt; +} + +static int dbg_check_non_msg(ErtsSignalInQueue* q) +{ + ErtsMessage** m = q->nmsigs.next; + int cnt = 0; + ErtsMessage** prev_m = NULL; + while (m != NULL) { + ERTS_ASSERT(ERTS_SIG_IS_NON_MSG(*m)); + cnt++; + prev_m = m; + m = ((ErtsSignal *) (*m))->common.specific.next; + } + if (cnt > 0) { + ERTS_ASSERT(prev_m == q->nmsigs.last); + } + return cnt; +} +#endif /* ERTS_PROC_SIG_HARD_DEBUG_SIGQ_BUFFERS */ + static ERTS_INLINE erts_aint32_t enqueue_signals(Process *rp, ErtsMessage *first, ErtsMessage **last, ErtsMessage **last_next, Uint num_msgs, - erts_aint32_t in_state) + erts_aint32_t state, + ErtsSignalInQueue* dest_queue) { - erts_aint32_t state = in_state; - ErtsMessage **this = rp->sig_inq.last; + ErtsMessage **this; + int 
is_to_buffer = dest_queue != &rp->sig_inq; + int flush_buffers = (!is_to_buffer) && (state & ERTS_PSFLG_OFF_HEAP_MSGQ); - ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); + if (flush_buffers) { + erts_proc_sig_queue_flush_buffers(rp); +#ifdef DEBUG + /* + * The following read is necessary to prevent + * ASSERT(is_to_buffer || state & ERTS_PSFLG_SIG_IN_Q) assert + * below from failing. + */ + state = erts_atomic32_read_nob(&rp->state); +#endif + } + + this = dest_queue->last; + + if ( ! is_to_buffer ){ + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); + } ASSERT(!*this); *this = first; - rp->sig_inq.last = last; + dest_queue->last = last; - if (!rp->sig_inq.nmsigs.next) { - ASSERT(!rp->sig_inq.nmsigs.last); + if (!dest_queue->nmsigs.next) { + ASSERT(!dest_queue->nmsigs.last); if (ERTS_SIG_IS_NON_MSG(first)) { - rp->sig_inq.nmsigs.next = this; + dest_queue->nmsigs.next = this; } else if (last_next) { ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next)); - rp->sig_inq.nmsigs.next = &first->next; + dest_queue->nmsigs.next = &first->next; } else goto no_nmsig; + if (is_to_buffer) { + /* + * Check state first to avoid write overhead when it is + * unecessary. + */ + if ( ! 
(state & ERTS_PSFLG_SIG_IN_Q)) { + state = erts_atomic32_read_bor_relb(&rp->state, + ERTS_PSFLG_SIG_IN_Q); + } + } else { + state = erts_atomic32_read_bor_nob(&rp->state, + ERTS_PSFLG_SIG_IN_Q); + } - state = erts_atomic32_read_bor_nob(&rp->state, - ERTS_PSFLG_SIG_IN_Q); no_nmsig: - ASSERT(!(state & ERTS_PSFLG_SIG_IN_Q)); + ; } else { ErtsSignal *sig; - ASSERT(rp->sig_inq.nmsigs.last); + ASSERT(dest_queue->nmsigs.last); - sig = (ErtsSignal *) *rp->sig_inq.nmsigs.last; + sig = (ErtsSignal *) *dest_queue->nmsigs.last; ASSERT(sig && !sig->common.specific.next); - ASSERT(state & ERTS_PSFLG_SIG_IN_Q); + ASSERT(is_to_buffer || state & ERTS_PSFLG_SIG_IN_Q); if (ERTS_SIG_IS_NON_MSG(first)) { sig->common.specific.next = this; } @@ -577,16 +642,16 @@ enqueue_signals(Process *rp, ErtsMessage *first, if (last_next) { ASSERT(dbg_count_nmsigs(first) >= 2); - rp->sig_inq.nmsigs.last = last_next; + dest_queue->nmsigs.last = last_next; } else if (ERTS_SIG_IS_NON_MSG(first)) { ASSERT(dbg_count_nmsigs(first) == 1); - rp->sig_inq.nmsigs.last = this; + dest_queue->nmsigs.last = this; } else ASSERT(dbg_count_nmsigs(first) == 0); - rp->sig_inq.len += num_msgs; + dest_queue->len += num_msgs; ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); @@ -598,7 +663,7 @@ erts_aint32_t erts_enqueue_signals(Process *rp, ErtsMessage *first, Uint num_msgs, erts_aint32_t in_state) { - return enqueue_signals(rp, first, last, last_next, num_msgs, in_state); + return enqueue_signals(rp, first, last, last_next, num_msgs, in_state, &rp->sig_inq); } void @@ -731,14 +796,27 @@ first_last_done: last->next = NULL; - erts_proc_lock(rp, ERTS_PROC_LOCK_MSGQ); + if (op != ERTS_SIG_Q_OP_PROCESS_INFO && + erts_proc_sig_queue_try_enqueue_to_buffer(c_p, rp, 0, first, + &last->next, last_next, + 0, 1)) { + if (!is_normal_sched) + erts_proc_dec_refc(rp); + return 1; + } + + erts_proc_sig_queue_lock(rp); state = erts_atomic32_read_nob(&rp->state); + erts_proc_sig_queue_maybe_install_buffers(rp, state); + if (ERTS_PSFLG_FREE & state) res 
= 0; else { - state = enqueue_signals(rp, first, &last->next, last_next, 0, state); + state = enqueue_signals(rp, first, &last->next, + last_next, 0, state, + &rp->sig_inq); if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO)) check_push_msgq_len_offs_marker(rp, sig); res = !0; @@ -833,8 +911,6 @@ erts_proc_sig_fetch__(Process *proc) ASSERT(proc->sig_inq.first); if (!proc->sig_inq.nmsigs.next) { - ASSERT(!(ERTS_PSFLG_SIG_IN_Q - & erts_atomic32_read_nob(&proc->state))); ASSERT(!proc->sig_inq.nmsigs.last); if (proc->sig_qs.cont || ERTS_MSG_RECV_TRACED(proc)) { @@ -856,13 +932,10 @@ erts_proc_sig_fetch__(Process *proc) else proc->sig_qs.nmsigs.next = proc->sig_inq.nmsigs.next; - s = erts_atomic32_read_bset_nob(&proc->state, - (ERTS_PSFLG_SIG_Q - | ERTS_PSFLG_SIG_IN_Q), - ERTS_PSFLG_SIG_Q); - - ASSERT((s & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - == ERTS_PSFLG_SIG_IN_Q); (void)s; + erts_atomic32_read_bset_nob(&proc->state, + (ERTS_PSFLG_SIG_Q + | ERTS_PSFLG_SIG_IN_Q), + ERTS_PSFLG_SIG_Q); } else { ErtsSignal *sig; @@ -4943,7 +5016,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, state = erts_atomic32_read_nob(&c_p->state); if (!local_only) { if (ERTS_PSFLG_SIG_IN_Q & state) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); } @@ -5779,8 +5852,6 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); - ASSERT(!(ERTS_PSFLG_SIG_IN_Q & erts_atomic32_read_nob(&c_p->state))); - limit = *redsp; limit *= ERTS_SIG_REDS_CNT_FACTOR; @@ -6250,7 +6321,7 @@ erts_proc_sig_receive_helper(Process *c_p, consumed_reds += 4; left_reds -= 4; - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); /* * Messages may have been moved directly to @@ -7772,3 +7843,463 @@ erts_proc_sig_hdbg_check_in_queue(Process *p, 
char *what, char *file, int line) } #endif /* ERTS_PROC_SIG_HARD_DEBUG */ + +void erts_proc_sig_queue_lock(Process* proc) +{ + if (EBUSY == erts_proc_trylock(proc, ERTS_PROC_LOCK_MSGQ)) { + erts_proc_lock(proc, ERTS_PROC_LOCK_MSGQ); + proc->sig_inq_contention_counter += 1; + } else if(proc->sig_inq_contention_counter > 0) { + proc->sig_inq_contention_counter -= 1; + } +} + +static void proc_sig_queue_lock_buffer(ErtsSignalInQueueBuffer* slot) +{ + erts_mtx_lock(&slot->b.lock); +} + +static void proc_sig_queue_unlock_buffer(ErtsSignalInQueueBuffer* slot) +{ + erts_mtx_unlock(&slot->b.lock); +} + +int +erts_proc_sig_queue_try_enqueue_to_buffer(Process* sender, /* is NULL if the sender is not a local process */ + Process* receiver, + ErtsProcLocks receiver_locks, + ErtsMessage* first, + ErtsMessage** last, + ErtsMessage** last_next, + Uint len, + int is_nonmsg_signal_enqueue) +{ + int need_unget_buffers; + ErtsSignalInQueueBufferArray* buffers; + if ((receiver_locks & ERTS_PROC_LOCK_MSGQ) || + NULL == (buffers = erts_proc_sig_queue_get_buffers(receiver, &need_unget_buffers))) { + /* We never need to unget the buffers array if we do not get it */ + return 0; + } else { + /* + * Use the sender process ID to hash to an outer signal queue + * buffer. This guarantees that all signals from the same + * process are ordered in send order. + */ + Uint to_hash = + (sender == NULL ? 0 : internal_pid_number(sender->common.id)); + Uint slot = to_hash % ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; + ErtsSignalInQueueBuffer* buffer = &buffers->slots[slot]; + Uint64 nonempty_slots_before = 1; + Uint32 state; + /* + * Multiple signals or is_nonmsg_signal_enqueue means that we + * report that there is a non-msg signal in the queue. + */ + int is_nonmsg_signal_or_multi_sig = + is_nonmsg_signal_enqueue || !(last == &first->next); + + proc_sig_queue_lock_buffer(buffer); + + if ( ! buffer->b.alive ) { + /* + * The enqueue attempt fails if the buffer is dead. 
This + * means that the buffer array has got uninstalled. + */ + proc_sig_queue_unlock_buffer(buffer); + erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers); + return 0; + } + /* + * The buffer is alive and locked. This means that it is safe + * to insert signals to it + */ + + if (&buffer->b.queue.first == buffer->b.queue.last) { + /* The buffer is empty so we need to notify the receiver + * unless some other slot is nonempty (in that case + * another enqueuer has already notified, or will notify, the + * receiver). + */ + nonempty_slots_before = + (Uint64)erts_atomic64_read_bor_nob(&buffers->nonempty_slots, + (erts_aint64_t)(((Uint64)1) << slot)); + } + + if (is_nonmsg_signal_or_multi_sig && !buffer->b.queue.nmsigs.next) { + /* + * Inserting nonmsg signal and no nonmsg signals in buffer + * before. This means that we have to update the nonmsg + * status of this buffer in the buffers structure. + * + * A full memory barrier is used since we do not want this + * operation to be reordered with setting the + * ERTS_PSFLG_SIG_IN_Q flag inside the enqueue_signals + * call below. + */ + erts_atomic64_read_bor_mb(&buffers->nonmsg_slots, + (erts_aint64_t)(((Uint64)1) << slot)); + } + + state = erts_atomic32_read_nob(&receiver->state); + + if (last == &first->next && !is_nonmsg_signal_or_multi_sig) { + /* + * Optimization for the common case of a single message + * signal. + */ + ASSERT(len == 1); + ASSERT(ERTS_SIG_IS_MSG(first)); + *buffer->b.queue.last = first; + buffer->b.queue.last = &first->next; + buffer->b.queue.len++; + } else { + state = + enqueue_signals(receiver, + first, + last, + last_next, + len, + state, + &buffer->b.queue); + } + buffer->b.nr_of_enqueues += 1; + + proc_sig_queue_unlock_buffer(buffer); + + /* + * The signal(s) are inserted into a buffer. However, we are + * not done because we need to notify the scheduler that + * we have new signals. 
+ */ + + if (!nonempty_slots_before) { + + /* + * There is one situation in which we need to synchronize + * with the ERTS_PROC_LOCK_MSGQ lock: + * + * The buffer we inserted to was empty before we inserted + * to it, and no other buffer was marked as nonempty. In + * this case the process might hold the + * ERTS_PROC_LOCK_MSGQ to check if there are any more + * messages. If the process does not find any messages, + * it tells the scheduler to put the process to sleep + * while still holding the lock. Therefore, we wait until + * the ERTS_PROC_LOCK_MSGQ is released before we request + * the scheduler to schedule the process (with a call to + * erts_proc_notify_new_message or + * erts_proc_notify_new_sig) so the request does not get + * overwritten by the sleep request. + * + */ + + erts_proc_lock_wait_until_released(receiver, ERTS_PROC_LOCK_MSGQ); + } + + if (is_nonmsg_signal_or_multi_sig) { + if (is_nonmsg_signal_enqueue) { + erts_proc_notify_new_sig(receiver, state, 0); + } else { + erts_proc_notify_new_sig(receiver, state, ERTS_PSFLG_ACTIVE); + } + } else { + erts_proc_notify_new_message(receiver, receiver_locks); + } + erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers); + return 1; + } +} + + +static void sig_inq_concat(ErtsSignalInQueue* q1, ErtsSignalInQueue* q2) +{ + ErtsMessage** first_queue_last = q1->last; + /* Second queue should not be empty */ + ASSERT(q2->last != &q2->first); + if (NULL == q1->nmsigs.next) { + /* There are no non-message signals in q1 but maybe in q2 */ + if (q2->nmsigs.next != NULL) { + /* There are non-message signals in q2 but not in q1 */ + if (q2->nmsigs.next == &q2->first) { + /* The first message in q2 is a non-message signal + (The next pointer to the first non-message signal + comes from the first queue) */ + q1->nmsigs.next = first_queue_last; + } else { + /* Internal message in q2 is the first non-message signal */ + q1->nmsigs.next = q2->nmsigs.next; + } + if (q2->nmsigs.next == q2->nmsigs.last) { + /* Only one 
non-message signal in q2 (q1->nmsigs.last + should be the same as q1->nmsigs.next which is + already set up correctly) */ + q1->nmsigs.last = q1->nmsigs.next; + } else { + /* More than one non-message signals in q2 */ + q1->nmsigs.last = q2->nmsigs.last; + } + } + } else if (NULL != q2->nmsigs.next) { + ErtsMessage** first_nmsig_in_q2; + /* We have non-message signals in both queues */ + if (q2->nmsigs.next == &q2->first) { + /* The first signal in q2 is a non-message signal */ + ErtsSignal *sig; + sig = (ErtsSignal *) *q1->nmsigs.last; + sig->common.specific.next = first_queue_last; + first_nmsig_in_q2 = first_queue_last; + } else { + /* The first signal in q2 is a message signal */ + ErtsSignal *sig; + sig = (ErtsSignal *) *q1->nmsigs.last; + sig->common.specific.next = q2->nmsigs.next; + first_nmsig_in_q2 = q2->nmsigs.next; + } + if (q2->nmsigs.last == &q2->first) { + /* Only one non-message signal in q2 */ + q1->nmsigs.last = first_nmsig_in_q2; + } else { + q1->nmsigs.last = q2->nmsigs.last; + } + } + *q1->last = q2->first; + q1->last = q2->last; + q1->len += q2->len; + ASSERT((!q1->nmsigs.next && !q1->nmsigs.last) || (q1->nmsigs.next && q1->nmsigs.last)); +} + +static Uint proc_sig_queue_flush_buffer(Process* proc, + Uint buffer_index, + ErtsSignalInQueueBufferArray* buffers) +{ + Uint nr_of_enqueues; + ErtsSignalInQueueBuffer* buf = &buffers->slots[buffer_index]; + proc_sig_queue_lock_buffer(buf); + /* This function should only be called when there is at least one + item in the buffer */ + ASSERT(buf->b.queue.first != NULL); + nr_of_enqueues = buf->b.nr_of_enqueues; + buf->b.nr_of_enqueues = 0; + ASSERT(nr_of_enqueues > 0); + if (buf->b.alive) { + sig_inq_concat(&proc->sig_inq, &buf->b.queue); + buf->b.queue.first = NULL; + buf->b.queue.last = &buf->b.queue.first; + buf->b.queue.len = 0; + buf->b.queue.nmsigs.next = NULL; + buf->b.queue.nmsigs.last = NULL; + } + /* + * The appropriate bit in &buffers->nonempty_slots needs to be + * cleared because a thread 
might have inserted something after + * all bits got cleared in erts_proc_sig_queue_flush_get_buffers. + */ + erts_atomic64_read_band_nob(&buffers->nonempty_slots, + (erts_aint64_t)(~(((Uint64)1) << buffer_index))); + /* + * The nonmsg_slots flag for this slot also needs to be cleared so + * that the erts_proc_sig_fetch function can detect if it has + * reset the ERTS_PSFLG_SIG_IN_Q when it should not do that. + */ + erts_atomic64_read_band_nob(&buffers->nonmsg_slots, + (erts_aint64_t)(~(((Uint64)1) << buffer_index))); + proc_sig_queue_unlock_buffer(buf); + return nr_of_enqueues; +} + + +ErtsSignalInQueueBufferArray* +erts_proc_sig_queue_flush_get_buffers(Process* proc, int *need_unget_buffers) +{ + Uint i; + ErtsSignalInQueueBufferArray* buffers; + Uint64 nonempty_slots; + buffers = erts_proc_sig_queue_get_buffers(proc, need_unget_buffers); + if (NULL == buffers) { + return NULL; + } + nonempty_slots = (Uint64)erts_atomic64_xchg_nob(&buffers->nonempty_slots, + (erts_aint64_t)((Uint64)0)); + if (nonempty_slots != 0) { + for(i = 0; i < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; i++) { + Uint64 slot_mask = (((Uint64)1) << i); + if (nonempty_slots & slot_mask) { + buffers->nr_of_enqueues += + proc_sig_queue_flush_buffer(proc, i, buffers); + } + } + } + buffers->nr_of_rounds += 1; + if (buffers->nr_of_rounds > + ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE) { + /* Decide whether we should adapt back to the normal state */ + if(buffers->nr_of_enqueues < + ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP) { + erts_proc_sig_queue_flush_and_deinstall_buffers(proc); + } else { + buffers->nr_of_rounds = 0; + buffers->nr_of_enqueues = 0; + } + } + return buffers; +} + + +void +erts_proc_sig_queue_flush_buffers(Process* proc) +{ + int need_undread_buffers; + ErtsSignalInQueueBufferArray* buffers = + erts_proc_sig_queue_flush_get_buffers(proc, &need_undread_buffers); + erts_proc_sig_queue_unget_buffers(buffers, need_undread_buffers); +} + +static void 
do_sigq_buffer_array_refc_dec(void *buffers_p) +{ + ErtsSignalInQueueBufferArray* buffers = buffers_p; + erts_proc_sig_queue_unget_buffers(buffers, 1); +} + + +static void do_schedule_sigq_buffer_array_refc_dec(void *buffers_p) +{ + ErtsSignalInQueueBufferArray* buffers = buffers_p; + erts_schedule_thr_prgr_later_cleanup_op(do_sigq_buffer_array_refc_dec, + buffers, + &buffers->free_item, + sizeof(ErtsSignalInQueueBufferArray)); +} + +void erts_proc_sig_queue_flush_and_deinstall_buffers(Process* proc) +{ + Uint i; + ErtsSignalInQueueBufferArray* buffers; + int need_unget_buffers; + ErtsSchedulerData *esdp; + ERTS_LC_ASSERT(ERTS_PROC_IS_EXITING(proc) || + (erts_proc_lc_my_proc_locks(proc) & ERTS_PROC_LOCK_MSGQ)); + buffers = erts_proc_sig_queue_get_buffers(proc, &need_unget_buffers); + if (buffers == NULL) { + return; + } + if (!buffers->alive) { + erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers);; + return; + } + buffers->alive = 0; + proc->sig_inq_contention_counter = 0; + for (i = 0; i < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; i++) { + proc_sig_queue_lock_buffer(&buffers->slots[i]); + if (buffers->slots[i].b.queue.first != NULL) { + sig_inq_concat(&proc->sig_inq, &buffers->slots[i].b.queue); + } + buffers->slots[i].b.alive = 0; + proc_sig_queue_unlock_buffer(&buffers->slots[i]); + } + /* + * Nothing can be enqueued to the buffer array any more + */ + erts_atomic_set_mb(&proc->sig_inq_buffers, (erts_aint_t)NULL); + erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers); + /* + * We should now do an additional reference count decrement to + * force an eventual free of the buffer array but we need to do that + * after a thread progress period because an unmanaged thread + * might be sleeping just before it will increment the reference + * count. 
+ */ + esdp = erts_get_scheduler_data(); + if (esdp != NULL && esdp->type == ERTS_SCHED_NORMAL) { + erts_schedule_thr_prgr_later_cleanup_op(do_sigq_buffer_array_refc_dec, + buffers, + &buffers->free_item, + sizeof(ErtsSignalInQueueBufferArray)); + } else { + /* + * We cannot schedule a thread progress later cleanup + * operation from an unmanaged thread so we schedule + * that task to be run on a managed thread. + */ + erts_schedule_misc_aux_work(1, + do_schedule_sigq_buffer_array_refc_dec, + buffers); + } +} + +void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state) +{ + int i; + ErtsSignalInQueueBufferArray* buffers; + if (!(state & ERTS_PSFLG_OFF_HEAP_MSGQ) || + (((ErtsSignalInQueueBufferArray*)erts_atomic_read_nob(&p->sig_inq_buffers)) != NULL) || + (!ERTS_PROC_SIG_INQ_BUFFERED_ALWAYS_TURN_ON && + p->sig_inq_contention_counter <= ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT)) { + return; + } + p->sig_inq_contention_counter = 0; + buffers = erts_alloc(ERTS_ALC_T_SIGQ_BUFFERS, + sizeof(ErtsSignalInQueueBufferArray)); + erts_atomic64_init_nob(&buffers->nonempty_slots, (erts_aint64_t)(Uint64)0); + erts_atomic64_init_nob(&buffers->nonmsg_slots, (erts_aint64_t)(Uint64)0); + erts_atomic64_init_nob(&buffers->dirty_refc, (erts_aint64_t)(Uint64)1); + buffers->nr_of_enqueues = 0; + buffers->nr_of_rounds = 0; + buffers->alive = 1; + /* Initialize slots */ + for(i = 0; i < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; i++) { + buffers->slots[i].b.alive = 1; + erts_mtx_init(&buffers->slots[i].b.lock, + "proc_sig_queue_buffer", + NIL, + ERTS_LOCK_FLAGS_CATEGORY_PROCESS); + buffers->slots[i].b.queue.first = NULL; + buffers->slots[i].b.queue.last = &buffers->slots[i].b.queue.first; + buffers->slots[i].b.queue.len = 0; + buffers->slots[i].b.queue.nmsigs.next = NULL; + buffers->slots[i].b.queue.nmsigs.last = NULL; + buffers->slots[i].b.nr_of_enqueues = 0; + } + erts_atomic_set_relb(&p->sig_inq_buffers, (erts_aint_t)buffers); +} + 
+ErtsSignalInQueueBufferArray* +erts_proc_sig_queue_get_buffers(Process* p, int *need_unread) +{ + ErtsThrPrgrDelayHandle dhndl = + erts_thr_progress_unmanaged_delay(); + ErtsSignalInQueueBufferArray* buffers = + (ErtsSignalInQueueBufferArray*)erts_atomic_read_acqb(&p->sig_inq_buffers); + *need_unread = 0; + if (ERTS_THR_PRGR_DHANDLE_MANAGED == dhndl) { + erts_thr_progress_unmanaged_continue(dhndl); + return buffers; + } + if (buffers == NULL) { + erts_thr_progress_unmanaged_continue(dhndl); + return NULL; + } + erts_atomic64_inc_mb(&buffers->dirty_refc); + erts_thr_progress_unmanaged_continue(dhndl); + *need_unread = 1; + return buffers; +} + +void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers, + int need_unget) +{ + if (!need_unget) { + return; + } else { + int i; + erts_aint64_t refc = erts_atomic64_dec_read_mb(&buffers->dirty_refc); + ASSERT(refc >= 0); + if (refc != 0) { + return; + } + for (i = 0; i < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; i++) { + erts_mtx_destroy(&buffers->slots[i].b.lock); + } + erts_free(ERTS_ALC_T_SIGQ_BUFFERS, buffers); + } +} diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 236b20ab4d..37c5a93947 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -91,6 +91,9 @@ #if 0 # define ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER #endif +#if 0 +# define ERTS_PROC_SIG_HARD_DEBUG_SIGQ_BUFFERS +#endif struct erl_mesg; struct erl_dist_external; @@ -238,6 +241,26 @@ void erl_proc_sig_hdbg_chk_recv_marker_block(struct process *c_p); #include "erl_process.h" #include "erl_bif_unique.h" + +void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state); +void erts_proc_sig_queue_flush_and_deinstall_buffers(Process* proc); +void erts_proc_sig_queue_flush_buffers(Process* proc); +ErtsSignalInQueueBufferArray* +erts_proc_sig_queue_flush_get_buffers(Process* proc, int *need_unget_buffers); +void 
erts_proc_sig_queue_lock(Process* proc); +ErtsSignalInQueueBufferArray* +erts_proc_sig_queue_get_buffers(Process* p, int *need_unread); +void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers, + int need_unget); +int erts_proc_sig_queue_try_enqueue_to_buffer(Process* sender, /* is NULL if the sender is not a local process */ + Process* receiver, + ErtsProcLocks receiver_locks, + ErtsMessage* first, + ErtsMessage** last, + ErtsMessage** last_next, + Uint len, + int is_signal); + #define ERTS_SIG_Q_OP_BITS 8 #define ERTS_SIG_Q_OP_SHIFT 0 #define ERTS_SIG_Q_OP_MASK ((1 << ERTS_SIG_Q_OP_BITS) - 1) @@ -1545,7 +1568,8 @@ erts_proc_sig_fetch(Process *proc) { Sint res = 0; ErtsSignal *sig; - + ErtsSignalInQueueBufferArray* buffers; + int need_unget_buffers; ERTS_LC_ASSERT(ERTS_PROC_IS_EXITING(proc) || ((erts_proc_lc_my_proc_locks(proc) & (ERTS_PROC_LOCK_MAIN @@ -1556,6 +1580,9 @@ erts_proc_sig_fetch(Process *proc) ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); + buffers = erts_proc_sig_queue_flush_get_buffers(proc, + &need_unget_buffers); + sig = (ErtsSignal *) proc->sig_inq.first; if (sig) { if (ERTS_LIKELY(sig->common.tag != ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK)) @@ -1563,7 +1590,22 @@ erts_proc_sig_fetch(Process *proc) else res = erts_proc_sig_fetch_msgq_len_offs__(proc); } - + if (buffers) { + Uint32 state = erts_atomic32_read_acqb(&proc->state); + if (!(ERTS_PSFLG_SIG_IN_Q & state) && + erts_atomic64_read_nob(&buffers->nonmsg_slots)) { + /* + * We may have raced with a thread inserting into a buffer + * when resetting the flag ERTS_PSFLG_SIG_IN_Q in one of + * the fetch functions above so we have to make sure that + * it is set when there is a nonmsg signal in the buffers. 
+ */ + erts_atomic32_read_bor_nob(&proc->state, + ERTS_PSFLG_SIG_IN_Q | + ERTS_PSFLG_ACTIVE); + } + erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers); + } res += proc->sig_qs.len; ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); @@ -1740,7 +1782,7 @@ erts_msgq_recv_marker_clear(Process *c_p, Eterm id) erts_msgq_recv_marker_insert(Process *c_p) { ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); @@ -1789,7 +1831,7 @@ erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id) ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(blkp); #endif - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); @@ -1881,7 +1923,7 @@ erts_msgq_set_save_end(Process *c_p) /* Set save pointer to end of message queue... */ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index c5b392370c..6b0bebff6f 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -10455,7 +10455,7 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) st_res = am_false; if (st->arg[0] == am_false) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); } @@ -12115,11 +12115,13 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). 
p->sig_qs.len = 0; p->sig_qs.nmsigs.next = NULL; p->sig_qs.nmsigs.last = NULL; + p->sig_inq_contention_counter = 0; p->sig_inq.first = NULL; p->sig_inq.last = &p->sig_inq.first; p->sig_inq.len = 0; p->sig_inq.nmsigs.next = NULL; p->sig_inq.nmsigs.last = NULL; + erts_atomic_init_nob(&p->sig_inq_buffers, (erts_aint_t)NULL); #ifdef ERTS_PROC_SIG_HARD_DEBUG p->sig_inq.may_contain_heap_terms = 0; #endif @@ -12618,11 +12620,13 @@ void erts_init_empty_process(Process *p) p->sig_qs.len = 0; p->sig_qs.nmsigs.next = NULL; p->sig_qs.nmsigs.last = NULL; + p->sig_inq_contention_counter = 0; p->sig_inq.first = NULL; p->sig_inq.last = &p->sig_inq.first; p->sig_inq.len = 0; p->sig_inq.nmsigs.next = NULL; p->sig_inq.nmsigs.last = NULL; + erts_atomic_init_nob(&p->sig_inq_buffers, (erts_aint_t)NULL); #ifdef ERTS_PROC_SIG_HARD_DEBUG p->sig_inq.may_contain_heap_terms = 0; #endif @@ -13795,6 +13799,7 @@ restart: erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_flush_and_deinstall_buffers(p); erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 981112ebdc..6509edf947 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1116,8 +1116,9 @@ struct process { erts_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ erts_atomic32_t dirty_state; /* Process dirty state flags (see ERTS_PDSFLG_*) */ - + Uint sig_inq_contention_counter; ErtsSignalInQueue sig_inq; + erts_atomic_t sig_inq_buffers; ErlTraceMessageQueue *trace_msg_q; erts_proc_lock_t lock; ErtsSchedulerData *scheduler_data; diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 3beafb4b3c..58fb386f44 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -159,7 +159,7 @@ Uint erts_process_memory(Process *p, int include_sigs_in_transit) * Size of message queue plus size of all signals * in transit 
to the process! */ - erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(p); erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); diff --git a/erts/emulator/beam/erl_process_lock.h b/erts/emulator/beam/erl_process_lock.h index 1e577cf004..b5ec8e31e1 100644 --- a/erts/emulator/beam/erl_process_lock.h +++ b/erts/emulator/beam/erl_process_lock.h @@ -462,6 +462,9 @@ typedef struct { #define ERTS_PROC_LOCK_FLGS_READ_(L) \ ((ErtsProcLocks) erts_atomic32_read_nob(&(L)->flags)) +#define ERTS_PROC_LOCK_FLGS_READ_ACQB_(L) \ + ((ErtsProcLocks) erts_atomic32_read_acqb(&(L)->flags)) + #else /* no opt atomic ops */ ERTS_GLB_INLINE ErtsProcLocks erts_proc_lock_flags_band(erts_proc_lock_t *, @@ -509,6 +512,7 @@ erts_proc_lock_flags_cmpxchg(erts_proc_lock_t *lck, ErtsProcLocks new, #define ERTS_PROC_LOCK_FLGS_CMPXCHG_RELB_(L, NEW, EXPECTED) \ erts_proc_lock_flags_cmpxchg((L), (NEW), (EXPECTED)) #define ERTS_PROC_LOCK_FLGS_READ_(L) ((L)->flags) +#define ERTS_PROC_LOCK_FLGS_READ_ACQB_(L) ((L)->flags) #endif /* end no opt atomic ops */ #endif /* ERTS_PROC_LOCK_OWN_IMPL */ @@ -918,6 +922,8 @@ ERTS_GLB_INLINE void erts_proc_lock(Process *, ErtsProcLocks); #endif ERTS_GLB_INLINE void erts_proc_unlock(Process *, ErtsProcLocks); ERTS_GLB_INLINE int erts_proc_trylock(Process *, ErtsProcLocks); +ERTS_GLB_INLINE void +erts_proc_lock_wait_until_released(Process *p, ErtsProcLocks locks); ERTS_GLB_INLINE void erts_proc_inc_refc(Process *); ERTS_GLB_INLINE void erts_proc_dec_refc(Process *); @@ -979,6 +985,50 @@ erts_proc_trylock(Process *p, ErtsProcLocks locks) locks); } +ERTS_GLB_INLINE void +erts_proc_lock_wait_until_released(Process *p, ErtsProcLocks locks) +{ +#if ERTS_PROC_LOCK_OWN_IMPL +#if !ERTS_PROC_LOCK_ATOMIC_IMPL + Uint32 was_locked; + erts_pix_lock(pix_lck); + was_locked = (ERTS_PROC_LOCK_FLGS_READ_(&p->lock) & locks); + erts_pix_unlock(pix_lck); + if (was_locked) { + erts_proc_lock(p, locks); + erts_proc_unlock(p, locks); + } +#else + 
ETHR_MEMBAR(ETHR_StoreLoad | ETHR_LoadLoad); + if (ERTS_PROC_LOCK_FLGS_READ_ACQB_(&p->lock) & locks) { + erts_proc_lock(p, locks); + erts_proc_unlock(p, locks); + } +#endif +#elif ERTS_PROC_LOCK_RAW_MUTEX_IMPL + if (locks & ERTS_PROC_LOCK_MAIN) { + erts_mtx_lock(&p->lock.main); + erts_mtx_unlock(&p->lock.main); + } + if (locks & ERTS_PROC_LOCK_MSGQ) { + erts_mtx_lock(&p->lock.msgq); + erts_mtx_unlock(&p->lock.msgq); + } + if (locks & ERTS_PROC_LOCK_BTM) { + erts_mtx_lock(&p->lock.btm); + erts_mtx_unlock(&p->lock.btm); + } + if (locks & ERTS_PROC_LOCK_STATUS) { + erts_mtx_lock(&p->lock.status); + erts_mtx_unlock(&p->lock.status); + } + if (locks & ERTS_PROC_LOCK_TRACE) { + erts_mtx_lock(&p->lock.trace); + erts_mtx_unlock(&p->lock.trace); + } +#endif +} + ERTS_GLB_INLINE void erts_proc_inc_refc(Process *p) { ASSERT(!(erts_atomic32_read_nob(&p->state) & ERTS_PSFLG_PROXY)); diff --git a/erts/test/Makefile b/erts/test/Makefile index f3831d2bbb..dc5dcf2ca0 100644 --- a/erts/test/Makefile +++ b/erts/test/Makefile @@ -38,7 +38,8 @@ MODULES= \ run_erl_SUITE \ erlexec_SUITE \ z_SUITE \ - upgrade_SUITE + upgrade_SUITE \ + parallel_messages_SUITE ERL_FILES= $(MODULES:%=%.erl) diff --git a/erts/test/parallel_messages_SUITE.erl b/erts/test/parallel_messages_SUITE.erl new file mode 100644 index 0000000000..9c5d6b4f28 --- /dev/null +++ b/erts/test/parallel_messages_SUITE.erl @@ -0,0 +1,465 @@ +-module(parallel_messages_SUITE). + +-export([all/0, + suite/0, + test_throughput_benchmark/1, + test_message_queue_data_switching/1, + throughput_benchmark/0, + large_throughput_benchmark/0]). + +all() -> [test_throughput_benchmark, + test_message_queue_data_switching]. + +suite() -> + [{timetrap, {minutes, 90}}]. + +get_op([{_,O}], _RandNum) -> + O; +get_op([{Prob,O}|Rest], RandNum) -> + case RandNum < Prob of + true -> O; + false -> get_op(Rest, RandNum) + end. 
+do_op(ProbHelpTab, Operations, Receiver) -> + RandNum = rand:uniform(), + Op = get_op(ProbHelpTab, RandNum), + TheOp = Operations(Op), + TheOp(Receiver). +do_work(WorksDoneSoFar, ProbHelpTab, Operations, Receiver) -> + receive + stop -> WorksDoneSoFar + after + 0 -> do_op(ProbHelpTab, Operations, Receiver), + do_work(WorksDoneSoFar + 1, ProbHelpTab, Operations, Receiver) + end. + +-record(parallel_messages_bench_config, + {benchmark_duration_ms = 500, + recover_time_ms = 500, + thread_counts = not_set, + nr_of_repeats = 1, + report_receive_throughput = [false, true], + spawn_opts = [[{message_queue_data, off_heap}]], + scenarios = + [ + [ + {1.0, {message_size, 1}} + ], + [ + {1.0, {exit_signal_size, 3}} + ], + [ + {0.5, {exit_signal_size, 1}}, + {0.5, {message_size, 1}} + ] + ], + notify_res_fun = fun(_Name, _Throughput) -> ok end, + print_result_paths_fun = + fun(ResultPath, _LatestResultPath) -> + Comment = + io_lib:format("<a href=\"file:///~s\">Result visualization</a>",[ResultPath]), + {comment, Comment} + end + }). + +stdout_notify_res(ResultPath, LatestResultPath) -> + io:format("Result Location: /~s~n", [ResultPath]), + io:format("Latest Result Location: ~s~n", [LatestResultPath]). + + +throughput_benchmark( + #parallel_messages_bench_config{ + benchmark_duration_ms = BenchmarkDurationMs, + recover_time_ms = RecoverTimeMs, + thread_counts = ThreadCountsOpt, + nr_of_repeats = NrOfRepeats, + report_receive_throughput = ReportReceiveThroughputList, + spawn_opts = SpawnOptsList, + scenarios = Scenarios, + notify_res_fun = NotifyResFun, + print_result_paths_fun = PrintResultPathsFun}) -> + NrOfSchedulers = erlang:system_info(schedulers), + %Parent = self(), + %% Mapping benchmark operation names to their action + Operations = + fun({message_size, Size}) -> + case get(Size) of + undefined -> + Msg = lists:seq(1, Size), + NewSendFun = + fun(Receiver) -> + Receiver ! 
Msg + end, + put(Size, NewSendFun), + NewSendFun; + SendFun -> + SendFun + end; + ({exit_signal_size, Size} = SigType) -> + case get(SigType) of + undefined -> + Msg = lists:seq(1, Size), + NewSendFun = + fun(Receiver) -> + erlang:exit(Receiver, Msg) + end, + put(SigType, NewSendFun), + NewSendFun; + SendFun -> + SendFun + end; + ({message_queue_data_change, off_heap}) -> + fun(Receiver) -> + Receiver ! off_heap + end; + ({message_queue_data_change, on_heap}) -> + fun(Receiver) -> + Receiver ! on_heap + end + end, + %% Helper functions + CalculateThreadCounts = + fun Calculate([Count|Rest]) -> + case Count > NrOfSchedulers of + true -> lists:reverse(Rest); + false -> Calculate([Count*2,Count|Rest]) + end + end, + CalculateOpsProbHelpTab = + fun Calculate([{_, OpName}], _) -> + [{1.0, OpName}]; + Calculate([{OpPropability, OpName}|Res], Current) -> + NewCurrent = Current + OpPropability, + [{NewCurrent, OpName}| Calculate(Res, NewCurrent)] + end, + RenderScenario = + fun R([], StringSoFar) -> + StringSoFar; + R([{Fraction, Operation}], StringSoFar) -> + io_lib:format("~s ~f% ~w",[StringSoFar, Fraction * 100.0, Operation]); + R([{Fraction, Operation}|Rest], StringSoFar) -> + R(Rest, + io_lib:format("~s ~f% ~w, ",[StringSoFar, Fraction * 100.0, Operation])) + end, + DataHolder = + fun DataHolderFun(Data)-> + receive + {get_data, Pid} -> Pid ! {message_bench_data, Data}; + D -> DataHolderFun([Data,D]) + end + end, + DataHolderPid = spawn_link(fun()-> DataHolder([]) end), + PrintData = + fun (Str, List) -> + io:format(Str, List), + DataHolderPid ! io_lib:format(Str, List) + end, + GetData = + fun () -> + DataHolderPid ! 
{get_data, self()}, + receive {message_bench_data, Data} -> Data end + end, + %% Function that runs a benchmark instance and returns the number + %% of operations that were performed and how long time they took + %% to perform + RunBenchmark = + fun({NrOfProcs, Scenario, Duration, SpawnOpts}) -> + ProbHelpTab = CalculateOpsProbHelpTab(Scenario, 0), + ParentPid = self(), + ReceiveFun = + fun ReceiveFun(NrOfStops, ReceiveCount) when NrOfStops =:= NrOfProcs -> + ParentPid ! {done_nothing_more_to_receive, ReceiveCount}; + ReceiveFun(NrOfStops, ReceiveCount) -> + receive + Msg -> + case Msg of + stop -> + ReceiveFun(NrOfStops + 1, ReceiveCount); + off_heap -> + erlang:process_flag(message_queue_data, off_heap), + ReceiveFun(NrOfStops, ReceiveCount + 1); + on_heap -> + erlang:process_flag(message_queue_data, on_heap), + ReceiveFun(NrOfStops, ReceiveCount + 1); + _X -> + ReceiveFun(NrOfStops, ReceiveCount + 1) + end + end + end, + Receiver = + spawn_opt( + fun() -> + process_flag(trap_exit, true), + ReceiveFun(0, 0) + end, + SpawnOpts), + Worker = + fun() -> + receive start -> ok end, + WorksDone = + do_work(0, ProbHelpTab, Operations, Receiver), + ParentPid ! {works_done, WorksDone}, + Receiver ! stop + end, + ChildPids = + lists:map(fun(_N) -> spawn_link(Worker) end, lists:seq(1, NrOfProcs)), + erlang:garbage_collect(), + timer:sleep(RecoverTimeMs), + lists:foreach(fun(Pid) -> Pid ! start end, ChildPids), + timer:sleep(Duration), + lists:foreach(fun(Pid) -> Pid ! stop end, ChildPids), + TotalWorksDone = lists:foldl( + fun(_, Sum) -> + receive + {works_done, Count} -> Sum + Count + end + end, 0, ChildPids), + {TimeAfterSends, ok} = + timer:tc( + fun() -> + receive + {done_nothing_more_to_receive, ReceiveCount} -> + %% Sanity check + ReceiveCount = TotalWorksDone, + ok + end + end), + {Duration + (TimeAfterSends div 1000), TotalWorksDone} + end, + RunBenchmarkInSepProcess = + fun(ParameterTuple) -> + P = self(), + Results = + [begin + spawn_link(fun()-> P ! 
{bench_result, RunBenchmark(ParameterTuple)} end), + receive {bench_result, Res} -> Res end + end || _ <- lists:seq(1, NrOfRepeats)], + {R1, R2} = lists:foldl(fun ({I1, I2}, {A1, A2}) -> + {I1 + A1, I2 + A2} + end, {0, 0}, Results), + {R1 / NrOfRepeats, R2 / NrOfRepeats} + end, + RunBenchmarkAndReport = + fun(ThreadCount, + Scenario, + Duration, + ReportReceive, + SpawnOpts) -> + {ReceiveTime, NrOfSends} = + RunBenchmarkInSepProcess({ThreadCount, + Scenario, + Duration, + SpawnOpts}), + Throughput = + case ReportReceive of + true -> + NrOfSends/(ReceiveTime/1000.0); + false -> + NrOfSends/(Duration/1000.0) + end, + PrintData("; ~f",[Throughput]), + Name = io_lib:format("Scenario: ~w, " + "# of Processes: ~w", + [Scenario, ThreadCount]), + NotifyResFun(Name, Throughput) + end, + ThreadCounts = + case ThreadCountsOpt of + not_set -> + CalculateThreadCounts([1]); + _ -> ThreadCountsOpt + end, + Version = + (fun() -> + VersionString = erlang:system_info(system_version), + case re:run(VersionString, "\\[(source\\-[^\\]]+)\\]") of + {match, [_, {StartPos, Length}]} -> + string:slice(VersionString, StartPos, Length); + _ -> + erlang:system_info(otp_release) + end + end)(), + %% Run the benchmark + PrintData("# Each instance of the benchmark runs for ~w seconds:~n", [BenchmarkDurationMs/1000]), + PrintData("# The result of a benchmark instance is presented as a number representing~n",[]), + PrintData("# the number of operations performed per second:~n~n~n",[]), + PrintData("# To plot graphs for the results below:~n",[]), + PrintData("# 1. Open \"$ERL_TOP/erts/test/parallel_messages_SUITE_data/visualize_throughput.html\" in a web browser~n",[]), + PrintData("# 2. Copy the lines between \"#BENCHMARK STARTED$\" and \"#BENCHMARK ENDED$\" below~n",[]), + PrintData("# 3. 
Paste the lines copied in step 2 to the text box in the browser window opened in~n",[]), + PrintData("# step 1 and press the Render button~n~n",[]), + PrintData("#BENCHMARK STARTED$~n",[]), + %% The following loop runs all benchmark scenarios and prints the results (i.e, operations/second) + lists:foreach( + fun(SpawnOpts) -> + lists:foreach( + fun(Scenario) -> + lists:foreach( + fun(ReportReceiveThroughput) -> + PrintData("Scenario: ~s, send_duration=~w ms, ~s, Spawn Options=~w$~n", + [case ReportReceiveThroughput of + true -> "Receive Throughput"; + false -> "Send Throughput" + end, + BenchmarkDurationMs, + RenderScenario(Scenario, ""), + SpawnOpts]), + lists:foreach( + fun(ThreadCount) -> + PrintData("; ~w",[ThreadCount]) + end, + ThreadCounts), + PrintData("$~n",[]), + PrintData(Version,[]), + lists:foreach( + fun(ThreadCount) -> + %erlang:display({thread_count, ThreadCount}), + RunBenchmarkAndReport(ThreadCount, + Scenario, + BenchmarkDurationMs, + ReportReceiveThroughput, + SpawnOpts) + end, + ThreadCounts), + PrintData("$~n",[]) + end, + ReportReceiveThroughputList) + end, + Scenarios) + end, + SpawnOptsList), + PrintData("~n#BENCHMARK ENDED$~n~n",[]), + DataDir = filename:join(filename:dirname(code:which(?MODULE)), "parallel_messages_SUITE_data"), + TemplatePath = filename:join(DataDir, "visualize_throughput.html"), + {ok, Template} = file:read_file(TemplatePath), + OutputData = string:replace(Template, "#bench_data_placeholder", GetData()), + OutputPath1 = filename:join(DataDir, "message_bench_result.html"), + {{Year, Month, Day}, {Hour, Minute, Second}} = calendar:now_to_datetime(erlang:timestamp()), + StrTime = lists:flatten(io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w",[Year,Month,Day,Hour,Minute,Second])), + OutputPath2 = filename:join(DataDir, io_lib:format("message_bench_result_~s.html", [StrTime])), + file:write_file(OutputPath1, OutputData), + file:write_file(OutputPath2, OutputData), + PrintResultPathsFun(OutputPath2, OutputPath1). 
+
+
+%% Runs the benchmark with the small default configuration: 500 ms per
+%% benchmark instance, a 500 ms pause before each instance, a single
+%% repetition, both send and receive throughput reported, and an
+%% off_heap receiver. The result is whatever throughput_benchmark/1
+%% returns.
+throughput_benchmark() ->
+    Scenarios =
+        [[{1.0, {message_size, 1}}],
+         [{1.0, {exit_signal_size, 3}}],
+         [{0.5, {exit_signal_size, 1}},
+          {0.5, {message_size, 1}}]],
+    %% Individual results are not reported anywhere in this configuration
+    IgnoreResult = fun(_Name, _Throughput) -> ok end,
+    %% Turns the path of the generated result page into a {comment, _} tuple
+    LinkToResult =
+        fun(ResultPath, _LatestResultPath) ->
+                Link =
+                    io_lib:format("<a href=\"file:///~s\">Result visualization</a>",[ResultPath]),
+                {comment, Link}
+        end,
+    Config =
+        #parallel_messages_bench_config{
+           benchmark_duration_ms = 500,
+           recover_time_ms = 500,
+           thread_counts = not_set,
+           nr_of_repeats = 1,
+           report_receive_throughput = [false, true],
+           spawn_opts = [[{message_queue_data, off_heap}]],
+           scenarios = Scenarios,
+           notify_res_fun = IgnoreResult,
+           print_result_paths_fun = LinkToResult
+          },
+    throughput_benchmark(Config).
+
+%% Test case wrapper around throughput_benchmark/0; the argument is ignored.
+test_throughput_benchmark(_Config) ->
+    throughput_benchmark().
+
+%% Runs the benchmark with a larger configuration: 1000 ms per benchmark
+%% instance, three repetitions, an explicit list of process counts, and
+%% message-only, exit-signal-only and 50/50 mixed scenarios for the
+%% sizes 1, 10, 100 and 1000. Every result is printed to standard output.
+large_throughput_benchmark() ->
+    Sizes = [1, 10, 100, 1000],
+    MessageOnly = [[{1.0, {message_size, Size}}] || Size <- Sizes],
+    ExitSignalOnly = [[{1.0, {exit_signal_size, Size}}] || Size <- Sizes],
+    EvenMix = [[{0.5, {exit_signal_size, Size}},
+                {0.5, {message_size, Size}}] || Size <- Sizes],
+    ReportToStdout =
+        fun(Name, Throughput) ->
+                io:format("~n~n#Name: ~s Throughput: ~w~n~n", [Name, Throughput])
+        end,
+    Config =
+        #parallel_messages_bench_config{
+           benchmark_duration_ms = 1000,
+           recover_time_ms = 1000,
+           thread_counts = [1,2,4,8,15,16,31,32,47,48,63,64],
+           nr_of_repeats = 3,
+           report_receive_throughput = [false, true],
+           spawn_opts = [[{message_queue_data, off_heap}]],
+           scenarios = MessageOnly ++ ExitSignalOnly ++ EvenMix,
+           notify_res_fun = ReportToStdout,
+           print_result_paths_fun = fun stdout_notify_res/2
+          },
+    throughput_benchmark(Config).
+
+%% Test case: runs a short benchmark (100 ms per instance, receive
+%% throughput only, 1, 2 and 4 sender processes) against an off_heap
+%% receiver. The scenario mixes messages and exit signals with a small
+%% share of message_queue_data_change operations. The argument is ignored.
+test_message_queue_data_switching(_Config) ->
+    %% About 1 in 100k operations changes the message queue data type
+    %% (0.000005 + 0.000005); the remaining probability is split evenly
+    %% between exit signals and messages.
+    SwitchingScenario =
+        [{0.499995, {exit_signal_size, 1}},
+         {0.499995, {message_size, 1}},
+         {0.000005, {message_queue_data_change, off_heap}},
+         {0.000005, {message_queue_data_change, on_heap}}],
+    IgnoreResult = fun(_Name, _Throughput) -> ok end,
+    %% Turns the path of the generated result page into a {comment, _} tuple
+    LinkToResult =
+        fun(ResultPath, _LatestResultPath) ->
+                Link =
+                    io_lib:format("<a href=\"file:///~s\">Result visualization</a>",[ResultPath]),
+                {comment, Link}
+        end,
+    Config =
+        #parallel_messages_bench_config{
+           benchmark_duration_ms = 100,
+           recover_time_ms = 500,
+           thread_counts = [1,2,4],
+           nr_of_repeats = 1,
+           report_receive_throughput = [true],
+           spawn_opts = [[{message_queue_data, off_heap}]],
+           scenarios = [SwitchingScenario],
+           notify_res_fun = IgnoreResult,
+           print_result_paths_fun = LinkToResult
+          },
+    throughput_benchmark(Config).
diff --git a/erts/test/parallel_messages_SUITE_data/visualize_throughput.html b/erts/test/parallel_messages_SUITE_data/visualize_throughput.html
new file mode 100644
index 0000000000..92519876c9
--- /dev/null
+++ b/erts/test/parallel_messages_SUITE_data/visualize_throughput.html
@@ -0,0 +1,302 @@
+<!doctype html>
+<html lang="en">
+
+<!-- %% -->
+<!-- %% %CopyrightBegin% -->
+<!-- %% -->
+<!-- %% Copyright Ericsson AB and Kjell Winblad 1996-2020. All Rights Reserved. -->
+<!-- %% -->
+<!-- %% Licensed under the Apache License, Version 2.0 (the "License"); -->
+<!-- %% you may not use this file except in compliance with the License. -->
+<!-- %% You may obtain a copy of the License at -->
+<!-- %% -->
+<!-- %% http://www.apache.org/licenses/LICENSE-2.0 -->
+<!-- %% -->
+<!-- %% Unless required by applicable law or agreed to in writing, software -->
+<!-- %% distributed under the License is distributed on an "AS IS" BASIS, -->
+<!-- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
+<!-- %% See the License for the specific language governing permissions and -->
+<!-- %% limitations under the License.
-->
+<!-- %% -->
+<!-- %% %CopyrightEnd% -->
+<!-- %% -->
+<!-- %% Author: Kjell Winblad -->
+<!-- %% -->
+
+  <head>
+    <meta charset="utf-8">
+    <title>Message Send/Receive Benchmark Result Viewer</title>
+  </head>
+
+  <body>
+    <div id="insertPlaceholder"></div>
+    <h1>Message Send/Receive Benchmark Result Viewer</h1>
+    <p>
+      This page generates graphs from data produced by the Message Send/Receive Benchmark which is defined in the function <code>parallel_messages_SUITE:test_throughput_benchmark/1</code> (see "<code>$ERL_TOP/erts/test/parallel_messages_SUITE.erl</code>").
+    </p>
+    <p>
+      Note that one can paste results from several benchmark runs into the field below. Results from the same scenario but from different benchmark runs will be relabeled and plotted in the same graph automatically.
+    </p>
+    <p>
+      Note also that lines can be hidden by clicking on the corresponding label.
+    </p>
+    Paste the generated data in the field below and press the Render button:
+    <br>
+    <textarea id="dataField" rows="4" cols="50">#bench_data_placeholder</textarea>
+    <br>
+    <input type="checkbox" id="throughputPlot" checked> Include Throughput Plot
+    <br>
+    <input type="checkbox" id="betterThanWorstPlot"> Include % More Throughput Than Worst Plot
+    <br>
+    <input type="checkbox" id="worseThanBestPlot"> Include % Less Throughput Than Best Plot
+    <br>
+    <input type="checkbox" id="barPlot"> Bar Plot
+    <br>
+    <input type="checkbox" id="sameSpacing" checked> Same X Spacing Between Points
+    <br>
+    <button id="renderButton" type="button">Render</button>
+
+    <script src="https://code.jquery.com/jquery-3.3.1.slim.min.js"
+            integrity="sha256-3edrmyuQ0w65f8gfBsqowzjJe2iM6n0nKciPUp8y+7E="
+            crossorigin="anonymous"></script>
+    <script>
+      var loading = false;
+      // Shows a half-transparent full-page "Loading..." overlay if none is
+      // shown, and removes the overlay if one is shown.
+      function toggleLoadingScreen(){
+          if(loading){
+              $("#loading").remove();
+              loading = false;
+          }else{
+              $('<div id="loading">'+
+                '<span style="position: fixed; top: 50%;left: 50%;color: white;"><b>Loading...</b></span>'+
+                '</div>')
+                  .css({position: "fixed",
+                        top: 0,
+                        left: 0,
+                        width: "100%",
+                        height: "100%",
+                        'background-color': "#000",
+                        filter:"alpha(opacity=50)",
+                        '-moz-opacity':"0.5",
+                        '-khtml-opacity': "0.5",
+                        opacity: "0.5",
+                        'z-index': "10000"})
+                  .appendTo(document.body);
+              loading = true;
+
+          }
+      }
+      //Start loading screen before downloading plotly which is quite large
+      toggleLoadingScreen();
+    </script>
+    <script src="https://cdn.plot.ly/plotly-1.5.0.min.js"></script>
+    <script>
+      // Replaces every occurrence of the string "search" with "replacement".
+      // Both are used literally (split/join), so "$" in the replacement has
+      // no special meaning. Note that this assignment replaces a native
+      // String.prototype.replaceAll where the browser provides one.
+      String.prototype.replaceAll = function(search, replacement) {
+          var target = this;
+          return target.split(search).join(replacement);
+      };
+      // Returns the string without leading and trailing whitespace.
+      String.prototype.myTrim = function() {
+          var target = this;
+          return target.replace(/^\s+|\s+$/g, '');
+      };
+      // Parses the data lines of one scenario into an array of Plotly traces.
+      //
+      // Entries are removed from the front of "lines" (the array is mutated)
+      // as long as they are blank or contain ";":
+      //   - blank lines and lines starting with "#" are skipped,
+      //   - a line starting with ";" holds the x values ("; 1; 2; 4"),
+      //   - any other line is "label; y1; y2; ..." and becomes one trace whose
+      //     name is "prefix" followed by the label.
+      // With "sameSpacing" the x values are kept as strings prefixed with "_"
+      // (presumably so that they are plotted as evenly spaced categories);
+      // otherwise they are parsed as integers. With "barPlot" every trace is
+      // marked as a bar trace.
+      function plotGraph(lines, sameSpacing, barPlot, prefix) {
+          var xvals = null;
+          var data = [];
+          while(lines.length > 0 &&
+                (lines[0].myTrim() == "" ||
+                 lines[0].myTrim().indexOf(";") !== -1)){
+              var line = lines.shift().myTrim();
+              if(line == "" || line.startsWith("#")){
+                  continue;
+              } else if(line.startsWith(";")) {
+                  xvals = line.split(";")
+                  xvals.shift(); // Remove first
+                  xvals = $.map(xvals, function (i){
+                      if(sameSpacing){
+                          return "_"+i.myTrim();
+                      }else{
+                          return parseInt(i.myTrim(), 10);
+                      }
+                  });
+              }else{
+                  line = line.split(";")
+                  var label = prefix + line.shift().myTrim();
+                  var yvals = $.map(line, function (i){
+                      // parseFloat takes no radix argument
+                      return parseFloat(i.myTrim());
+                  });
+                  var trace = {
+                      x: xvals,
+                      y: yvals,
+                      mode: 'lines+markers',
+                      name: label
+                  };
+                  if(barPlot){
+                      trace['type'] = "bar";
+                  }
+                  data.push(trace);
+              }
+
+          }
+          return data;
+      }
+      // Returns a deep copy of "dataParam" in which every y value is replaced
+      // by a percentage relative to a reference value for the same x value:
+      //   - compareWithWorst === true:  % more than the smallest y at that x,
+      //   - compareWithWorst === false: % less than the largest y at that x.
+      // "dataParam" itself is not modified.
+      function toCompareData(dataParam, compareWithWorst) {
+          var data = $.extend(true, [], dataParam);
+          var worstSoFarMap = {};
+          // Start value for the search for the reference value. When looking
+          // for the largest value the start value has to be the lowest
+          // number; Number.MIN_VALUE is the smallest *positive* number and is
+          // therefore not suitable.
+          var defaultSoFarValue = compareWithWorst ? Number.MAX_VALUE : -Number.MAX_VALUE;
+          function getWorstBestSoFar(x){
+              return worstSoFarMap[x] === undefined ? defaultSoFarValue : worstSoFarMap[x];
+          }
+          function setWorstBestSoFar(x, y){
+              return worstSoFarMap[x] = y;
+          }
+          function lessOrGreaterThan(n1, n2){
+              return compareWithWorst ? n1 < n2 : n1 > n2;
+          }
+          // First pass: find the reference value for every x value
+          $.each(data, function(i, allResConfig) {
+              $.each(allResConfig.y, function(index, res) {
+                  var xName = allResConfig.x[index];
+                  if(lessOrGreaterThan(res, getWorstBestSoFar(xName))){
+                      setWorstBestSoFar(xName, res);
+                  }
+              });
+          });
+          // Second pass: convert every value to a percentage
+          $.each(data, function(i, allResConfig) {
+              $.each(allResConfig.y, function(index, res) {
+                  var xName = allResConfig.x[index];
+                  if(compareWithWorst){
+                      allResConfig.y[index] = ((res / getWorstBestSoFar(xName))-1.0) * 100;
+                  }else{
+                      allResConfig.y[index] = (1.0 -(res / getWorstBestSoFar(xName))) * 100;
+                  }
+              });
+          });
+          return data;
+      }
+      // Converts traces to "% more throughput than the worst result".
+      function toBetterThanWorstData(data){
+          return toCompareData(data, true);
+      }
+      // Converts traces to "% less throughput than the best result".
+      function toWorseThanBestData(data){
+          return toCompareData(data, false);
+      }
+      // Reads the data field and the check boxes, and inserts one graph per
+      // scenario and selected plot type before the insertPlaceholder element.
+      //
+      // The data is split on "$". An entry starting with "Scenario:" starts
+      // a scenario and the entries following it are parsed by plotGraph. If
+      // the same scenario header occurs several times the traces are plotted
+      // in the same graph and labeled "Ver: N" from the second occurrence.
+      function plotGraphs(){
+          var insertPlaceholder = $("#insertPlaceholder");
+          var sameSpacing = $('#sameSpacing').is(":checked");
+          var barPlot = $('#barPlot').is(":checked");
+          var throughputPlot = $('#throughputPlot').is(":checked");
+          var betterThanWorstPlot = $('#betterThanWorstPlot').is(":checked");
+          var worseThanBestPlot = $('#worseThanBestPlot').is(":checked");
+          var lines = $("#dataField").val();
+          // Comments out (with "#") the text of every unchecked element with
+          // the class showCheck. NOTE(review): this page's own markup
+          // contains no element with that class.
+          $('.showCheck').each(function() {
+              var item = $(this);
+              if(!item.is(":checked")){
+                  lines = lines.replaceAll(item.val(), "#"+item.val())
+              }
+          });
+          lines = lines.split("$");
+          var scenarioDataMap = {};
+          var scenarioNrOfVersionsMap = {};
+          var scenarioList = [];
+          while(lines.length > 0){
+              var line = lines.shift().myTrim();
+              if(line == ""){
+                  continue;
+              } else if(line.startsWith("Scenario:")) {
+                  var name = line;
+                  if(scenarioDataMap[name] === undefined){
+                      scenarioDataMap[name] = [];
+                      scenarioNrOfVersionsMap[name] = 0;
+                      scenarioList.push(line);
+                  }
+                  scenarioNrOfVersionsMap[name] = scenarioNrOfVersionsMap[name] + 1;
+                  var prefix = undefined;
+                  if(scenarioNrOfVersionsMap[name] === 1){
+                      prefix = "";
+                  }else{
+                      prefix = "Ver: " + scenarioNrOfVersionsMap[name] + " ";
+                  }
+                  scenarioDataMap[name] =
+                      scenarioDataMap[name].concat(
+                          plotGraph(lines, sameSpacing, barPlot, prefix));
+              }
+          }
+          // Number of graphs inserted so far; used to give every graph and
+          // every "Fill screen" button a unique id.
+          var nrOfGraphs = 0;
+          // Inserts one graph and its "Fill screen" button for the scenario
+          // "name". "plotType" is 'throughput', 'better_than_worst' or
+          // anything else (treated as worse-than-best).
+          function plotScenario(name, plotType) {
+              var data = scenarioDataMap[name];
+              var yAxisTitle = undefined;
+              nrOfGraphs = nrOfGraphs + 1;
+              // The ids are stored in local variables so that the click
+              // handler below refers to this graph. Reading the shared
+              // counter inside the handler would give the id of the graph
+              // that was created last.
+              var graphId = 'graph' + nrOfGraphs;
+              var buttonId = 'fullscreenButton' + nrOfGraphs;
+              $("<div class='added' id='" + graphId + "'>")
+                  .insertBefore(insertPlaceholder);
+              $("<button type='button' class='added' id='" + buttonId + "'>Fill screen</button>")
+                  .insertBefore(insertPlaceholder);
+              $("<span class='added'><br><hr><br></span>")
+                  .insertBefore(insertPlaceholder);
+              if (plotType === 'throughput') {
+                  yAxisTitle = 'Operations/Second';
+              } else if (plotType === 'better_than_worst') {
+                  yAxisTitle = '% More Throughput Than Worst';
+                  data = toBetterThanWorstData(data);
+              } else {
+                  yAxisTitle = '% Less Throughput Than Best';
+                  data = toWorseThanBestData(data);
+              }
+              var layout = {
+                  title: name,
+                  xaxis: {
+                      title: '# of Processes'
+                  },
+                  yaxis: {
+                      title: yAxisTitle
+                  }
+              };
+              // Re-creates the graph with a size close to the window size
+              $("#" + buttonId).click(
+                  function () {
+                      $('#' + graphId).replaceWith(
+                          $("<div class='added' id='" + graphId + "'>"));
+                      layout = $.extend({}, layout, {
+                          width: $(window).width() - 40,
+                          height: $(window).height() - 40
+                      });
+                      Plotly.newPlot(graphId, data, layout);
+                  });
+              Plotly.newPlot(graphId, data, layout);
+          }
+          $.each(scenarioList,
+                 function (index, name) {
+                     if (throughputPlot) {
+                         plotScenario(name, 'throughput');
+                     }
+                     if (betterThanWorstPlot) {
+                         plotScenario(name, 'better_than_worst');
+                     }
+                     if (worseThanBestPlot) {
+                         plotScenario(name, 'worse_than_best');
+                     }
+                 });
+      }
+      // Removes the graphs rendered earlier, renders the graphs again and
+      // removes the loading overlay. The overlay is removed also when the
+      // rendering fails so that it cannot stay on top of the page.
+      function renderAndHideLoadingScreen(){
+          try {
+              $( ".added" ).remove();
+              plotGraphs();
+              toggleLoadingScreen();
+          } catch(e){
+              toggleLoadingScreen();
+              console.log(e);
+              alert("Error happened when parsing data.\n" +
+                    "See console for more info");
+          }
+      }
+      $(document).ready(function(){
+          $('#renderButton').click(
+              function(){
+                  toggleLoadingScreen();
+                  // The timeout gives the browser a chance to show the
+                  // overlay before the rendering starts
+                  setTimeout(renderAndHideLoadingScreen, 10);
+              });
+          // Initial rendering; the overlay was shown when the page started
+          // to load
+          setTimeout(renderAndHideLoadingScreen, 10);
+      });
+    </script>
+  </body>
+</html>
-- 
2.31.1
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor