summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2015-10-01 15:09:33 +0200
committerLinus Nordberg <linus@nordu.net>2015-11-11 13:32:37 +0100
commitf7a0018fb849bf0baefbea4af16ce8ce61ec69d0 (patch)
tree3afb7bffe904a7dc065d1a1e44e98c89c76f2515
parent0a3e6aafee314eaf9e5343c4cad89a9e2ae1d913 (diff)
Added util:parallel_map and use it when checking entries.
-rw-r--r--src/frontend.erl11
-rw-r--r--src/util.erl84
-rwxr-xr-xtest/check.erl4
3 files changed, 91 insertions, 8 deletions
diff --git a/src/frontend.erl b/src/frontend.erl
index b7fa4b1..1055e5f 100644
--- a/src/frontend.erl
+++ b/src/frontend.erl
@@ -241,16 +241,17 @@ check_entries_int(Entries, Start, End) ->
end, [], lists:zip(Entries, lists:seq(Start, End))).
check_entries_noreverse(Entries, Start, End) ->
- lists:foldl(fun ({Hash, Index}, Acc) ->
- lager:info("checking entry ~p", [Index]),
- case check_entry_noreverse(Hash, Index) of
+ Results = util:parallel_map(fun ({Hash, Index}) ->
+ check_entry_noreverse(Hash, Index)
+ end, lists:zip(Entries, lists:seq(Start, End)), 2),
+ lists:foldl(fun (Result, Acc) ->
+ case Result of
ok ->
- lager:info("entry ~p is correct", [Index]),
Acc;
Error ->
[Error | Acc]
end
- end, [], lists:zip(Entries, lists:seq(Start, End))).
+ end, [], Results).
entryhash_from_entry(Entry) ->
{ok, {Module, Function}} = application:get_env(plop, entryhash_from_entry),
diff --git a/src/util.erl b/src/util.erl
index c3b30db..af78b93 100644
--- a/src/util.erl
+++ b/src/util.erl
@@ -4,7 +4,7 @@
-module(util).
-export([tempfilename/1, fsync/1, fsync/2, exit_with_error/3,
check_error/3, write_tempfile_and_rename/3,
- spawn_and_wait/1]).
+ spawn_and_wait/1, parallel_map/3]).
-spec tempfilename(string()) -> string().
tempfilename(Base) ->
@@ -73,3 +73,85 @@ spawn_and_wait(Fun) ->
{result, ChildPid, Result} ->
Result
end.
+
+
+parallel_map_worker_loop(ParentPid, Fun, N) ->
+ receive
+ {parallel_map_request, ParentPid, Input} ->
+ Result = Fun(Input),
+ ParentPid ! {parallel_map_result, self(), Result},
+ parallel_map_worker_loop(ParentPid, Fun, N);
+ {parallel_map_stop, ParentPid} ->
+ ok
+ end.
+
+parallel_map_worker(ParentPid, Fun, N) ->
+ try
+ parallel_map_worker_loop(ParentPid, Fun, N)
+ catch
+ Type:What ->
+ [CrashFunction | Stack] = erlang:get_stacktrace(),
+ lager:error("Crashed process: ~p ~p~n ~p~n ~p~n", [Type, What, CrashFunction, Stack]),
+ ParentPid ! {parallel_map_crash, self()}
+ end.
+
+parallel_map_loop([], _FreeChildren, WorkingChildren, Acc) ->
+ case queue:out(WorkingChildren) of
+ {{value, FirstChild}, NewWorkingChildren} ->
+ receive
+ {parallel_map_result, FirstChild, Result} ->
+ parallel_map_loop([], [FirstChild], NewWorkingChildren, [Result | Acc]);
+ {parallel_map_crash, FirstChild} ->
+ crash
+ end;
+ {empty, _} ->
+ Acc
+ end;
+
+parallel_map_loop(Items, [], WorkingChildren, Acc) ->
+ {{value, FirstChild}, NewWorkingChildren} = queue:out(WorkingChildren),
+ receive
+ {parallel_map_result, FirstChild, Result} ->
+ parallel_map_loop(Items, [FirstChild], NewWorkingChildren, [Result | Acc]);
+ {parallel_map_crash, FirstChild} ->
+ crash
+ end;
+
+parallel_map_loop([Item|Rest], [FreeChild|FreeChildren], WorkingChildren, Acc) ->
+ FreeChild ! {parallel_map_request, self(), Item},
+ parallel_map_loop(Rest, FreeChildren, queue:in(FreeChild, WorkingChildren), Acc).
+
+
+parallel_map(Fun, List, Parallel) ->
+ ParentPid = self(),
+ ChildPids = lists:map(fun(N) ->
+ spawn_link(fun () ->
+ parallel_map_worker(ParentPid, Fun, N)
+ end)
+ end, lists:seq(1, Parallel)),
+ case parallel_map_loop(List, ChildPids, queue:new(), []) of
+ crash ->
+ exit(crash);
+ Result ->
+ lists:foreach(fun (Child) ->
+ Child ! {parallel_map_stop, self()}
+ end, ChildPids),
+ lists:reverse(Result)
+ end.
+
+%%%%%%%%%%%%%%%%%%%%
+
+-include_lib("eunit/include/eunit.hrl").
+
+fact(N) ->
+ fact(N, 1).
+
+fact(1, Acc) ->
+ Acc;
+fact(N, Acc) ->
+ fact(N - 1, Acc * N).
+
+parallel_map_test() ->
+ Result1 = lists:map(fun fact/1, lists:seq(1, 2000)),
+ Result2 = parallel_map(fun fact/1, lists:seq(1, 2000), 10),
+ ?assertEqual(Result1, Result2).
diff --git a/test/check.erl b/test/check.erl
index f4fdb8d..fd726e5 100755
--- a/test/check.erl
+++ b/test/check.erl
@@ -9,5 +9,5 @@
main(_) ->
ok = ht:test(),
ok = ts:test(),
- ok = tlv:test().
-
+ ok = tlv:test(),
+ ok = util:test().