diff options
author | Linus Nordberg <linus@nordu.net> | 2016-05-11 08:53:47 +0200 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-05-12 09:39:05 +0200 |
commit | 8f4eb2702f1cd41f410bc61625e7f4e97705aa96 (patch) | |
tree | b6dd8698778d16e5272ab909724f01495c0c517b | |
parent | 7f15cd5a881d2d14f80ddf5219577561d3d91eeb (diff) |
Initial support for leveldb as a database backend.leveldb
Not integrated in system tests (make tests) but make check will run
rudimentary tests.
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | c_src/Makefile | 6 | ||||
-rw-r--r-- | c_src/erlport.h | 7 | ||||
-rw-r--r-- | c_src/leveldbport.c | 154 | ||||
-rw-r--r-- | src/leveldb.erl | 122 | ||||
-rw-r--r-- | test/check.config | 4 | ||||
-rwxr-xr-x | test/leveldbtest.erl | 132 |
7 files changed, 426 insertions, 2 deletions
@@ -1,7 +1,7 @@ build all: (cd c_src && make all) mkdir -p priv - cp c_src/fsynchelper c_src/hsmhelper c_src/permdbport priv/ + cp c_src/fsynchelper c_src/hsmhelper c_src/permdbport c_src/leveldbport priv/ mkdir -p test/ebin (cd src && erlc -bber +der src/Dss.asn1 && rm Dss.beam) (cd test/src && erlc -o ../ebin db.erl) @@ -18,3 +18,4 @@ tags: # Unit testing. check: build test/check.erl + test/leveldbtest.erl diff --git a/c_src/Makefile b/c_src/Makefile index 72a47d8..54cd624 100644 --- a/c_src/Makefile +++ b/c_src/Makefile @@ -2,12 +2,13 @@ CC = gcc CFLAGS = -Wall -Werror -std=gnu99 LDFLAGS = -PORTS = fsynchelper hsmhelper permdbport +PORTS = fsynchelper hsmhelper permdbport leveldbport common_OBJS = erlport.o net_read_write.o fsynchelper_OBJS = fsynchelper.o $(common_OBJS) hsmhelper_OBJS = hsmhelper.o pkcs11.o $(common_OBJS) permdbport_OBJS = permdb.o permdbport.o arlamath.o hash.o $(common_OBJS) +leveldbport_OBJS = leveldbport.o $(common_OBJS) all: $(PORTS) @@ -22,3 +23,6 @@ hsmhelper: $(hsmhelper_OBJS) permdbport: $(permdbport_OBJS) $(CC) -o permdbport $(permdbport_OBJS) $(LDFLAGS) + +leveldbport: $(leveldbport_OBJS) + $(CC) -o leveldbport $(leveldbport_OBJS) $(LDFLAGS) -lleveldb diff --git a/c_src/erlport.h b/c_src/erlport.h index 58b2591..2d7821f 100644 --- a/c_src/erlport.h +++ b/c_src/erlport.h @@ -6,6 +6,10 @@ #ifndef ERLPORT_H #define ERLPORT_H +#ifdef __cplusplus +extern "C" { +#endif + ssize_t read_command(unsigned char *buf, size_t maxlen, size_t length_size); @@ -15,4 +19,7 @@ write_reply(unsigned char *msg, size_t len, size_t length_size); int write_status(char *msg); +#ifdef __cplusplus +} +#endif #endif diff --git a/c_src/leveldbport.c b/c_src/leveldbport.c new file mode 100644 index 0000000..000a804 --- /dev/null +++ b/c_src/leveldbport.c @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2016, NORDUnet A/S. + * See LICENSE for licensing information. + */ + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <stdint.h> +#include <assert.h> +#include <err.h> +#include <sys/resource.h> +#include "leveldb/c.h" +#include "erlport.h" + +static const int keylen = 32; + +static void __attribute__((noreturn)) +usage() +{ + errx(1, "usage: leveldbport <path>"); +} + +static void +getvalue(leveldb_t *db, const char *key) +{ + leveldb_readoptions_t *opt = leveldb_readoptions_create(); + size_t vallen = 0; + char *err = NULL; + char *val = leveldb_get(db, opt, key, keylen, &vallen, &err); + leveldb_readoptions_destroy(opt); + if (err) { + fprintf(stderr, "leveldb_get: %s\n", err); + free(err); + write_reply(NULL, 0, 4); + return; + } + write_reply((unsigned char *) val, vallen, 4); + free(val); +} + +static void +addvalue(leveldb_t *db, const char *key, const char *val, size_t vallen) +{ + leveldb_writeoptions_t *opt = leveldb_writeoptions_create(); + char *err = NULL; + leveldb_put(db, opt, key, keylen, val, vallen, &err); + leveldb_writeoptions_destroy(opt); + if (err) { + fprintf(stderr, "leveldb_put: %s\n", err); + free(err); + write_reply(NULL, 0, 4); + return; + } + unsigned char result = (unsigned char) 1; + write_reply(&result, 1, 4); +} + +static void +commit(leveldb_t *db) +{ + unsigned char result = (unsigned char) 0; + write_reply(&result, 1, 4); +} + +static void +portloop(leveldb_t *db) +{ + unsigned char buf[65536]; + ssize_t len; + while ((len = read_command(buf, sizeof(buf) - 1, 4)) > 0) { + switch(buf[0]) { + case 0: + if (len != keylen + 1) { + write_reply(NULL, 0, 4); + } else { + const char *key = (char *) buf + 1; + getvalue(db, key); + } + break; + case 1: + if (len < keylen + 1) { + write_reply(NULL, 0, 4); + } else { + const char *key = (char *) buf + 1; + const char *val = key + keylen; + size_t vallen = len - 1 - keylen; + addvalue(db, key, val, vallen); + } + break; + case 2: + commit(db); + break; + default: + write_reply(NULL, 0, 4); + } + } +} + +static leveldb_t* +db_alloc(const char* name) +{ + leveldb_options_t *opt = leveldb_options_create(); + leveldb_options_set_create_if_missing(opt, 1); + char *err = NULL; + leveldb_t *db = leveldb_open(opt, name, &err); + leveldb_options_destroy(opt); + if (err) { + fprintf(stderr, "error opening database: %s\n", err); + free(err); + return NULL; + } + return db; +} + +static void +db_free(leveldb_t *db) +{ + free(db); +} + +int +main(int argc, char *argv[]) +{ + if (argc != 2) { + usage(); + } + const char *name = argv[1]; + + fprintf(stderr, "leveldbport starting with db '%s'\n", name); + + leveldb_t *state = db_alloc(name); + if (state == NULL) { + write_reply(NULL, 0, 4); + return 1; + } + + portloop(state); + db_free(state); + + struct rusage rusage; + getrusage(RUSAGE_SELF, &rusage); + fprintf(stderr, "leveldbport user %ld.%d sys %ld.%d maxrss %ld M\n", + rusage.ru_utime.tv_sec, (int)rusage.ru_utime.tv_usec, + rusage.ru_stime.tv_sec, (int)rusage.ru_utime.tv_usec, + rusage.ru_maxrss/1024); + + return 0; +} + +/* Local Variables: */ +/* c-file-style: "BSD" */ +/* End: */ diff --git a/src/leveldb.erl b/src/leveldb.erl new file mode 100644 index 0000000..b1dc02c --- /dev/null +++ b/src/leveldb.erl @@ -0,0 +1,122 @@ +%%% Copyright (c) 2016, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(leveldb). + +-behaviour(gen_server). + +-export([start_link/2, stop/1, init_module/0]). +-export([getvalue/2, addvalue/3, commit/1, commit/2]). + +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +-record(state, + {cachename, name, port, requests, requestcounter}). + +getvalue(Name, Key) -> + gen_server:call(Name, {getvalue, Key}). + +addvalue(Name, Key, Value) -> + gen_server:call(Name, {addvalue, Key, Value}). + +commit(Name) -> + gen_server:call(Name, {commit}). +commit(Name, Timeout) -> + gen_server:call(Name, {commit}, Timeout). + +init([Name, Filename]) -> + lager:info("starting leveldb server '~p'", [Name]), + process_flag(trap_exit, true), + Port = open_port({spawn_executable, code:priv_dir(plop) ++ "/leveldbport"}, + [{packet, 4}, {args, [Filename]}, binary]), + {ok, #state{name = Name, + port = Port, + requestcounter = 0, + requests = queue:new()}}. + +init_module() -> + ok. + +start_link(Name, Filename) -> + gen_server:start_link({local, Name}, ?MODULE, [Name, Filename], []). + +stop(Name) -> + gen_server:call(Name, stop). + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({Port, {data, Data}}, State) when is_port(Port) -> + lager:debug("response: ~p", [Data]), + {{value, {From, Action}}, Requests} = queue:out(State#state.requests), + lager:debug("response ~p ~p: ~p", + [State#state.name, + State#state.requestcounter - queue:len(State#state.requests), + Action]), + gen_server:reply(From, case Action of + getvalue -> + case Data of + <<>> -> + noentry; + _ -> + Data + end; + addvalue -> + case Data of + <<>> -> + util:exit_with_error( + putvalue, "Error in putvalue"); + _ -> + ok + end; + commit -> + Data + end), + {noreply, State#state{requests = Requests}}; +handle_info(_Info, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + io:format("~p terminating~n", [?MODULE]), + State#state.port ! {self(), {command, <<>>}}, + ok. + +add_request(State, From, Action) -> + State#state{ + requests = queue:in({From, Action}, State#state.requests), + requestcounter = State#state.requestcounter + 1 + }. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; + +handle_call({getvalue, Key}, From, State) -> + lager:debug("getvalue ~p ~p: ~p", [State#state.name, + State#state.requestcounter, Key]), + getvalue_port_command(State#state.port, Key), + {noreply, add_request(State, From, getvalue)}; + +handle_call({addvalue, Key, Value}, From, State) -> + lager:debug("addvalue ~p ~p: ~p ~p", [State#state.name, + State#state.requestcounter, Key, Value]), + addvalue_port_command(State#state.port, Key, Value), + {noreply, add_request(State, From, addvalue)}; + +handle_call({commit}, From, State) -> + lager:debug("commit ~p ~p", [State#state.name, State#state.requestcounter]), + commit_port_command(State#state.port), + {noreply, add_request(State, From, commit)}. + +getvalue_port_command(Port, Key) -> + Port ! {self(), {command, <<0:8, Key/binary>>}}. + +addvalue_port_command(Port, Key, Value) -> + Port ! {self(), {command, <<1:8, Key:32/binary, Value/binary>>}}. + +commit_port_command(Port) -> + Port ! {self(), {command, <<2:8>>}}. diff --git a/test/check.config b/test/check.config new file mode 100644 index 0000000..4180771 --- /dev/null +++ b/test/check.config @@ -0,0 +1,4 @@ +%% -*- erlang -*- +[{lager, + [{handlers, + [{lager_console_backend, warning}]}]}]. diff --git a/test/leveldbtest.erl b/test/leveldbtest.erl new file mode 100755 index 0000000..e95390b --- /dev/null +++ b/test/leveldbtest.erl @@ -0,0 +1,132 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa test -pa ../lager/ebin -pa ../lager/deps/goldrush/ebin -s lager -config test/check.config + +-mode(compile). + +testinit(Filename) -> + {ok, _Pid} = leveldb:start_link(testdb, Filename). + +teststop() -> + leveldb:stop(testdb). + +testget(_Filename, TestData, Datasize) -> + getvalue_loop(TestData, none, Datasize), + ok. + +testadd(_Filename, TestData, Datasize) -> + addvalue_loop(TestData, none, Datasize), + case leveldb:commit(testdb) of + <<0>> -> + ok; + Other -> + io:format("commit expected: 0 got: ~p~n", [Other]), + exit(mismatch) + end. + +stop() -> + teststop(), + receive + after + 100 -> + ok + end. + +gentestdata(Size) -> + [{crypto:hash(sha256, <<E:32, 0:32>>), crypto:hash(sha256, <<E:32, 1:32>>)} || E <- lists:seq(0, Size-1)]. + +genemptytestdata(Size) -> + [{crypto:hash(sha256, <<E:32, 0:32>>), noentry} || E <- lists:seq(0, Size-1)]. + +timeprint(Time) -> + io_lib:format("~.2fs", [Time/1000000]). + +constructdata(VSeed, Size) -> + A = binary:copy(VSeed, Size div 32), + B = binary:part(VSeed, 0, Size rem 32), + <<A/binary, B/binary>>. + +getvalue_loop([], _Port, _Datasize) -> + none; +getvalue_loop([{K, VSeed}|Rest], Port, Datasize) -> + V = case VSeed of + noentry -> + noentry; + _ -> + constructdata(VSeed, Datasize) + end, + case leveldb:getvalue(testdb, K) of + V -> + getvalue_loop(Rest, Port, Datasize); + VOther -> + io:format("expected: ~p got: ~p~nkey: ~p~n", [V, VOther, K]), + exit(mismatch) + end. + +addvalue_loop([], _Port, _Datasize) -> + none; +addvalue_loop([{K, VSeed}|Rest], Port, Datasize) -> + V = constructdata(VSeed, Datasize), + case leveldb:addvalue(testdb, K, V) of + ok -> + addvalue_loop(Rest, Port, Datasize); + Other -> + io:format("expected: 0 or 1 got: ~p~n", [Other]), + exit(mismatch) + end. + +remove_db(Dir) -> + case file:list_dir(Dir) of + {ok, List} -> + lists:foreach(fun(File) -> + ok = file:delete(filename:join(Dir, File)) end, + List), + ok = file:del_dir(Dir); + _ -> + ok + end. + +main([]) -> + {ok, Cwd} = file:get_cwd(), + code:add_path(Cwd ++ "/ebin"), + Size = 10, + Datasize = 99, + Filename = "testleveldb", + remove_db(Filename), + {Time1, TestData} = timer:tc(fun () -> gentestdata(Size) end), + EmptyTestData = genemptytestdata(Size), + + io:format("Init with ~p entries: ~s~n", [Size, timeprint(Time1)]), + testinit(Filename), + Testadd = fun () -> + {Time2, ok} = timer:tc(fun () -> testadd(Filename, TestData, Datasize) end), + io:format("Add ~p entries: ~s ~.1f entries/s (~.2f microseconds)~n", [Size, timeprint(Time2), Size*1000000/Time2, Time2/Size]) + end, + Testadd(), + Testget = fun () -> + {Time2, ok} = timer:tc(fun () -> testget(Filename, TestData, Datasize) end), + io:format("Get ~p entries: ~s ~.1f entries/s (~.2f microseconds)~n", [Size, timeprint(Time2), Size*1000000/Time2, Time2/Size]) + end, + Testget(), + stop(), + + io:format("------------------------------------------------------------~n", []), + remove_db(Filename), + testinit(Filename), + Testemptyget = fun () -> + {Time2, ok} = timer:tc(fun () -> testget(Filename, EmptyTestData, Datasize) end), + io:format("Get ~p entries: ~s ~.1f entries/s (~.2f microseconds)~n", [Size, timeprint(Time2), Size*1000000/Time2, Time2/Size]) + end, + Testemptyget(), + testadd(Filename, gentestdata(1), 99), + testadd(Filename, gentestdata(1+2), 99), + testadd(Filename, gentestdata(1+2+3), 99), + testadd(Filename, gentestdata(1+2+3+4), 99), + testget(Filename, gentestdata(1+2+3+4), 99), + stop(), + + testinit(Filename), + testget(Filename, gentestdata(1+2+3+4), 99), + stop(), + + ok. |