%%% Copyright (c) 2014, NORDUnet A/S. %%% See LICENSE for licensing information. -module(util). -export([tempfilename/1, fsync/1, fsync/2, exit_with_error/3, check_error/3, write_tempfile_and_rename/3, spawn_and_wait/1, parallel_map/3]). -include("timeouts.hrl"). -spec tempfilename(string()) -> string(). tempfilename(Base) -> {MegaSecs, Secs, MicroSecs} = now(), Filename = io_lib:format("~s-~s-~p.~p", [Base, os:getpid(), MegaSecs * 1000000 + Secs, MicroSecs]), Filename. -spec fsync([string()]) -> ok. fsync(Paths, Timeout) -> case fsyncport:fsyncall(Paths, Timeout) of ok -> ok; {error, Error} -> exit_with_error(fsync, Error, "Error in fsync") end. fsync(Paths) -> fsync(Paths, ?UTIL_FSYNC_DEFAULT_TIMEOUT). -spec exit_with_error(atom(), atom(), string()) -> no_return(). exit_with_error(Operation, Error, ErrorMessage) -> io:format("~s(~w): ~w~n", [ErrorMessage, Operation, Error]), exit({fileerror, Operation, Error, ErrorMessage}). -spec check_error(any(), atom(), string()) -> ok. check_error(ReturnValue, Operation, ErrorMessage) -> case ReturnValue of ok -> ok; {error, Error} -> exit_with_error(Operation, Error, ErrorMessage) end. -spec write_tempfile_and_rename(string(), string(), binary()) -> ok. write_tempfile_and_rename(Name, NurseryName, Content) -> case file:open(NurseryName, [write, exclusive]) of {ok, File} -> ok = file:write(File, Content), file:close(File), check_error(file:rename(NurseryName, Name), rename, "Error when renaming tempfile to final file"); {error, eexist} -> %% Should not happen, file name should be unique exit_with_error(writefile, eexist, "File existed when creating tempfile"); {error, Error} -> exit_with_error(writefile, Error, "Error when creating tempfile") end. spawn_and_wait(Fun) -> ParentPid = self(), ChildPid = spawn_link(fun () -> try Result = Fun(), ParentPid ! {result, self(), Result} catch Type:What -> [CrashFunction | Stack] = erlang:get_stacktrace(), lager:error("Crashed process: ~p ~p~n ~p~n ~p~n", [Type, What, CrashFunction, Stack]), ParentPid ! {result, self(), crash} end end), receive {result, ChildPid, Result} -> Result end. parallel_map_worker_loop(ParentPid, Fun, N) -> receive {parallel_map_request, ParentPid, Input} -> Result = Fun(Input), ParentPid ! {parallel_map_result, self(), Result}, parallel_map_worker_loop(ParentPid, Fun, N); {parallel_map_stop, ParentPid} -> ok end. parallel_map_worker(ParentPid, Fun, N) -> try parallel_map_worker_loop(ParentPid, Fun, N) catch Type:What -> [CrashFunction | Stack] = erlang:get_stacktrace(), lager:error("Crashed process: ~p ~p~n ~p~n ~p~n", [Type, What, CrashFunction, Stack]), ParentPid ! {parallel_map_crash, self()} end. parallel_map_loop([], _FreeChildren, WorkingChildren, Acc) -> case queue:out(WorkingChildren) of {{value, FirstChild}, NewWorkingChildren} -> receive {parallel_map_result, FirstChild, Result} -> parallel_map_loop([], [FirstChild], NewWorkingChildren, [Result | Acc]); {parallel_map_crash, FirstChild} -> crash end; {empty, _} -> Acc end; parallel_map_loop(Items, [], WorkingChildren, Acc) -> {{value, FirstChild}, NewWorkingChildren} = queue:out(WorkingChildren), receive {parallel_map_result, FirstChild, Result} -> parallel_map_loop(Items, [FirstChild], NewWorkingChildren, [Result | Acc]); {parallel_map_crash, FirstChild} -> crash end; parallel_map_loop([Item|Rest], [FreeChild|FreeChildren], WorkingChildren, Acc) -> FreeChild ! {parallel_map_request, self(), Item}, parallel_map_loop(Rest, FreeChildren, queue:in(FreeChild, WorkingChildren), Acc). parallel_map(Fun, List, 1) -> lists:map(Fun, List); parallel_map(Fun, List, Parallel) -> ParentPid = self(), ChildPids = lists:map(fun(N) -> spawn_link(fun () -> parallel_map_worker(ParentPid, Fun, N) end) end, lists:seq(1, Parallel)), case parallel_map_loop(List, ChildPids, queue:new(), []) of crash -> exit(crash); Result -> lists:foreach(fun (Child) -> Child ! {parallel_map_stop, self()} end, ChildPids), lists:reverse(Result) end. %%%%%%%%%%%%%%%%%%%% -include_lib("eunit/include/eunit.hrl"). fact(N) -> fact(N, 1). fact(1, Acc) -> Acc; fact(N, Acc) -> fact(N - 1, Acc * N). parallel_map_test() -> Result1 = lists:map(fun fact/1, lists:seq(1, 2000)), Result2 = parallel_map(fun fact/1, lists:seq(1, 2000), 10), ?assertEqual(Result1, Result2).