From 29ac49eabca61c4a9e0c3a0d8f9ba57ab516ebae Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 25 Sep 2014 01:35:33 +0200 Subject: Permanent storage implementation --- Makefile | 2 + c_src/erlport.c | 105 +++++++++++++++++++++++++++++++++++++++++++++++++ c_src/erlport.h | 15 +++++++ c_src/fsynchelper.c | 64 ++++++++++++++++++++++++++++++ c_src/net_read_write.c | 93 +++++++++++++++++++++++++++++++++++++++++++ c_src/net_read_write.h | 10 +++++ src/fsyncport.erl | 88 +++++++++++++++++++++++++++++++++++++++++ src/perm.erl | 95 ++++++++++++++++++++++++++++++++++++++++++++ src/plop_sup.erl | 5 +++ 9 files changed, 477 insertions(+) create mode 100644 c_src/erlport.c create mode 100644 c_src/erlport.h create mode 100644 c_src/fsynchelper.c create mode 100644 c_src/net_read_write.c create mode 100644 c_src/net_read_write.h create mode 100644 src/fsyncport.erl create mode 100644 src/perm.erl diff --git a/Makefile b/Makefile index 2efdd34..4e54096 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ build all: + (cd c_src && make all) + cp c_src/fsynchelper priv/fsynchelper erl -make clean: -rm ebin/*.beam diff --git a/c_src/erlport.c b/c_src/erlport.c new file mode 100644 index 0000000..5e5c17c --- /dev/null +++ b/c_src/erlport.c @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2014 Kungliga Tekniska Högskolan + * (KTH Royal Institute of Technology, Stockholm, Sweden). 
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "net_read_write.h"
+#include "erlport.h"
+
+/*
+ * Read the big-endian length prefix of the next packet from stdin (fd 0).
+ * Only two-byte prefixes are supported.  Returns the decoded length, or -1
+ * if length_size is not 2 or the prefix could not be read completely.
+ */
+static ssize_t
+read_length(size_t length_size)
+{
+    unsigned char buf[2];
+    ssize_t ret;
+
+    if (length_size != sizeof(buf)) {
+        return -1;
+    }
+
+    ret = net_read(0, (char *)buf, length_size);
+    if (ret != (ssize_t) length_size) {
+        return -1;
+    }
+
+    return (ssize_t)(((unsigned long)buf[0] << 8) | (unsigned long)buf[1]);
+}
+
+/*
+ * Read one length-prefixed command from stdin into buf, which has room for
+ * maxlen bytes.  Returns -1 when the prefix cannot be read or announces
+ * more than maxlen bytes; otherwise returns whatever net_read() returns for
+ * the payload (the payload length on success).  The payload is not
+ * NUL-terminated.
+ */
+ssize_t
+read_command(char *buf, size_t maxlen)
+{
+    ssize_t len = read_length(2);
+
+    /* Compare as size_t so a huge maxlen cannot wrap to a negative value. */
+    if (len < 0 || (size_t)len > maxlen) {
+        return -1;
+    }
+    return net_read(0, buf, (size_t)len);
+}
+
+/*
+ * Write len as a two-byte big-endian length prefix to stdout (fd 1).
+ * Returns 0 on success and -1 if length_size is not 2, if len does not fit
+ * in two bytes, or if the prefix could not be written.
+ */
+static int
+write_length(size_t len, size_t length_size)
+{
+    unsigned char buf[2];
+    ssize_t ret;
+
+    /*
+     * A two-byte prefix can describe at most 0xffff bytes.  Refuse longer
+     * payloads instead of truncating the length, which would leave the
+     * reader out of step with the packet stream.
+     */
+    if (length_size != sizeof(buf) || len > 0xffff) {
+        return -1;
+    }
+
+    buf[0] = (len >> 8) & 0xff;
+    buf[1] = len & 0xff;
+
+    ret = net_write(1, (char *)buf, length_size);
+    if (ret != (ssize_t) length_size) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Write the len bytes at msg to stdout as one length-prefixed packet.
+ * Nothing is written when the length prefix is rejected.  Returns 0 on
+ * success and -1 on failure.
+ */
+static int
+write_reply(char *msg, size_t len)
+{
+    if (write_length(len, 2) < 0) {
+        return -1;
+    }
+    if (net_write(1, msg, len) < 0) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Send the NUL-terminated string msg (without its terminator) as a reply
+ * packet.  Returns 0 on success and -1 on failure.
+ */
+int
+write_status(char *msg)
+{
+    return write_reply(msg, strlen(msg));
+}
diff --git a/c_src/erlport.h b/c_src/erlport.h
new file mode 100644
index 0000000..49e1b7c
--- /dev/null
+++ b/c_src/erlport.h
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Kungliga Tekniska Högskolan
+ * (KTH Royal Institute of Technology, Stockholm, Sweden).
+ */
+
+#ifndef ERLPORT_H
+#define ERLPORT_H
+
+/* Read one length-prefixed command from stdin into buf (at most len bytes). */
+ssize_t
+read_command(char *buf, size_t len);
+
+/* Write the NUL-terminated string msg to stdout as one length-prefixed reply. */
+int
+write_status(char *msg);
+
+#endif
diff --git a/c_src/fsynchelper.c b/c_src/fsynchelper.c
new file mode 100644
index 0000000..e6a04be
--- /dev/null
+++ b/c_src/fsynchelper.c
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2014 Kungliga Tekniska Högskolan
+ * (KTH Royal Institute of Technology, Stockholm, Sweden).
+ */
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include "erlport.h"
+
+/*
+ * Largest command a two-byte length prefix can announce.  Sizing the
+ * command buffer to this means no path the peer is able to send is
+ * rejected for being too long.
+ */
+#define MAX_COMMAND_LEN 65535
+
+/*
+ * Flush fd to stable storage.  Uses fcntl(F_FULLFSYNC) where the platform
+ * defines it and fsync() otherwise.  Returns the result of that call.
+ */
+static int
+dosync(int fd)
+{
+#ifdef F_FULLFSYNC
+    return fcntl(fd, F_FULLFSYNC);
+#else
+    return fsync(fd);
+#endif
+}
+
+/*
+ * Open path read-only, sync it and return the status word to report:
+ * "ok", "openerror", "ebadf" or "fsyncerror".  A sync interrupted by a
+ * signal (EINTR) is retried on a freshly opened descriptor.
+ */
+static char *
+sync_path(const char *path)
+{
+    for (;;) {
+        int saved_errno;
+        int fd = open(path, O_RDONLY);
+
+        if (fd == -1) {
+            /* XXX: better errors */
+            return "openerror";
+        }
+
+        if (dosync(fd) == 0) {
+            close(fd);
+            return "ok";
+        }
+
+        /* Keep the sync error; close() is allowed to overwrite errno. */
+        saved_errno = errno;
+        close(fd);
+
+        if (saved_errno == EINTR) {
+            continue;
+        }
+        return saved_errno == EBADF ? "ebadf" : "fsyncerror";
+    }
+}
+
+/*
+ * Command loop: read a path from stdin, sync it, write the status to
+ * stdout.  Ends when read_command() returns zero or a negative value.
+ */
+int
+main(void)
+{
+    /* One extra byte for the terminating NUL added below. */
+    static char buf[MAX_COMMAND_LEN + 1];
+    ssize_t len;
+
+    /* XXX: exits when command size is 0 */
+
+    while ((len = read_command(buf, sizeof(buf) - 1)) > 0) {
+        buf[len] = '\0';
+        write_status(sync_path(buf));
+    }
+
+    return 0;
+}
diff --git a/c_src/net_read_write.c b/c_src/net_read_write.c
new file mode 100644
index 0000000..f8f14f0
--- /dev/null
+++ b/c_src/net_read_write.c
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 1995, 1996, 1997, 1998 Kungliga Tekniska Högskolan
+ * (Royal Institute of Technology, Stockholm, Sweden).
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * 2. 
Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * 3. Neither the name of the Institute nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "net_read_write.h"
+
+/*
+ * Like read but never return partial data.
+ *
+ * Keeps calling read() until nbytes bytes have been stored in buf and then
+ * returns nbytes.  A read() interrupted by a signal (EINTR) is retried.
+ * Any other read() error, or end of file before nbytes bytes have arrived,
+ * ends the loop early and read()'s own return value (negative or 0) is
+ * returned instead.
+ */
+
+ssize_t
+net_read (int fd, void *buf, size_t nbytes)
+{
+    char *dst = (char *)buf;
+    size_t done = 0;
+
+    while (done < nbytes) {
+        ssize_t got = read (fd, dst + done, nbytes - done);
+
+        if (got < 0 && errno == EINTR)
+            continue;
+        if (got <= 0)
+            return got;
+        done += (size_t) got;
+    }
+    return (ssize_t)nbytes;
+}
+
+/*
+ * Like write but never return partial data. 
+ */
+
+ssize_t
+net_write (int fd, const void *buf, size_t nbytes)
+{
+    const char *src = (const char *)buf;
+    size_t done = 0;
+
+    while (done < nbytes) {
+        ssize_t put = write (fd, src + done, nbytes - done);
+
+        if (put < 0) {
+            /* Retry after a signal; report every other error as is. */
+            if (errno == EINTR)
+                continue;
+            return put;
+        }
+        done += (size_t)put;
+    }
+    return (ssize_t)nbytes;
+}
diff --git a/c_src/net_read_write.h b/c_src/net_read_write.h
new file mode 100644
index 0000000..80b92b3
--- /dev/null
+++ b/c_src/net_read_write.h
@@ -0,0 +1,10 @@
+#ifndef NET_READ_WRITE_H
+#define NET_READ_WRITE_H
+
+ssize_t
+net_read (int, void *, size_t);
+
+ssize_t
+net_write (int, const void *, size_t);
+
+#endif
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
new file mode 100644
index 0000000..8bc8c60
--- /dev/null
+++ b/src/fsyncport.erl
@@ -0,0 +1,88 @@
+%%
+%% Copyright (c) 2014 Kungliga Tekniska Högskolan
+%% (KTH Royal Institute of Technology, Stockholm, Sweden).
+%%
+
+-module(fsyncport).
+-export([start_link/0, stop/0, init/1]).
+-export([fsync/1]).
+
+%% Start the port manager process and link it to the caller.  The link is
+%% what lets a supervisor notice that the process has exited and restart
+%% it.
+start_link() ->
+    Pid = spawn_link(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
+    {ok, Pid}.
+
+%% Ask the port manager to close its ports and terminate.
+stop() ->
+    fsyncport ! stop.
+
+%% Sync Path to stable storage through one of the helper ports.  Blocks
+%% until the helper has answered and returns its status: ok, openerror,
+%% ebadf or fsyncerror.
+fsync(Path) ->
+    call_port({fsync, Path}).
+
+%% Send Msg to the registered port manager and wait for its reply.
+call_port(Msg) ->
+    fsyncport ! {call, self(), Msg},
+    receive
+        {fsyncport, Result} ->
+            Result
+    end.
+
+%% Process body: register as fsyncport, trap exits so that a dying port is
+%% seen as a message, open 32 helper ports and enter the server loop.
+init(ExtPrg) ->
+    register(fsyncport, self()),
+    process_flag(trap_exit, true),
+    Ports = [open_port({spawn_executable, ExtPrg}, [{packet, 2}])
+             || _N <- lists:seq(1, 32)],
+    loop(Ports).
+
+loop(Ports) ->
+    loop(Ports, dict:new(), queue:new()).
+
+%% IdlePorts: ports with no outstanding command.
+%% BusyPorts: dict from port to {Caller, StartTime} of the command it is
+%%            working on.
+%% Waiting:   queue of {Caller, Path} requests that arrived while no port
+%%            was idle.
+loop(IdlePorts, BusyPorts, Waiting) ->
+    receive
+        {call, Caller, {fsync, Path}} ->
+            case IdlePorts of
+                [] ->
+                    loop(IdlePorts, BusyPorts,
+                         queue:in({Caller, Path}, Waiting));
+                [Port | Rest] ->
+                    loop(Rest, dispatch(Port, Caller, Path, BusyPorts),
+                         Waiting)
+            end;
+
+        {Port, {data, Data}} when is_port(Port) ->
+            {Caller, Starttime} = dict:fetch(Port, BusyPorts),
+            Stoptime = os:timestamp(),
+            statreport({fsync, Stoptime, Starttime}),
+            Caller ! {fsyncport, decode_reply(Data)},
+            case queue:out(Waiting) of
+                {empty, _} ->
+                    loop([Port | IdlePorts], dict:erase(Port, BusyPorts),
+                         Waiting);
+                {{value, {NewCaller, NewPath}}, NewWaiting} ->
+                    %% Requests are only queued while no port is idle.
+                    [] = IdlePorts,
+                    loop(IdlePorts,
+                         dispatch(Port, NewCaller, NewPath, BusyPorts),
+                         NewWaiting)
+            end;
+
+        stop ->
+            Busy = [P || {P, {_Caller, _Starttime}} <- dict:to_list(BusyPorts)],
+            lists:foreach(fun (P) -> P ! {self(), close} end,
+                          IdlePorts ++ Busy),
+            receive
+                {ClosedPort, closed} when is_port(ClosedPort) ->
+                    exit(normal) %% XXX exits when first port is closed
+            end;
+
+        {'EXIT', Port, _Reason} when is_port(Port) ->
+            %% A helper died; give up so that the supervisor can start a
+            %% fresh set of ports.
+            exit(port_terminated);
+
+        {'EXIT', Pid, Reason} when is_pid(Pid) ->
+            %% Exits are trapped, so an exit signal from a process (for
+            %% example a supervisor asking for shutdown) arrives here as a
+            %% message.  Honour it instead of leaving it in the mailbox.
+            exit(Reason)
+    end.
+
+%% Send Path to Port as a command and record Caller as waiting for the
+%% answer.  Returns the updated dict of busy ports.
+dispatch(Port, Caller, Path, BusyPorts) ->
+    Port ! {self(), {command, Path}},
+    dict:store(Port, {Caller, os:timestamp()}, BusyPorts).
+
+%% Translate a reply from the helper into an atom.  The set is fixed so
+%% that data coming from the port can never create new atoms.
+decode_reply("ok") -> ok;
+decode_reply("openerror") -> openerror;
+decode_reply("ebadf") -> ebadf;
+decode_reply("fsyncerror") -> fsyncerror;
+decode_reply(Other) -> {error, {unexpected_reply, Other}}.
+
+%% Hook for timing statistics; currently does nothing.
+statreport(_Entry) ->
+    none.
diff --git a/src/perm.erl b/src/perm.erl
new file mode 100644
index 0000000..2ce5b46
--- /dev/null
+++ b/src/perm.erl
@@ -0,0 +1,95 @@
+%%
+%% Copyright (c) 2014 Kungliga Tekniska Högskolan
+%% (KTH Royal Institute of Technology, Stockholm, Sweden).
+%%
+
+-module(perm).
+-export([ensurefile/3]).
+
+%% Flush the file or directory Name to stable storage via fsyncport.
+fsync(Name) ->
+    fsyncport:fsync(Name).
+
+%% Compare the bytes stored in file Name with Content.  Returns ok when
+%% they are equal, {error, "File contents differ"} when they are not, and
+%% the error from file:read_file/1 when the file cannot be read.  The file
+%% is turned into a list of bytes before comparing, so only a Content that
+%% is a flat list of bytes can compare equal.
+readfile_and_verify(Name, Content) ->
+    case file:read_file(Name) of
+        {ok, ContentsReadBinary} ->
+            case binary_to_list(ContentsReadBinary) == Content of
+                true ->
+                    ok;
+                false ->
+                    {error, "File contents differ"}
+            end;
+        {error, _} = Error ->
+            Error
+    end.
+
+%% Write Content to the new file NurseryName and then rename it to Name.
+%% NurseryName is opened exclusively, so an already existing file gives
+%% {error, eexist}; that should not happen because the name is meant to be
+%% unique.  Returns ok, or {error, Reason} from the first of open, write,
+%% close and rename that fails.
+writefile(Name, NurseryName, Content) ->
+    case file:open(NurseryName, [write, exclusive]) of
+        {ok, File} ->
+            %io:format("Write file: ~p~n", [Name]),
+            WriteResult = file:write(File, Content),
+            %% Close even when the write failed so the handle is released.
+            CloseResult = file:close(File),
+            case {WriteResult, CloseResult} of
+                {ok, ok} ->
+                    file:rename(NurseryName, Name);
+                {{error, _} = WriteError, _} ->
+                    WriteError;
+                {ok, {error, _} = CloseError} ->
+                    CloseError
+            end;
+        {error, _} = OpenError ->
+            OpenError
+    end.
+
+%% Create directory Name.  A directory that already exists counts as
+%% success.
+make_dir(Name) ->
+    case file:make_dir(Name) of
+        {error, eexist} ->
+            ok;
+        Other ->
+            %% ok or {error, Reason}
+            Other
+    end.
+
+%% Create the listed directories in order, stopping at the first failure.
+%% Returns ok or the {error, Reason} of the directory that failed.
+make_dirs([]) ->
+    ok;
+make_dirs([Name | Rest]) ->
+    case make_dir(Name) of
+        ok ->
+            make_dirs(Rest);
+        {error, _} = Error ->
+            Error
+    end.
+
+%% Map Key to its place in the store.  The hex form of Key is the file
+%% name and its first three character pairs name three nested directory
+%% levels.  Rootdir is prepended as is, without adding a separator.
+%% Returns {[Firstlevel, Secondlevel, Thirdlevel], Fullpath}.
+path_for_key(Rootdir, Key) ->
+    Name = hex:bin_to_hexstr(Key),
+    [C1, C2, C3, C4, C5, C6 | _] = Name,
+    Firstlevel = Rootdir ++ [C1, C2],
+    Secondlevel = Firstlevel ++ "/" ++ [C3, C4],
+    Thirdlevel = Secondlevel ++ "/" ++ [C5, C6],
+    {[Firstlevel, Secondlevel, Thirdlevel], Thirdlevel ++ "/" ++ Name}.
+
+%% Build a temporary file name from Base, the OS process id and the
+%% current now() timestamp.  The result comes straight from
+%% io_lib:format/2.
+tempfilename(Base) ->
+    {MegaSecs, Secs, MicroSecs} = now(),
+    io_lib:format("~s-~s-~p.~p", [Base, os:getpid(),
+                                  MegaSecs * 1000000 + Secs, MicroSecs]).
+
+%% Sync every path in the list through fsync/1.
+sync_all(Paths) ->
+    lists:foreach(fun (Path) -> fsync(Path) end, Paths). %% XXX check results
+
+%% Store Content for a key that has no file yet: create the directories,
+%% write the file under Rootdir ++ "nursery/", rename it to Path and sync
+%% the result.  Exits when a directory cannot be created or the file
+%% cannot be written, because returning normally would report data as
+%% stored that is not on disk.
+store_new(Rootdir, Key, Content, Dirs, Path) ->
+    Nursery = Rootdir ++ "nursery/",
+    case make_dirs([Rootdir, Nursery] ++ Dirs) of
+        ok ->
+            NurseryName = Nursery ++ tempfilename(hex:bin_to_hexstr(Key)),
+            case writefile(Path, NurseryName, Content) of
+                ok ->
+                    sync_all([Path, Rootdir | Dirs]);
+                {error, WriteError} ->
+                    exit({perm, fileerror, "Error writing file", WriteError})
+            end;
+        {error, DirError} ->
+            exit({perm, fileerror, "Error creating directory", DirError})
+    end.
+
+%% Make sure the file for Key under Rootdir exists with exactly Content
+%% and has been synced.  An existing file with matching content is only
+%% synced again; a missing file is created.  Returns ok.  Exits with
+%% {perm, fileerror, Description, Reason} when the existing file cannot be
+%% read or differs from Content, or when the new file cannot be stored.
+ensurefile(Rootdir, Key, Content) ->
+    {Dirs, Path} = path_for_key(Rootdir, Key),
+    case readfile_and_verify(Path, Content) of
+        ok ->
+            sync_all([Path, Rootdir | Dirs]);
+        {error, enoent} ->
+            store_new(Rootdir, Key, Content, Dirs, Path);
+        {error, Error} ->
+            exit({perm, fileerror, "Error reading file", Error})
+    end.
diff --git a/src/plop_sup.erl b/src/plop_sup.erl index a5ce905..bcb9756 100644 --- a/src/plop_sup.erl +++ b/src/plop_sup.erl @@ -23,6 +23,11 @@ init(Args) -> permanent, 10000, worker, [db]}, + {fsync, + {fsyncport, start_link, []}, + permanent, + 10000, + worker, [fsyncport]}, {the_ht, {ht, start_link, []}, permanent, -- cgit v1.1