Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:24
erlang
2016-dialyzer_coordinator-dialyzer_worker-Refac...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 2016-dialyzer_coordinator-dialyzer_worker-Refactor-for-si.patch of Package erlang
From f0abe18ef2bc3095a07e0fc5dfc5eaaae6f06d53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Gustavsson?= <bjorn@erlang.org> Date: Thu, 16 Sep 2021 05:06:12 +0200 Subject: [PATCH 16/20] dialyzer_coordinator, dialyzer_worker: Refactor for simplicity and clarity --- lib/dialyzer/src/dialyzer_coordinator.erl | 212 +++++++++++++--------- lib/dialyzer/src/dialyzer_worker.erl | 120 ++++-------- 2 files changed, 161 insertions(+), 171 deletions(-) diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 866eb326b2..2ca3acc4cb 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -11,22 +11,21 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. - -%%%------------------------------------------------------------------- -%%% File : dialyzer_coordinator.erl -%%% Authors : Stavros Aronis <aronisstav@gmail.com> -%%%------------------------------------------------------------------- +%% +%% Original author: Stavros Aronis <aronisstav@gmail.com> +%% +%% Purpose: Spawn and coordinate parallel jobs. -module(dialyzer_coordinator). %%% Export for dialyzer main process -export([parallel_job/4]). -%%% Export for all possible workers --export([job_done/3]). +%%% Exports for all workers +-export([request_activation/1, job_done/3]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/2, request_activation/1]). +-export([wait_for_success_typings/2]). %%% Exports for the compilation workers -export([get_next_label/2]). @@ -37,7 +36,7 @@ -type collector() :: pid(). -type regulator() :: pid(). --type scc_to_pid() :: ets:tid() | 'unused'. +-type scc_to_pid() :: ets:tid() | 'none'. -opaque coordinator() :: {collector(), regulator(), scc_to_pid()}. -type timing() :: dialyzer_timing:timing_server(). @@ -105,6 +104,7 @@ -include("dialyzer.hrl"). %%-------------------------------------------------------------------- +%% API functions for the main dialyzer process. -spec parallel_job('compile', [compile_job()], compile_init_data(), timing()) -> {compile_result(), integer()}; @@ -125,74 +125,143 @@ parallel_job(Mode, Jobs, InitData, Timing) -> State = spawn_jobs(Mode, Jobs, InitData, Timing), collect_result(State). +%%-------------------------------------------------------------------- +%% API functions for workers (dialyzer_worker). + +-spec request_activation(coordinator()) -> ok. + +request_activation({_Collector, Regulator, _SCCtoPid}) -> + Regulator ! {req, self()}, + wait_activation(). + +-spec job_done(job(), job_result(), coordinator()) -> ok. + +job_done(Job, Result, {Collector, Regulator, _SCCtoPid}) -> + Regulator ! done, + Collector ! {done, Job, Result}, + ok. + +-spec get_next_label(integer(), coordinator()) -> integer(). + +%% For the 'compile' worker. +get_next_label(EstimatedSize, {Collector, _Regulator, _SCCtoPid}) -> + Collector ! {next_label_request, EstimatedSize, self()}, + receive + {next_label_reply, NextLabel} -> NextLabel + end. + +-spec wait_for_success_typings([scc() | module()], coordinator()) -> + 'ok'. + +%% Helper for 'sigtype' and 'dataflow' workers. +wait_for_success_typings(SCCs, {_Collector, _Regulator, SCCtoPid}) -> + F = fun(SCC) -> + %% The SCCs that SCC depends on have always been started. + try ets:lookup_element(SCCtoPid, SCC, 2) of + Pid when is_pid(Pid) -> + Ref = erlang:monitor(process, Pid), + receive + {'DOWN', Ref, process, Pid, _Info} -> + ok + end + catch + _:_ -> + %% Already finished. + ok + end + end, + lists:foreach(F, SCCs). + + +%%-------------------------------------------------------------------- +%% Local functions. + spawn_jobs(Mode, Jobs, InitData, Timing) -> Collector = self(), Regulator = spawn_regulator(), - TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'), - SCCtoPID = - case TypesigOrDataflow of - true -> ets:new(scc_to_pid, [{read_concurrency, true}]); - false -> unused - end, - Coordinator = {Collector, Regulator, SCCtoPID}, - JobFun = - fun(Job) -> - Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - case TypesigOrDataflow of - true -> true = ets:insert(SCCtoPID, {Job, Pid}); - false -> true - end + + SCCtoPid = + if + Mode =:= 'typesig'; Mode =:= 'dataflow' -> + ets:new(scc_to_pid, [{read_concurrency, true}]); + true -> + none end, - JobCount = length(Jobs), - NumberOfInitJobs = min(JobCount, 20 * dialyzer_utils:parallelism()), - {InitJobs, RestJobs} = lists:split(NumberOfInitJobs, Jobs), - lists:foreach(JobFun, InitJobs), + + Coordinator = {Collector, Regulator, SCCtoPid}, + + JobFun = job_fun(SCCtoPid, Mode, InitData, Coordinator), + + %% Limit the number of processes we start in order to save memory. + MaxNumberOfInitJobs = 20 * dialyzer_utils:parallelism(), + RestJobs = launch_jobs(Jobs, JobFun, MaxNumberOfInitJobs), + Unit = case Mode of 'typesig' -> "SCCs"; _ -> "modules" end, + JobCount = length(Jobs), dialyzer_timing:send_size_info(Timing, JobCount, Unit), + InitResult = case Mode of 'compile' -> dialyzer_analysis_callgraph:compile_init_result(); _ -> [] end, + #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0, job_fun = JobFun, jobs = RestJobs, - init_data = InitData, regulator = Regulator, scc_to_pid = SCCtoPID}. + init_data = InitData, regulator = Regulator, scc_to_pid = SCCtoPid}. + +launch_jobs(Jobs, _JobFun, 0) -> + Jobs; +launch_jobs([Job|Jobs], JobFun, N) -> + JobFun(Job), + launch_jobs(Jobs, JobFun, N - 1); +launch_jobs([], _JobFun, _) -> + []. + +job_fun(none, Mode, InitData, Coordinator) -> + fun(Job) -> + _ = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + ok + end; +job_fun(SCCtoPid, Mode, InitData, Coordinator) -> + fun(Job) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + true = ets:insert(SCCtoPid, {Job, Pid}), + ok + end. collect_result(#state{mode = Mode, active = Active, result = Result, next_label = NextLabel, init_data = InitData, jobs = JobsLeft, job_fun = JobFun, - regulator = Regulator, scc_to_pid = SCCtoPID} = State) -> + regulator = Regulator, scc_to_pid = SCCtoPid} = State) -> receive {next_label_request, Estimation, Pid} -> Pid ! {next_label_reply, NextLabel}, collect_result(State#state{next_label = NextLabel + Estimation}); {done, Job, Data} -> NewResult = update_result(Mode, InitData, Job, Data, Result), - TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'), case Active of 1 -> + %% This was the last running job. Clean up and return the result. kill_regulator(Regulator), case Mode of 'compile' -> {NewResult, NextLabel}; - _ when TypesigOrDataflow -> - ets:delete(SCCtoPID), - NewResult; - 'warnings' -> - NewResult; - 'contract_remote_types' -> - NewResult; - 'record_remote_types' -> - NewResult + _ -> + if + SCCtoPid =:= none -> ok; + true -> ets:delete(SCCtoPid) + end, + NewResult end; N -> - case TypesigOrDataflow of - true -> true = ets:delete(SCCtoPID, Job); - false -> true + if + SCCtoPid =:= none -> ok; + true -> true = ets:delete(SCCtoPid, Job) end, NewJobsLeft = case JobsLeft of @@ -219,36 +288,13 @@ update_result(Mode, InitData, Job, Data, Result) -> Data ++ Result end. --spec sccs_to_pids([scc() | module()], coordinator()) -> - [dialyzer_worker:worker()]. - -sccs_to_pids(SCCs, {_Collector, _Regulator, SCCtoPID}) -> - Fold = - fun(SCC, Pids) -> - %% The SCCs that SCC depends on have always been started. - try ets:lookup_element(SCCtoPID, SCC, 2) of - Pid when is_pid(Pid) -> - [Pid|Pids] - catch - _:_ -> Pids - end - end, - lists:foldl(Fold, [], SCCs). - --spec job_done(job(), job_result(), coordinator()) -> ok. - -job_done(Job, Result, {Collector, Regulator, _SCCtoPID}) -> - Regulator ! done, - Collector ! {done, Job, Result}, - ok. - --spec get_next_label(integer(), coordinator()) -> integer(). - -get_next_label(EstimatedSize, {Collector, _Regulator, _SCCtoPID}) -> - Collector ! {next_label_request, EstimatedSize, self()}, - receive - {next_label_reply, NextLabel} -> NextLabel - end. +%%-------------------------------------------------------------------- +%% The regulator server +%% +%% The regulator limits the number of simultaneous running jobs to the +%% number of schedulers. Note that there are usually many more worker +%% processes started, but they are only allowed to do light work (such +%% as monitoring other processes) when they have not been activated. -spec wait_activation() -> ok. @@ -258,12 +304,6 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. --spec request_activation(coordinator()) -> ok. - -request_activation({_Collector, Regulator, _SCCtoPID}) -> - Regulator ! {req, self()}, - wait_activation(). - spawn_regulator() -> InitTickets = dialyzer_utils:parallelism(), spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end). @@ -279,15 +319,13 @@ regulator_loop(Tickets, Queue) -> regulator_loop(N-1, Queue) end; done -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets + 1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - regulator_loop(NewTickets, NewQueue); + case queue:out(Queue) of + {empty, NewQueue} -> + regulator_loop(Tickets + 1, NewQueue); + {{value, Pid}, NewQueue} -> + activate_pid(Pid), + regulator_loop(Tickets, NewQueue) + end; stop -> ok end. diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl index 1cbee44fe0..c9ceb75a40 100644 --- a/lib/dialyzer/src/dialyzer_worker.erl +++ b/lib/dialyzer/src/dialyzer_worker.erl @@ -12,6 +12,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. +%% Purpose: Run individual jobs in separate processes. + -module(dialyzer_worker). -export([launch/4]). @@ -29,12 +31,9 @@ mode :: mode(), job :: job(), coordinator :: coordinator(), - init_data :: init_data(), - depends_on = [] :: list() + init_data :: init_data() }). --include("dialyzer.hrl"). - %% -define(DEBUG, true). -ifdef(DEBUG). @@ -55,96 +54,46 @@ launch(Mode, Job, InitData, Coordinator) -> spawn_link(fun() -> init(State) end). %%-------------------------------------------------------------------- +%% Local functions. -init(#state{job = SCC, mode = Mode, init_data = InitData, - coordinator = Coordinator} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - DependsOnSCCs = dialyzer_succ_typings:find_depends_on(SCC, InitData), - ?debug("~w: Deps ~p: ~p\n", [self(), SCC, DependsOnSCCs]), - Pids = dialyzer_coordinator:sccs_to_pids(DependsOnSCCs, Coordinator), - ?debug("~w: PidsDeps ~p\n", [self(), Pids]), - DependsOn = [{Pid, erlang:monitor(process, Pid)} || Pid <- Pids], - loop(updating, State#state{depends_on = DependsOn}); +init(#state{job = SCC, mode = Mode, init_data = InitData} = State) + when Mode =:= 'typesig'; Mode =:= 'dataflow' -> + wait_for_success_typings(SCC, InitData, State), + run(State); init(#state{mode = Mode} = State) when Mode =:= 'compile'; Mode =:= 'warnings'; Mode =:= 'contract_remote_types'; Mode =:= 'record_remote_types' -> - loop(running, State). - -loop(updating, #state{mode = Mode} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - ?debug("~w: Update: ~p\n", [self(), State#state.job]), - NextStatus = - case waits_more_success_typings(State) of - true -> waiting; - false -> running - end, - loop(NextStatus, State); -loop(waiting, #state{mode = Mode} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - ?debug("~w: Wait: ~p\n", [self(), State#state.job]), - NewState = wait_for_success_typings(State), - loop(updating, NewState); -loop(running, #state{mode = Mode} = State) when - Mode =:= 'contract_remote_types'; Mode =:= 'record_remote_types' -> - request_activation(State), - ?debug("~w: Remote types: ~p\n", [self(), State#state.job]), - Result = do_work(State), - report_to_coordinator(Result, State); -loop(running, #state{mode = 'compile'} = State) -> - request_activation(State), - ?debug("Compile: ~s\n",[State#state.job]), - Result = - case start_compilation(State) of - {ok, EstimatedSize, Data} -> - Label = ask_coordinator_for_label(EstimatedSize, State), - continue_compilation(Label, Data); - {error, _Reason} = Error -> - Error - end, - report_to_coordinator(Result, State); -loop(running, #state{mode = 'warnings'} = State) -> - request_activation(State), - ?debug("Warning: ~s\n",[State#state.job]), - Result = collect_warnings(State), - report_to_coordinator(Result, State); -loop(running, #state{mode = Mode} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - request_activation(State), - ?debug("~w: Run: ~p\n", [self(), State#state.job]), - NotFixpoint = do_work(State), - report_to_coordinator(NotFixpoint, State). - -waits_more_success_typings(#state{depends_on = Depends}) -> - Depends =/= []. - -wait_for_success_typings(#state{depends_on = DependsOn} = State) -> - receive - {'DOWN', Ref, process, Pid, _Info} -> - ?debug("~w: ~p got DOWN: ~p\n", [self(), State#state.job, Pid]), - State#state{depends_on = DependsOn -- [{Pid, Ref}]} - after - 5000 -> - ?debug("~w: Still Waiting ~p:\n ~p\n", [self(), State#state.job, DependsOn]), - State - end. + run(State). -request_activation(#state{coordinator = Coordinator}) -> - dialyzer_coordinator:request_activation(Coordinator). +run(#state{coordinator = Coordinator, job = Job} = State) -> + dialyzer_coordinator:request_activation(Coordinator), + Result = run_job(State), + ?debug("~w: Done: ~p\n",[self(), Job]), + dialyzer_coordinator:job_done(Job, Result, Coordinator). -do_work(#state{mode = Mode, job = Job, init_data = InitData}) -> +run_job(#state{mode = Mode, job = Job, init_data = InitData} = State) -> + ?debug("~w: ~p: ~p\n", [self(), Mode, Job]), case Mode of - typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData); - dataflow -> dialyzer_succ_typings:refine_one_module(Job, InitData); + compile -> + case start_compilation(State) of + {ok, EstimatedSize, Data} -> + Label = ask_coordinator_for_label(EstimatedSize, State), + continue_compilation(Label, Data); + {error, _Reason} = Error -> + Error + end; + typesig -> + dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData); + dataflow -> + dialyzer_succ_typings:refine_one_module(Job, InitData); contract_remote_types -> dialyzer_contracts:process_contract_remote_types_module(Job, InitData); record_remote_types -> - dialyzer_utils:process_record_remote_types_module(Job, InitData) + dialyzer_utils:process_record_remote_types_module(Job, InitData); + warnings -> + dialyzer_succ_typings:collect_warnings(Job, InitData) end. -report_to_coordinator(Result, #state{job = Job, coordinator = Coordinator}) -> - ?debug("~w: Done: ~p\n",[self(), Job]), - dialyzer_coordinator:job_done(Job, Result, Coordinator). - start_compilation(#state{job = Job, init_data = InitData}) -> dialyzer_analysis_callgraph:start_compilation(Job, InitData). @@ -154,5 +103,8 @@ ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) -> continue_compilation(Label, Data) -> dialyzer_analysis_callgraph:continue_compilation(Label, Data). -collect_warnings(#state{job = Job, init_data = InitData}) -> - dialyzer_succ_typings:collect_warnings(Job, InitData). +%% Wait for the results of success typings of modules or SCCs that we +%% depend on. ('typesig' or 'dataflow' mode) +wait_for_success_typings(SCC, InitData, #state{coordinator = Coordinator}) -> + DependsOnSCCs = dialyzer_succ_typings:find_depends_on(SCC, InitData), + dialyzer_coordinator:wait_for_success_typings(DependsOnSCCs, Coordinator). -- 2.31.1
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor