summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile3
-rw-r--r--c_src/Makefile6
-rw-r--r--c_src/erlport.h7
-rw-r--r--c_src/leveldbport.c154
-rw-r--r--src/leveldb.erl122
-rw-r--r--test/check.config4
-rwxr-xr-xtest/leveldbtest.erl132
7 files changed, 426 insertions, 2 deletions
diff --git a/Makefile b/Makefile
index 6bafe20..5896403 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
build all:
(cd c_src && make all)
mkdir -p priv
- cp c_src/fsynchelper c_src/hsmhelper c_src/permdbport priv/
+ cp c_src/fsynchelper c_src/hsmhelper c_src/permdbport c_src/leveldbport priv/
mkdir -p test/ebin
(cd src && erlc -bber +der src/Dss.asn1 && rm Dss.beam)
(cd test/src && erlc -o ../ebin db.erl)
@@ -18,3 +18,4 @@ tags:
# Unit testing.
check: build
test/check.erl
+ test/leveldbtest.erl
diff --git a/c_src/Makefile b/c_src/Makefile
index 72a47d8..54cd624 100644
--- a/c_src/Makefile
+++ b/c_src/Makefile
@@ -2,12 +2,13 @@ CC = gcc
CFLAGS = -Wall -Werror -std=gnu99
LDFLAGS =
-PORTS = fsynchelper hsmhelper permdbport
+PORTS = fsynchelper hsmhelper permdbport leveldbport
common_OBJS = erlport.o net_read_write.o
fsynchelper_OBJS = fsynchelper.o $(common_OBJS)
hsmhelper_OBJS = hsmhelper.o pkcs11.o $(common_OBJS)
permdbport_OBJS = permdb.o permdbport.o arlamath.o hash.o $(common_OBJS)
+leveldbport_OBJS = leveldbport.o $(common_OBJS)
all: $(PORTS)
@@ -22,3 +23,6 @@ hsmhelper: $(hsmhelper_OBJS)
permdbport: $(permdbport_OBJS)
$(CC) -o permdbport $(permdbport_OBJS) $(LDFLAGS)
+
+leveldbport: $(leveldbport_OBJS)
+ $(CC) -o leveldbport $(leveldbport_OBJS) $(LDFLAGS) -lleveldb
diff --git a/c_src/erlport.h b/c_src/erlport.h
index 58b2591..2d7821f 100644
--- a/c_src/erlport.h
+++ b/c_src/erlport.h
@@ -6,6 +6,10 @@
#ifndef ERLPORT_H
#define ERLPORT_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
ssize_t
read_command(unsigned char *buf, size_t maxlen, size_t length_size);
@@ -15,4 +19,7 @@ write_reply(unsigned char *msg, size_t len, size_t length_size);
int
write_status(char *msg);
+#ifdef __cplusplus
+}
+#endif
#endif
diff --git a/c_src/leveldbport.c b/c_src/leveldbport.c
new file mode 100644
index 0000000..000a804
--- /dev/null
+++ b/c_src/leveldbport.c
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2016, NORDUnet A/S.
+ * See LICENSE for licensing information.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <assert.h>
+#include <err.h>
+#include <sys/resource.h>
+#include "leveldb/c.h"
+#include "erlport.h"
+
+static const int keylen = 32;
+
+static void __attribute__((noreturn))
+usage()
+{
+ errx(1, "usage: leveldbport <path>");
+}
+
+static void
+getvalue(leveldb_t *db, const char *key)
+{
+ leveldb_readoptions_t *opt = leveldb_readoptions_create();
+ size_t vallen = 0;
+ char *err = NULL;
+ char *val = leveldb_get(db, opt, key, keylen, &vallen, &err);
+ leveldb_readoptions_destroy(opt);
+ if (err) {
+ fprintf(stderr, "leveldb_get: %s\n", err);
+ free(err);
+ write_reply(NULL, 0, 4);
+ return;
+ }
+ write_reply((unsigned char *) val, vallen, 4);
+ free(val);
+}
+
+static void
+addvalue(leveldb_t *db, const char *key, const char *val, size_t vallen)
+{
+ leveldb_writeoptions_t *opt = leveldb_writeoptions_create();
+ char *err = NULL;
+ leveldb_put(db, opt, key, keylen, val, vallen, &err);
+ leveldb_writeoptions_destroy(opt);
+ if (err) {
+ fprintf(stderr, "leveldb_put: %s\n", err);
+ free(err);
+ write_reply(NULL, 0, 4);
+ return;
+ }
+ unsigned char result = (unsigned char) 1;
+ write_reply(&result, 1, 4);
+}
+
+static void
+commit(leveldb_t *db)
+{
+ unsigned char result = (unsigned char) 0;
+ write_reply(&result, 1, 4);
+}
+
+static void
+portloop(leveldb_t *db)
+{
+ unsigned char buf[65536];
+ ssize_t len;
+ while ((len = read_command(buf, sizeof(buf) - 1, 4)) > 0) {
+ switch(buf[0]) {
+ case 0:
+ if (len != keylen + 1) {
+ write_reply(NULL, 0, 4);
+ } else {
+ const char *key = (char *) buf + 1;
+ getvalue(db, key);
+ }
+ break;
+ case 1:
+ if (len < keylen + 1) {
+ write_reply(NULL, 0, 4);
+ } else {
+ const char *key = (char *) buf + 1;
+ const char *val = key + keylen;
+ size_t vallen = len - 1 - keylen;
+ addvalue(db, key, val, vallen);
+ }
+ break;
+ case 2:
+ commit(db);
+ break;
+ default:
+ write_reply(NULL, 0, 4);
+ }
+ }
+}
+
+static leveldb_t*
+db_alloc(const char* name)
+{
+ leveldb_options_t *opt = leveldb_options_create();
+ leveldb_options_set_create_if_missing(opt, 1);
+ char *err = NULL;
+ leveldb_t *db = leveldb_open(opt, name, &err);
+ leveldb_options_destroy(opt);
+ if (err) {
+ fprintf(stderr, "error opening database: %s\n", err);
+ free(err);
+ return NULL;
+ }
+ return db;
+}
+
+static void
+db_free(leveldb_t *db)
+{
+ free(db);
+}
+
+int
+main(int argc, char *argv[])
+{
+ if (argc != 2) {
+ usage();
+ }
+ const char *name = argv[1];
+
+ fprintf(stderr, "leveldbport starting with db '%s'\n", name);
+
+ leveldb_t *state = db_alloc(name);
+ if (state == NULL) {
+ write_reply(NULL, 0, 4);
+ return 1;
+ }
+
+ portloop(state);
+ db_free(state);
+
+ struct rusage rusage;
+ getrusage(RUSAGE_SELF, &rusage);
+ fprintf(stderr, "leveldbport user %ld.%d sys %ld.%d maxrss %ld M\n",
+ rusage.ru_utime.tv_sec, (int)rusage.ru_utime.tv_usec,
+ rusage.ru_stime.tv_sec, (int)rusage.ru_utime.tv_usec,
+ rusage.ru_maxrss/1024);
+
+ return 0;
+}
+
+/* Local Variables: */
+/* c-file-style: "BSD" */
+/* End: */
diff --git a/src/leveldb.erl b/src/leveldb.erl
new file mode 100644
index 0000000..b1dc02c
--- /dev/null
+++ b/src/leveldb.erl
@@ -0,0 +1,122 @@
+%%% Copyright (c) 2016, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(leveldb).
+
+-behaviour(gen_server).
+
+-export([start_link/2, stop/1, init_module/0]).
+-export([getvalue/2, addvalue/3, commit/1, commit/2]).
+
+%% gen_server callbacks.
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+
+-record(state,
+ {cachename, name, port, requests, requestcounter}).
+
+getvalue(Name, Key) ->
+ gen_server:call(Name, {getvalue, Key}).
+
+addvalue(Name, Key, Value) ->
+ gen_server:call(Name, {addvalue, Key, Value}).
+
+commit(Name) ->
+ gen_server:call(Name, {commit}).
+commit(Name, Timeout) ->
+ gen_server:call(Name, {commit}, Timeout).
+
+init([Name, Filename]) ->
+ lager:info("starting leveldb server '~p'", [Name]),
+ process_flag(trap_exit, true),
+ Port = open_port({spawn_executable, code:priv_dir(plop) ++ "/leveldbport"},
+ [{packet, 4}, {args, [Filename]}, binary]),
+ {ok, #state{name = Name,
+ port = Port,
+ requestcounter = 0,
+ requests = queue:new()}}.
+
+init_module() ->
+ ok.
+
+start_link(Name, Filename) ->
+ gen_server:start_link({local, Name}, ?MODULE, [Name, Filename], []).
+
+stop(Name) ->
+ gen_server:call(Name, stop).
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({Port, {data, Data}}, State) when is_port(Port) ->
+ lager:debug("response: ~p", [Data]),
+ {{value, {From, Action}}, Requests} = queue:out(State#state.requests),
+ lager:debug("response ~p ~p: ~p",
+ [State#state.name,
+ State#state.requestcounter - queue:len(State#state.requests),
+ Action]),
+ gen_server:reply(From, case Action of
+ getvalue ->
+ case Data of
+ <<>> ->
+ noentry;
+ _ ->
+ Data
+ end;
+ addvalue ->
+ case Data of
+ <<>> ->
+ util:exit_with_error(
+ putvalue, "Error in putvalue");
+ _ ->
+ ok
+ end;
+ commit ->
+ Data
+ end),
+ {noreply, State#state{requests = Requests}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ io:format("~p terminating~n", [?MODULE]),
+ State#state.port ! {self(), {command, <<>>}},
+ ok.
+
+add_request(State, From, Action) ->
+ State#state{
+ requests = queue:in({From, Action}, State#state.requests),
+ requestcounter = State#state.requestcounter + 1
+ }.
+
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+
+handle_call({getvalue, Key}, From, State) ->
+ lager:debug("getvalue ~p ~p: ~p", [State#state.name,
+ State#state.requestcounter, Key]),
+ getvalue_port_command(State#state.port, Key),
+ {noreply, add_request(State, From, getvalue)};
+
+handle_call({addvalue, Key, Value}, From, State) ->
+ lager:debug("addvalue ~p ~p: ~p ~p", [State#state.name,
+ State#state.requestcounter, Key, Value]),
+ addvalue_port_command(State#state.port, Key, Value),
+ {noreply, add_request(State, From, addvalue)};
+
+handle_call({commit}, From, State) ->
+ lager:debug("commit ~p ~p", [State#state.name, State#state.requestcounter]),
+ commit_port_command(State#state.port),
+ {noreply, add_request(State, From, commit)}.
+
+getvalue_port_command(Port, Key) ->
+ Port ! {self(), {command, <<0:8, Key/binary>>}}.
+
+addvalue_port_command(Port, Key, Value) ->
+ Port ! {self(), {command, <<1:8, Key:32/binary, Value/binary>>}}.
+
+commit_port_command(Port) ->
+ Port ! {self(), {command, <<2:8>>}}.
diff --git a/test/check.config b/test/check.config
new file mode 100644
index 0000000..4180771
--- /dev/null
+++ b/test/check.config
@@ -0,0 +1,4 @@
+%% -*- erlang -*-
+[{lager,
+ [{handlers,
+ [{lager_console_backend, warning}]}]}].
diff --git a/test/leveldbtest.erl b/test/leveldbtest.erl
new file mode 100755
index 0000000..e95390b
--- /dev/null
+++ b/test/leveldbtest.erl
@@ -0,0 +1,132 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -pa test -pa ../lager/ebin -pa ../lager/deps/goldrush/ebin -s lager -config test/check.config
+
+-mode(compile).
+
+testinit(Filename) ->
+ {ok, _Pid} = leveldb:start_link(testdb, Filename).
+
+teststop() ->
+ leveldb:stop(testdb).
+
+testget(_Filename, TestData, Datasize) ->
+ getvalue_loop(TestData, none, Datasize),
+ ok.
+
+testadd(_Filename, TestData, Datasize) ->
+ addvalue_loop(TestData, none, Datasize),
+ case leveldb:commit(testdb) of
+ <<0>> ->
+ ok;
+ Other ->
+ io:format("commit expected: 0 got: ~p~n", [Other]),
+ exit(mismatch)
+ end.
+
+stop() ->
+ teststop(),
+ receive
+ after
+ 100 ->
+ ok
+ end.
+
+gentestdata(Size) ->
+ [{crypto:hash(sha256, <<E:32, 0:32>>), crypto:hash(sha256, <<E:32, 1:32>>)} || E <- lists:seq(0, Size-1)].
+
+genemptytestdata(Size) ->
+ [{crypto:hash(sha256, <<E:32, 0:32>>), noentry} || E <- lists:seq(0, Size-1)].
+
+timeprint(Time) ->
+ io_lib:format("~.2fs", [Time/1000000]).
+
+constructdata(VSeed, Size) ->
+ A = binary:copy(VSeed, Size div 32),
+ B = binary:part(VSeed, 0, Size rem 32),
+ <<A/binary, B/binary>>.
+
+getvalue_loop([], _Port, _Datasize) ->
+ none;
+getvalue_loop([{K, VSeed}|Rest], Port, Datasize) ->
+ V = case VSeed of
+ noentry ->
+ noentry;
+ _ ->
+ constructdata(VSeed, Datasize)
+ end,
+ case leveldb:getvalue(testdb, K) of
+ V ->
+ getvalue_loop(Rest, Port, Datasize);
+ VOther ->
+ io:format("expected: ~p got: ~p~nkey: ~p~n", [V, VOther, K]),
+ exit(mismatch)
+ end.
+
+addvalue_loop([], _Port, _Datasize) ->
+ none;
+addvalue_loop([{K, VSeed}|Rest], Port, Datasize) ->
+ V = constructdata(VSeed, Datasize),
+ case leveldb:addvalue(testdb, K, V) of
+ ok ->
+ addvalue_loop(Rest, Port, Datasize);
+ Other ->
+ io:format("expected: 0 or 1 got: ~p~n", [Other]),
+ exit(mismatch)
+ end.
+
+remove_db(Dir) ->
+ case file:list_dir(Dir) of
+ {ok, List} ->
+ lists:foreach(fun(File) ->
+ ok = file:delete(filename:join(Dir, File)) end,
+ List),
+ ok = file:del_dir(Dir);
+ _ ->
+ ok
+ end.
+
+main([]) ->
+ {ok, Cwd} = file:get_cwd(),
+ code:add_path(Cwd ++ "/ebin"),
+ Size = 10,
+ Datasize = 99,
+ Filename = "testleveldb",
+ remove_db(Filename),
+ {Time1, TestData} = timer:tc(fun () -> gentestdata(Size) end),
+ EmptyTestData = genemptytestdata(Size),
+
+ io:format("Init with ~p entries: ~s~n", [Size, timeprint(Time1)]),
+ testinit(Filename),
+ Testadd = fun () ->
+ {Time2, ok} = timer:tc(fun () -> testadd(Filename, TestData, Datasize) end),
+ io:format("Add ~p entries: ~s ~.1f entries/s (~.2f microseconds)~n", [Size, timeprint(Time2), Size*1000000/Time2, Time2/Size])
+ end,
+ Testadd(),
+ Testget = fun () ->
+ {Time2, ok} = timer:tc(fun () -> testget(Filename, TestData, Datasize) end),
+ io:format("Get ~p entries: ~s ~.1f entries/s (~.2f microseconds)~n", [Size, timeprint(Time2), Size*1000000/Time2, Time2/Size])
+ end,
+ Testget(),
+ stop(),
+
+ io:format("------------------------------------------------------------~n", []),
+ remove_db(Filename),
+ testinit(Filename),
+ Testemptyget = fun () ->
+ {Time2, ok} = timer:tc(fun () -> testget(Filename, EmptyTestData, Datasize) end),
+ io:format("Get ~p entries: ~s ~.1f entries/s (~.2f microseconds)~n", [Size, timeprint(Time2), Size*1000000/Time2, Time2/Size])
+ end,
+ Testemptyget(),
+ testadd(Filename, gentestdata(1), 99),
+ testadd(Filename, gentestdata(1+2), 99),
+ testadd(Filename, gentestdata(1+2+3), 99),
+ testadd(Filename, gentestdata(1+2+3+4), 99),
+ testget(Filename, gentestdata(1+2+3+4), 99),
+ stop(),
+
+ testinit(Filename),
+ testget(Filename, gentestdata(1+2+3+4), 99),
+ stop(),
+
+ ok.