/* * Copyright (c) 2015, NORDUnet A/S. * See LICENSE for licensing information. */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include "erlport.h" #include "permdb.h" #include "hash.h" #define INDEX_COMMIT_TRAILER_SIZE (sizeof(uint64_t) + SHA256_DIGEST_SIZE + sizeof(index_commit_cookie)) static const int bitsperlevel = 2; static const int keylen = 32; typedef struct { int fd; char *name; node_offset datasize; node_offset lastcommit; node_offset filesize; char *writebuffer; uint64_t writebufferalloc; struct sha256_ctx commit_checksum_context; } buffered_file; struct permdb_object { Hashtab *nodecache; Hashtab *dirtynodes; buffered_file datafile; buffered_file indexfile; char *error; }; static const node_object nullnode = {{0, 0, 0, 0}}; static const node_object errornode = {{NODE_ENTRY_ERROR_NODE, NODE_ENTRY_ERROR_NODE, NODE_ENTRY_ERROR_NODE, NODE_ENTRY_ERROR_NODE}}; int calc_padding(int offset, int alignment) { int misalign = offset % alignment; if (misalign == 0) { return 0; } return alignment - misalign; } #if 0 static void print_entry(node_object node) { for (int i = 0; i < entriespernode; i++) { fprintf(stderr, " %016llx", (long long unsigned int)node.data[i]); } fprintf(stderr, "\n"); } #endif static void writebuffer_add(buffered_file *file, const void *data, uint64_t length); static int writebuffer_flush(buffered_file *file); static uint64_t writebuffer_length(buffered_file *file); struct nodecache { node_object value; char key[]; }; static int comparenodes(void *a_v, void *b_v) { struct nodecache *a = (struct nodecache *)a_v; struct nodecache *b = (struct nodecache *)b_v; //fprintf(stderr, "comparing %s and %s\n", a->key, b->key); return strcmp(a->key, b->key); } static unsigned hashnode(void *node_v) { struct nodecache *node = (struct nodecache *)node_v; unsigned hash = 0; for (int i = 0; node->key[i]; i++) { unsigned oldhighest = hash >> 30; hash <<= 2; hash |= oldhighest; hash ^= (node->key[i] & 0x3); } //fprintf(stderr, "hashing (%p) %s to %u\n", node, node->key, hash); return hash; } #if DEBUG_CACHE || 1 static void print_hex(const void *data, int length) { int i; for (i = 0; i < length; i++) { fprintf(stderr, "%02X ", ((unsigned char *)data)[i]); } fprintf(stderr, "\n"); } #endif #define DEBUG_CACHE 0 #define DEBUG_WRITE 0 #define DEBUG_READ 0 #define DEBUG_PORT 0 static node_object get_node_from_cache(permdb_object *state, const char *key) { #if DEBUG_CACHE fprintf(stderr, "getting cache key %s, ", key); #endif struct nodecache *lookupnode = malloc(sizeof(struct nodecache) + strlen(key) + 1); strcpy(lookupnode->key, key); //fprintf(stderr, "sending in lookup node %p: ", lookupnode); //print_hex(lookupnode, sizeof(struct nodecache) + strlen(key) + 1); struct nodecache *node = (struct nodecache *)hashtabsearch(state->nodecache, lookupnode); free(lookupnode); if (node == NULL) { #if DEBUG_CACHE fprintf(stderr, "found nothing in cache\n"); #endif return errornode; } #if DEBUG_CACHE fprintf(stderr, "got cache key %s: ", node->key); print_hex(&node->value, sizeof(struct nodecache)); #endif return node->value; } static node_object get_node_from_dirtynodes(permdb_object *state, const char *key) { #if DEBUG_CACHE fprintf(stderr, "getting key %s, ", key); #endif struct nodecache *lookupnode = malloc(sizeof(struct nodecache) + strlen(key) + 1); strcpy(lookupnode->key, key); //fprintf(stderr, "sending in lookup node %p: ", lookupnode); //print_hex(lookupnode, sizeof(struct nodecache) + strlen(key) + 1); struct nodecache *node = (struct nodecache *)hashtabsearch(state->dirtynodes, lookupnode); free(lookupnode); if (node == NULL) { #if DEBUG_CACHE fprintf(stderr, "found nothing\n"); #endif return errornode; } #if DEBUG_CACHE fprintf(stderr, "got key %s: ", node->key); print_hex(&node->value, sizeof(struct nodecache)); #endif return node->value; } static void put_node_in_cache(permdb_object *state, const char *key, node_object value) { #if DEBUG_CACHE fprintf(stderr, "putting cache key %s: ", key); print_hex(&value, sizeof(node_object)); #endif struct nodecache *node = malloc(sizeof(struct nodecache) + strlen(key) + 1); strcpy(node->key, key); node->value = value; hashtabaddreplace(state->nodecache, node); } static void put_node_in_dirtynodes(permdb_object *state, const char *key, node_object value) { #if DEBUG_CACHE fprintf(stderr, "putting key %s: ", key); print_hex(&value, sizeof(node_object)); #endif struct nodecache *node = malloc(sizeof(struct nodecache) + strlen(key) + 1); strcpy(node->key, key); node->value = value; hashtabaddreplace(state->dirtynodes, node); } static int true_cond(void *ptr, void *arg) { return TRUE; } static int free_hash_node(void *ptr, void *arg) { free(ptr); return 0; } void delete_all_nodes_in_cache(permdb_object *state) { hashtabforeach(state->nodecache, free_hash_node, NULL); hashtabcleantab(state->nodecache, true_cond, NULL); } static void delete_all_dirty_nodes(permdb_object *state) { hashtabforeach(state->dirtynodes, free_hash_node, NULL); hashtabcleantab(state->dirtynodes, true_cond, NULL); } static const uint64_t index_file_cookie = 0xb7e16b02ba8a6d1b; static const uint64_t index_commit_cookie = 0x2fb1778c74a402e4; static const uint64_t index_node_cookie = 0x2e0f555ad73210d1; static const uint8_t data_file_cookie[] = {0xd5, 0x35, 0x51, 0xba, 0x53, 0x9a, 0x42, 0x52}; static const uint8_t data_entry_cookie[] = {0xe7, 0xc1, 0xcd, 0xc2, 0xba, 0x3d, 0xc7, 0x7c}; static const uint8_t data_commit_start_cookie[] = {0x75, 0xc2, 0xe4, 0xb3, 0xd5, 0xf6, 0x43, 0xa1}; static const uint8_t data_commit_end_cookie[] = {0x2b, 0x05, 0xee, 0xd6, 0x1b, 0x5a, 0xf5, 0x50}; int committree(permdb_object *state); node_offset indexfile_add_header(buffered_file *file) { writebuffer_add(file, &index_file_cookie, sizeof(index_file_cookie)); uint64_t length = writebuffer_length(file); writebuffer_flush(file); return length; } static void writebuffer_add_host64(buffered_file *file, uint64_t value) { writebuffer_add(file, &value, sizeof(uint64_t)); } static void writebuffer_add_be32(buffered_file *file, uint32_t value) { uint32_t value_be = htonl(value); writebuffer_add(file, &value_be, sizeof(uint32_t)); } static void writebuffer_add_be16(buffered_file *file, uint16_t value) { uint16_t value_be = htons(value); writebuffer_add(file, &value_be, sizeof(uint16_t)); } node_offset datafile_add_header(buffered_file *file) { fprintf(stderr, "adding header to %s\n", file->name); writebuffer_add(file, &data_file_cookie, sizeof(data_file_cookie)); writebuffer_add_be32(file, 4096); writebuffer_add_be32(file, 2); writebuffer_add_be32(file, 32); uint64_t length = writebuffer_length(file); writebuffer_flush(file); return length; } void initial_node(permdb_object *state) { char *key = ""; struct nodecache *node = malloc(sizeof(struct nodecache) + strlen(key) + 1); strcpy(node->key, key); node->value = nullnode; hashtabaddreplace(state->dirtynodes, node); } int initial_commit(permdb_object *state) { initial_node(state); return committree(state); } static void set_error(permdb_object *state, const char * __restrict, ...) __attribute__ ((__format__ (__printf__, 2, 3))); unsigned char * read_from_file(buffered_file *file, size_t length, off_t offset) { unsigned char *buffer = malloc(length); if (buffer == NULL) { return NULL; } ssize_t ret = pread(file->fd, buffer, length, (off_t) offset); if (ret != length) { free(buffer); return NULL; } return buffer; } struct commit_info { node_offset start; node_offset length; uint8_t checksum[SHA256_DIGEST_SIZE]; }; int validate_checksum(struct commit_info *commit, buffered_file *file) { //fprintf(stderr, "validate_checksum: read from file: length %llu start %llu\n", commit->length, commit->start); unsigned char *checksumdata = read_from_file(file, commit->length, commit->start); if (checksumdata == NULL) { return -1; } uint8_t checksum[SHA256_DIGEST_SIZE]; struct sha256_ctx commit_checksum_context; sha256_init(&commit_checksum_context); sha256_update(&commit_checksum_context, commit->length, checksumdata); sha256_digest(&commit_checksum_context, SHA256_DIGEST_SIZE, checksum); if (memcmp(checksum, commit->checksum, SHA256_DIGEST_SIZE) == 0) { free(checksumdata); return 0; } free(checksumdata); return -1; } static uint64_t read_host64(void *ptr) { uint64_t data; memcpy(&data, ptr, sizeof(data)); return data; } int verify_index_commit(buffered_file *file, node_offset offset) { //fprintf(stderr, "verifying index file: commit verification\n"); offset -= INDEX_COMMIT_TRAILER_SIZE; unsigned char *data = read_from_file(file, INDEX_COMMIT_TRAILER_SIZE, offset); if (memcmp(data + sizeof(uint64_t) + SHA256_DIGEST_SIZE, &index_commit_cookie, sizeof(index_commit_cookie)) != 0) { fprintf(stderr, "verifying index file: incorrect commit cookie\n"); free(data); return -1; } struct commit_info commit; commit.length = read_host64(data); commit.start = offset + sizeof(uint64_t) - commit.length; memcpy(commit.checksum, data + sizeof(uint64_t), SHA256_DIGEST_SIZE); free(data); return validate_checksum(&commit, file); } int indexfile_verify_file(buffered_file *file) { //fprintf(stderr, "verifying index file\n"); unsigned char *header = read_from_file(file, sizeof(index_file_cookie), 0); if (memcmp(header, &index_file_cookie, sizeof(index_file_cookie)) != 0) { free(header); fprintf(stderr, "verifying index file: incorrect file cookie\n"); return -1; } free(header); if (verify_index_commit(file, file->filesize) < 0) { fprintf(stderr, "verifying index file: commit verification failed\n"); return -1; } return 0; } struct commit_info * read_data_commit_backward(buffered_file *file, node_offset *offset); int datafile_verify_file(buffered_file *file) { unsigned char *header = read_from_file(file, sizeof(data_file_cookie), 0); if (header == NULL || memcmp(header, &data_file_cookie, sizeof(data_file_cookie)) != 0) { free(header); return -1; } free(header); node_offset offset = file->lastcommit; //fprintf(stderr, "verifying commit: %llu\n", offset); struct commit_info *data_commit = read_data_commit_backward(file, &offset); if (data_commit == NULL || validate_checksum(data_commit, file) < 0) { //fprintf(stderr, "commit broken: %llu\n", offset); free(data_commit); return -1; } free(data_commit); return 0; } static uint32_t read_be32(void *ptr); static unsigned char * readdatakeyandlen(permdb_object *state, node_offset offset, size_t *datalen); struct commit_info * read_data_commit(buffered_file *file, node_offset *offset) { unsigned char *data = read_from_file(file, sizeof(uint32_t) + SHA256_DIGEST_SIZE + sizeof(data_commit_end_cookie), *offset); if (data == NULL || memcmp(data + sizeof(uint32_t) + SHA256_DIGEST_SIZE, data_commit_end_cookie, sizeof(data_commit_end_cookie)) != 0) { free(data); return NULL; } *offset += sizeof(uint32_t); struct commit_info *commit = malloc(sizeof(struct commit_info)); //fprintf(stderr, "read commit: %llu\n", *offset); //print_hex(data, sizeof(uint32_t) + SHA256_DIGEST_SIZE); commit->length = read_be32(data); commit->start = *offset - commit->length; memcpy(&commit->checksum, data + sizeof(uint32_t), SHA256_DIGEST_SIZE); *offset += SHA256_DIGEST_SIZE + sizeof(data_commit_end_cookie); free(data); return commit; } struct commit_info * read_data_commit_forward(buffered_file *file, node_offset *offset) { int padding = calc_padding(*offset, 4); *offset += sizeof(data_commit_start_cookie) + padding; return read_data_commit(file, offset); } struct commit_info * read_data_commit_backward(buffered_file *file, node_offset *offset) { *offset -= sizeof(uint32_t) + SHA256_DIGEST_SIZE + sizeof(data_commit_end_cookie); return read_data_commit(file, offset); } int addindex(permdb_object *state, const unsigned char *key, unsigned int keylength, node_offset dataoffset); int rebuild_index_file(permdb_object *state) { state->indexfile.filesize = 0; state->indexfile.datasize = 0; state->indexfile.lastcommit = state->indexfile.datasize; sha256_init(&state->indexfile.commit_checksum_context); ftruncate(state->indexfile.fd, 0); state->indexfile.lastcommit = indexfile_add_header(&state->indexfile); initial_node(state); node_offset offset = sizeof(data_file_cookie) + sizeof(uint32_t) * 3; while (1) { unsigned char *cookie = read_from_file(&state->datafile, sizeof(data_entry_cookie), offset); if (cookie == NULL) { break; } if (memcmp(&data_entry_cookie, cookie, sizeof(data_entry_cookie)) == 0) { size_t datalen; unsigned char *datakey = readdatakeyandlen(state, offset, &datalen); //fprintf(stderr, "entry %llu: %zu\n", offset, datalen); int result = addindex(state, datakey, keylen, offset); offset += sizeof(data_entry_cookie) + keylen + sizeof(uint32_t) + datalen; free(datakey); if (result != 1) { free(cookie); return -1; } } else if (memcmp(&data_commit_start_cookie, cookie, sizeof(data_commit_start_cookie)) == 0) { struct commit_info *data_commit = read_data_commit_forward(&state->datafile, &offset); //fprintf(stderr, "verifying commit: %llu %p\n", offset, data_commit); if (data_commit == NULL || validate_checksum(data_commit, &state->datafile) < 0) { free(data_commit); fprintf(stderr, "commit broken: %llu\n", (unsigned long long) offset); free(cookie); return -1; } free(data_commit); //fprintf(stderr, "commit %llu\n", offset); } else { //fprintf(stderr, "error %llu\n", offset); //print_hex(cookie, sizeof(data_entry_cookie)); free(cookie); return -1; } free(cookie); } fprintf(stderr, "index file rebuilt\n"); return committree(state); } permdb_object * permdb_alloc(const char *dbpath) { char *idxpath = NULL; if (asprintf(&idxpath, "%s.idx", dbpath) == -1) { return NULL; } int fd; int idxfd; int flags = O_RDWR|O_CREAT; fd = open(dbpath, flags, 0666); if (fd == -1) { warn("open %s", dbpath); return NULL; } idxfd = open(idxpath, flags, 0666); if (idxfd == -1) { warn("open %s", idxpath); return NULL; } free(idxpath); permdb_object *state = malloc(sizeof(permdb_object)); state->datafile.fd = fd; state->datafile.name = "datafile"; state->indexfile.fd = idxfd; state->indexfile.name = "indexfile"; state->nodecache = hashtabnewf(1000000, comparenodes, hashnode, HASHTAB_GROW); state->dirtynodes = hashtabnewf(1000000, comparenodes, hashnode, HASHTAB_GROW); off_t datafile_filesize = lseek(fd, 0, SEEK_END); if (datafile_filesize < 0) { warn("lseek %s", dbpath); return NULL; } state->datafile.filesize = (node_offset) datafile_filesize; state->datafile.datasize = state->datafile.filesize; state->datafile.lastcommit = state->datafile.datasize; state->datafile.writebufferalloc = 1024*1024; state->datafile.writebuffer = calloc(state->datafile.writebufferalloc, 1); sha256_init(&state->datafile.commit_checksum_context); off_t indexfile_filesize = lseek(idxfd, 0, SEEK_END); if (indexfile_filesize < 0) { warn("lseek %s", idxpath); return NULL; } state->indexfile.filesize = (node_offset) indexfile_filesize; state->indexfile.datasize = state->indexfile.filesize; state->indexfile.lastcommit = state->indexfile.datasize; state->indexfile.writebufferalloc = 1024*1024; state->indexfile.writebuffer = calloc(state->indexfile.writebufferalloc, 1); sha256_init(&state->indexfile.commit_checksum_context); state->error = NULL; if (state->datafile.filesize == 0 && state->indexfile.filesize == 0) { #if DEBUG_WRITE fprintf(stderr, "writing header\n"); #endif state->indexfile.lastcommit = indexfile_add_header(&state->indexfile); state->datafile.lastcommit = datafile_add_header(&state->datafile); initial_commit(state); } else if (state->datafile.filesize > 0 && state->indexfile.filesize == 0) { if (rebuild_index_file(state) < 0) { warnx("index file rebuilding failed"); permdb_free(state); return NULL; } } if (datafile_verify_file(&state->datafile) < 0) { warnx("data file verification failed"); permdb_free(state); return NULL; } if (indexfile_verify_file(&state->indexfile) < 0) { warnx("index file verification failed, rebuilding"); if (rebuild_index_file(state) < 0) { warnx("index file rebuilding failed"); permdb_free(state); return NULL; } } return state; } void permdb_free(permdb_object *state) { free(state->datafile.writebuffer); free(state->indexfile.writebuffer); delete_all_nodes_in_cache(state); delete_all_dirty_nodes(state); hashtabrelease(state->dirtynodes); hashtabrelease(state->nodecache); free(state); } static uint64_t writebuffer_length(buffered_file *file) { return (uint64_t) file->datasize - (uint64_t) file->filesize; } static int writebuffer_flush_nosync(buffered_file *file); static void writebuffer_add(buffered_file *file, const void *data, uint64_t length) { sha256_update(&file->commit_checksum_context, length, data); #if DEBUG_WRITE fprintf(stderr, "adding data to %s: ", file->name); print_hex(data, length); #endif uint64_t needspace = length + writebuffer_length(file); if (needspace > file->writebufferalloc) { int ret = writebuffer_flush_nosync(file); if (ret < 0) { err(1, "writebuffer_flush_nosync failed"); } needspace = length + writebuffer_length(file); if (needspace > file->writebufferalloc) { uint64_t newsize = file->writebufferalloc * 2; if (needspace > newsize) { newsize = needspace; } file->writebuffer = realloc(file->writebuffer, newsize); memset(file->writebuffer + file->writebufferalloc, 0, newsize - file->writebufferalloc); file->writebufferalloc = newsize; } } memcpy(file->writebuffer + writebuffer_length(file), data, length); file->datasize += length; } static int writebuffer_flush_nosync(buffered_file *file) { ssize_t ret; uint64_t length = writebuffer_length(file); ret = write(file->fd, file->writebuffer, length); if (ret != length) { return -1; } file->filesize += (node_offset) ret; return 0; } static int writebuffer_flush(buffered_file *file) { int ret; ret = writebuffer_flush_nosync(file); if (ret) { return ret; } ret = fsync(file->fd); sha256_init(&file->commit_checksum_context); #if DEBUG_WRITE fprintf(stderr, "clearing data for %s\n", file->name); #endif return ret; } static unsigned char keybits(const unsigned char *key, unsigned int level) { unsigned char b = key[level/4]; return (b >> (6-(level*2) % 8)) & 0x3; } static char * keypart(const unsigned char *key, unsigned int level) { unsigned char *result = malloc(level+1); unsigned int i; for (i = 0; i < level; i++) { unsigned char b = keybits(key, i); result[i] = b + (unsigned char) '0'; } result[level] = 0; return (char *)result; } static void addentry(node_object *node, unsigned int n, node_entry entry) { assert(n < entriespernode); assert(node->data[n] == 0); node->data[n] = entry; } static void overwriteentry(node_object *node, unsigned int n, node_entry entry) { assert(n < entriespernode); assert(node->data[n] != 0); node->data[n] = entry; } node_entry get_entry_in_node(node_object node, unsigned char n) { return node.data[n]; } static void set_error(permdb_object *state, const char *format, ...) { va_list args; int ret; va_start(args, format); char *formatted = NULL; ret = vasprintf (&formatted, format, args); if (ret == -1) { fprintf(stderr, "vasprintf error\n"); return; } if (state->error) { free(state->error); } fprintf(stderr, "error: %s\n", formatted); state->error = formatted; va_end(args); } static node_object unpacknode(permdb_object *state, const char *data, size_t datalen) { if (memcmp(&index_node_cookie, data, sizeof(index_node_cookie)) != 0) { print_hex(data, sizeof(index_node_cookie)); print_hex(&index_node_cookie, sizeof(index_node_cookie)); set_error(state, "incorrect magic (node) %02x%02x\n", (unsigned char)data[0], (unsigned char)data[1]); return errornode; } if (datalen != sizeof(node_object) + sizeof(index_node_cookie)) { return errornode; } node_object node; memcpy(&node, data + sizeof(index_node_cookie), sizeof(node)); return node; } unsigned char * read_internal_data(permdb_object *state, node_offset offset, size_t length) { unsigned char *result = malloc(length); buffered_file *file = &state->datafile; #if DEBUG_READ fprintf(stderr, "reading data: offset %llu\n", offset); #endif if (offset >= file->filesize) { node_offset writebufferoffset = offset - file->filesize; if (offset + length > file->datasize) { set_error(state, "pread: not enough data for offset %llu and length %zu\n", (long long unsigned int) offset, length); return NULL; } memcpy(result, file->writebuffer + writebufferoffset, length); } else { if (offset + length > file->filesize) { set_error(state, "pread: trying to read over file/writebuffer boundary for offset %llu and length %zu\n", (long long unsigned int) offset, length); return NULL; } ssize_t ret = pread(file->fd, result, length, (off_t) offset); if (ret != length) { free(result); set_error(state, "short pread: %zd (wanted %zu) at offset %llu\n", ret, length, (long long unsigned int) offset); return NULL; } } return result; } static int iserrornode(node_object node) { return node.data[0] == NODE_ENTRY_ERROR_NODE && node.data[1] == NODE_ENTRY_ERROR_NODE && node.data[2] == NODE_ENTRY_ERROR_NODE && node.data[3] == NODE_ENTRY_ERROR_NODE; } node_object readnode(permdb_object *state, node_offset offset, const char *cachekey) { #if DEBUG_READ fprintf(stderr, "reading node: offset %llu cachekey '%s'\n", offset, cachekey ? cachekey : "none"); #endif if (cachekey) { node_object dirtynode = get_node_from_dirtynodes(state, cachekey); if (!iserrornode(dirtynode)) { #if DEBUG_READ fprintf(stderr, "reading node: found node in dirty nodes\n"); #endif return dirtynode; } if (offset == NODE_ENTRY_DIRTY_NODE) { set_error(state, "referring to dirty node at key %s, but node not in dirtynodes\n", cachekey); #if DEBUG_READ fprintf(stderr, "reading node: referring to dirty node at key %s, but node not in dirtynodes\n", cachekey); #endif return errornode; } node_object cachednode = get_node_from_cache(state, cachekey); if (!iserrornode(cachednode)) { #if DEBUG_READ fprintf(stderr, "reading node: found node in cache\n"); #endif return cachednode; } } size_t length = sizeof(index_node_cookie) + sizeof(node_object); char *buffer = malloc(length); if (buffer == NULL) { return errornode; } ssize_t ret = pread(state->indexfile.fd, buffer, length, (off_t) offset); if (ret != length) { free(buffer); set_error(state, "node not present at %llu: length %zd\n", (long long unsigned int) offset, ret); return errornode; } node_object result = unpacknode(state, buffer, length); free(buffer); if (cachekey) { put_node_in_cache(state, cachekey, result); } #if DEBUG_READ fprintf(stderr, "reading node: success\n"); #endif return result; } static int isdata(node_entry entry) { return (entry & NODE_ENTRY_ISDATA) == NODE_ENTRY_ISDATA; } static node_offset entryoffset(node_entry entry) { return entry & NODE_ENTRY_OFFSET_MASK; } struct nodelist { node_object *nodes; unsigned int len; unsigned int pos; }; static void add_entry_to_nodelist(struct nodelist *list, node_object node) { list->nodes[list->pos] = node; list->pos++; } static void init_nodelist(struct nodelist *list) { list->len = keylen * 8 / bitsperlevel; list->nodes = malloc(sizeof(node_object) * list->len); list->pos = 0; } static void free_nodelist(struct nodelist *list) { free(list->nodes); } static char getpath(permdb_object *state, const unsigned char *key, struct nodelist *nodes) { unsigned int level = 0; #if 0 if (state->indexfile.lastcommit < (sizeof(index_node_cookie) + sizeof(node_object) + INDEX_COMMIT_TRAILER_SIZE)) { fprintf(stderr, "No commit exists (lastcommit %llu)\n", (long long unsigned int) state->indexfile.lastcommit); return -1; } #endif node_offset rootoffset = state->indexfile.lastcommit - (sizeof(index_node_cookie) + sizeof(node_object) + INDEX_COMMIT_TRAILER_SIZE); node_object node = readnode(state, rootoffset, ""); if (iserrornode(node)) { fprintf(stderr, "cannot find root node at offset %llu (lastcommit %llu)\n", (long long unsigned int) rootoffset, (long long unsigned int) state->indexfile.lastcommit); return -1; } #if DEBUG_READ fprintf(stderr, "getpath: got node\n"); #endif while (1) { unsigned char kb = keybits(key, level); node_entry entry = get_entry_in_node(node, kb); if (nodes->pos >= nodes->len) { fprintf(stderr, "tried to write after end of allocated list: level %d\n", level); return -1; } add_entry_to_nodelist(nodes, node); if (entry == 0 || isdata(entry)) { #if DEBUG_READ fprintf(stderr, "getpath: return node\n"); #endif return (char) kb; } level++; char *kp = keypart(key, level); node = readnode(state, entryoffset(entry), kp); if (iserrornode(node)) { free(kp); #if DEBUG_READ fprintf(stderr, "getpath: not found\n"); #endif return -1; } free(kp); } } static node_entry getpathlastnode(permdb_object *state, const unsigned char *key) { unsigned int level = 0; node_offset rootoffset = state->indexfile.lastcommit - (sizeof(index_node_cookie) + sizeof(node_object) + INDEX_COMMIT_TRAILER_SIZE); node_object node = readnode(state, rootoffset, ""); if (iserrornode(node)) { #if DEBUG_READ fprintf(stderr, "getpathlastnode: no node\n"); #endif return NODE_ENTRY_ERROR_NODE; } #if DEBUG_READ fprintf(stderr, "getpathlastnode: got node\n"); #endif unsigned char kb; while (1) { kb = keybits(key, level); node_entry entry = get_entry_in_node(node, kb); if (entry == 0 || isdata(entry)) { break; } level++; char *kp = keypart(key, level); node = readnode(state, entryoffset(entry), kp); free(kp); } node_entry entry = get_entry_in_node(node, kb); return entry; } static node_offset writenode(permdb_object *state, node_object node, const char *cachekey) { node_offset offset = state->indexfile.datasize; put_node_in_cache(state, cachekey, node); #if DEBUG_WRITE fprintf(stderr, "writing node: offset %llu\n", offset); #endif writebuffer_add(&state->indexfile, &index_node_cookie, sizeof(index_node_cookie)); writebuffer_add(&state->indexfile, &node, sizeof(node_object)); return offset; } node_offset datasize(permdb_object *state) { return state->indexfile.datasize; } static node_entry buildentry(int isdata, node_offset offset) { if (isdata) { return NODE_ENTRY_ISDATA | offset; } else { return offset; } } static void * memsub(void *src, size_t offset, size_t length) { void *result = malloc(length); memcpy(result, ((char *)src) + offset, length); return result; } static unsigned char * readdatakey(permdb_object *state, node_offset offset) { unsigned char *data = read_internal_data(state, offset, sizeof(data_entry_cookie) + keylen); if (data == NULL) { return NULL; } if (memcmp(&data_entry_cookie, data, sizeof(data_entry_cookie)) != 0) { free(data); set_error(state, "incorrect magic (entry) %02x%02x\n", (unsigned char)data[0], (unsigned char)data[1]); return NULL; } unsigned char *result = memsub(data, sizeof(data_entry_cookie), keylen); free(data); return result; } static uint32_t read_be32(void *ptr) { uint32_t data; memcpy(&data, ptr, sizeof(data)); return ntohl(data); } static uint16_t read_be16(void *ptr) { uint16_t data; memcpy(&data, ptr, sizeof(data)); return ntohs(data); } static unsigned char * readdatakeyandlen(permdb_object *state, node_offset offset, size_t *datalen) { unsigned char *data = read_internal_data(state, offset, sizeof(data_entry_cookie) + keylen + 4); if (data == NULL) { return NULL; } if (memcmp(&data_entry_cookie, data, sizeof(data_entry_cookie)) != 0) { free(data); set_error(state, "incorrect magic (entry) %02x%02x\n", (unsigned char)data[0], (unsigned char)data[1]); return NULL; } unsigned char *result = memsub(data, sizeof(data_entry_cookie), keylen); uint16_t nchunks = read_be16(data+sizeof(data_entry_cookie)+keylen); *datalen = read_be16(data+sizeof(data_entry_cookie)+keylen+sizeof(uint16_t)); if (nchunks != 1) { errx(1, "number of chunks is %d, but only one chunk is supported right now", nchunks); } free(data); return result; } static unsigned char * readdata(permdb_object *state, node_offset offset, size_t datalen) { return read_internal_data(state, offset + sizeof(data_entry_cookie) + keylen + 4, datalen); } static node_offset writedata(permdb_object *state, const unsigned char *key, const unsigned char *data, size_t datalength) { if (datalength > 65535) { errx(1, "data length is %zu, but only < 64K lengths are supported right now", datalength); } node_offset offset = state->datafile.datasize; #if DEBUG_WRITE fprintf(stderr, "writing data: offset %llu\n", offset); #endif writebuffer_add(&state->datafile, &data_entry_cookie, sizeof(data_entry_cookie)); writebuffer_add(&state->datafile, key, keylen); writebuffer_add_be16(&state->datafile, 1); writebuffer_add_be16(&state->datafile, datalength); writebuffer_add(&state->datafile, data, datalength); return offset; } int addindex(permdb_object *state, const unsigned char *key, unsigned int keylength, node_offset dataoffset) { struct nodelist nodes; init_nodelist(&nodes); char kb = getpath(state, key, &nodes); if (kb == -1) { free_nodelist(&nodes); return -1; } unsigned int foundlevel = nodes.pos - 1; if (foundlevel >= nodes.len) { fprintf(stderr, "tried to read after end of allocated list\n"); free_nodelist(&nodes); return 0; } node_object lastnode = nodes.nodes[foundlevel]; if (get_entry_in_node(lastnode, (unsigned char) kb) == 0) { addentry(&lastnode, keybits(key, foundlevel), buildentry(1, dataoffset)); } else { node_offset olddataoffset = entryoffset(get_entry_in_node(lastnode, (unsigned char) kb)); unsigned char *olddatakey = readdatakey(state, olddataoffset); if (olddatakey == NULL) { free_nodelist(&nodes); return -1; } if (memcmp(olddatakey, key, keylen) == 0) { free_nodelist(&nodes); free(olddatakey); return 0; } unsigned int level = foundlevel + 1; while (keybits(key, level) == keybits(olddatakey, level)) { level++; } node_object leafnode = nullnode; addentry(&leafnode, keybits(key, level), buildentry(1, dataoffset)); addentry(&leafnode, keybits(olddatakey, level), buildentry(1, olddataoffset)); free(olddatakey); { char *cachekey = keypart(key, level); put_node_in_dirtynodes(state, cachekey, leafnode); free(cachekey); } level--; while (level > foundlevel) { node_object node = nullnode; addentry(&node, keybits(key, level), NODE_ENTRY_DIRTY_NODE); char *cachekey = keypart(key, level); put_node_in_dirtynodes(state, cachekey, node); free(cachekey); level--; } overwriteentry(&lastnode, keybits(key, foundlevel), NODE_ENTRY_DIRTY_NODE); } int level = (int) foundlevel; { char *cachekey = keypart(key, (unsigned int) level); put_node_in_dirtynodes(state, cachekey, lastnode); free(cachekey); } level--; while (level >= 0) { if (level >= (int) nodes.len) { fprintf(stderr, "tried to read after end of allocated list\n"); free_nodelist(&nodes); return 0; } node_object node = nodes.nodes[level]; overwriteentry(&node, keybits(key, (unsigned int) level), NODE_ENTRY_DIRTY_NODE); char *cachekey = keypart(key, (unsigned int) level); put_node_in_dirtynodes(state, cachekey, node); free(cachekey); level--; } free_nodelist(&nodes); return 1; } int addvalue(permdb_object *state, const unsigned char *key, unsigned int keylength, const unsigned char *data, size_t datalength) { struct nodelist nodes; init_nodelist(&nodes); char kb = getpath(state, key, &nodes); if (kb == -1) { free_nodelist(&nodes); return -1; } unsigned int foundlevel = nodes.pos - 1; if (foundlevel >= nodes.len) { fprintf(stderr, "tried to read after end of allocated list\n"); free_nodelist(&nodes); return 0; } node_object lastnode = nodes.nodes[foundlevel]; if (get_entry_in_node(lastnode, (unsigned char) kb) == 0) { node_offset dataoffset = writedata(state, key, data, datalength); addentry(&lastnode, keybits(key, foundlevel), buildentry(1, dataoffset)); } else { node_offset olddataoffset = entryoffset(get_entry_in_node(lastnode, (unsigned char) kb)); unsigned char *olddatakey = readdatakey(state, olddataoffset); if (olddatakey == NULL) { free_nodelist(&nodes); return -1; } if (memcmp(olddatakey, key, keylen) == 0) { free_nodelist(&nodes); free(olddatakey); return 0; } node_offset dataoffset = writedata(state, key, data, datalength); unsigned int level = foundlevel + 1; while (keybits(key, level) == keybits(olddatakey, level)) { level++; } node_object leafnode = nullnode; addentry(&leafnode, keybits(key, level), buildentry(1, dataoffset)); addentry(&leafnode, keybits(olddatakey, level), buildentry(1, olddataoffset)); free(olddatakey); { char *cachekey = keypart(key, level); put_node_in_dirtynodes(state, cachekey, leafnode); free(cachekey); } level--; while (level > foundlevel) { node_object node = nullnode; addentry(&node, keybits(key, level), NODE_ENTRY_DIRTY_NODE); char *cachekey = keypart(key, level); put_node_in_dirtynodes(state, cachekey, node); free(cachekey); level--; } overwriteentry(&lastnode, keybits(key, foundlevel), NODE_ENTRY_DIRTY_NODE); } int level = (int) foundlevel; { char *cachekey = keypart(key, (unsigned int) level); put_node_in_dirtynodes(state, cachekey, lastnode); free(cachekey); } level--; while (level >= 0) { if (level >= (int) nodes.len) { fprintf(stderr, "tried to read after end of allocated list\n"); free_nodelist(&nodes); return 0; } node_object node = nodes.nodes[level]; overwriteentry(&node, keybits(key, (unsigned int) level), NODE_ENTRY_DIRTY_NODE); char *cachekey = keypart(key, (unsigned int) level); put_node_in_dirtynodes(state, cachekey, node); free(cachekey); level--; } free_nodelist(&nodes); return 1; } unsigned char * getvalue(permdb_object *state, const unsigned char *key, size_t keylength, size_t *datalen) { node_entry entry = getpathlastnode(state, key); if (entry == 0) { #if DEBUG_READ fprintf(stderr, "getvalue: no node\n"); #endif return NULL; } #if DEBUG_READ fprintf(stderr, "getvalue: got node\n"); #endif node_offset olddataoffset = entryoffset(entry); unsigned char *datakey = readdatakeyandlen(state, olddataoffset, datalen); if (datakey == NULL) { return NULL; } if (memcmp(datakey, key, keylength) != 0) { free(datakey); return NULL; } free(datakey); return readdata(state, olddataoffset, *datalen); } struct keylist { char **keys; int pos; }; static int add_entry_to_keylist(void *ptr, void *arg) { struct keylist *list = (struct keylist *) arg; struct nodecache *node = (struct nodecache *)ptr; list->keys[list->pos] = strdup(node->key); list->pos++; return 0; } static int string_length_comparison(const void *a_v, const void *b_v) { char * const *a = (char * const *) a_v; char * const *b = (char * const *) b_v; size_t a_len = strlen(*a); size_t b_len = strlen(*b); if (b_len > a_len) { return 1; } else if (b_len == a_len) { return 0; } else { return -1; } } static char ** sorted(permdb_object *state, unsigned int ndirtynodes) { struct keylist keylist; keylist.keys = malloc(sizeof(char *) * ndirtynodes); keylist.pos = 0; hashtabforeach(state->dirtynodes, add_entry_to_keylist, &keylist); #if 0 fprintf(stderr, "before sort\n"); for (int i = 0; i < ndirtynodes; i++) { fprintf(stderr, " %s\n", keylist.keys[i]); } #endif qsort(keylist.keys, ndirtynodes, sizeof(char *), string_length_comparison); #if 0 fprintf(stderr, "after sort\n"); for (int i = 0; i < ndirtynodes; i++) { fprintf(stderr, " %p %s\n", keylist.keys[i], keylist.keys[i]); } #endif return keylist.keys; } static int count_entry(void *ptr, void *arg) { int *flag = (int *)arg; *flag += 1; return 0; } int committree(permdb_object *state) { get_node_from_dirtynodes(state, ""); unsigned int ndirtynodes = 0; hashtabforeach(state->dirtynodes, count_entry, &ndirtynodes); if (ndirtynodes == 0) { return 0; } char **commitlist = sorted(state, ndirtynodes); char *lastobject = commitlist[ndirtynodes - 1]; if (lastobject[0] != '\0') { fprintf(stderr, "sorted list doesn't end with root node\n"); free(commitlist); return -1; } unsigned int ncommits = ndirtynodes; unsigned int i; #if DEBUG_WRITE fprintf(stderr, "committing %d dirty nodes at offset %llu\n", ncommits, state->indexfile.datasize); #endif for (i = 0; i < ncommits; i++) { get_node_from_dirtynodes(state, ""); char *key = commitlist[i]; node_object node = get_node_from_dirtynodes(state, key); assert(get_entry_in_node(node, 0) != NODE_ENTRY_DIRTY_NODE); assert(get_entry_in_node(node, 1) != NODE_ENTRY_DIRTY_NODE); assert(get_entry_in_node(node, 2) != NODE_ENTRY_DIRTY_NODE); assert(get_entry_in_node(node, 3) != NODE_ENTRY_DIRTY_NODE); node_offset offset = writenode(state, node, key); size_t keylength = strlen(key); if (keylength != 0) { get_node_from_dirtynodes(state, ""); char *parent = strdup(key); parent[keylength - 1] = '\0'; unsigned int entrynumber = (unsigned int) (key[keylength - 1] - '0'); node_object parentnode = get_node_from_dirtynodes(state, parent); overwriteentry(&parentnode, entrynumber, offset); put_node_in_dirtynodes(state, parent, parentnode); free(parent); } free(key); } free(commitlist); #if DEBUG_WRITE fprintf(stderr, "writing data commit trailer at offset %llu\n", state->datafile.datasize); #endif int data_commit_padding_size = calc_padding(state->datafile.datasize, 4); uint8_t padding[4] = {0, 0, 0, 0}; unsigned char data_commit_checksum[SHA256_DIGEST_SIZE]; writebuffer_add(&state->datafile, data_commit_start_cookie, 8); writebuffer_add(&state->datafile, padding, data_commit_padding_size); writebuffer_add_be32(&state->datafile, state->datafile.datasize - state->datafile.lastcommit + sizeof(uint32_t)); sha256_digest(&state->datafile.commit_checksum_context, SHA256_DIGEST_SIZE, data_commit_checksum); writebuffer_add(&state->datafile, data_commit_checksum, SHA256_DIGEST_SIZE); writebuffer_add(&state->datafile, &data_commit_end_cookie, sizeof(data_commit_end_cookie)); #if DEBUG_WRITE fprintf(stderr, "finished writing data commit trailer at offset %llu\n", state->datafile.datasize); #endif if (writebuffer_flush(&state->datafile) == -1) { set_error(state, "data file flushing failed\n"); return -1; } state->datafile.lastcommit = state->datafile.datasize; #if DEBUG_WRITE fprintf(stderr, "writing index commit trailer at offset %llu\n", state->indexfile.datasize); #endif uint64_t index_commit_length = state->indexfile.datasize - state->indexfile.lastcommit + sizeof(uint64_t); unsigned char index_commit_checksum[SHA256_DIGEST_SIZE]; writebuffer_add_host64(&state->indexfile, index_commit_length); sha256_digest(&state->indexfile.commit_checksum_context, SHA256_DIGEST_SIZE, index_commit_checksum); writebuffer_add(&state->indexfile, index_commit_checksum, SHA256_DIGEST_SIZE); writebuffer_add(&state->indexfile, &index_commit_cookie, sizeof(index_commit_cookie)); #if DEBUG_WRITE fprintf(stderr, "finished writing index commit trailer at offset %llu\n", state->indexfile.datasize); #endif if (writebuffer_flush(&state->indexfile) == -1) { set_error(state, "index file flushing failed\n"); return -1; } state->indexfile.lastcommit = state->indexfile.datasize; #if DEBUG_WRITE fprintf(stderr, "setting lastcommit to %llu\n", state->indexfile.lastcommit); #endif delete_all_dirty_nodes(state); return 0; } void portloop(permdb_object *state) { unsigned char buf[65536]; ssize_t len; while ((len = read_command(buf, sizeof(buf)-1, 4)) > 0) { if (buf[0] == 0) { #if DEBUG_PORT fprintf(stderr, "get\n"); #endif if (len != keylen+1) { write_reply(NULL, 0, 4); continue; } assert(len == keylen+1); size_t datalen; unsigned char *result = getvalue(state, (buf+1), keylen, &datalen); if (result == NULL) { write_reply(NULL, 0, 4); } else { write_reply(result, datalen, 4); } #if DEBUG_PORT fprintf(stderr, "get reply\n"); #endif free(result); } else if (buf[0] == 1) { #if DEBUG_PORT fprintf(stderr, "add\n"); #endif if (len < (keylen + 1)) { write_reply(NULL, 0, 4); fprintf(stderr, "invalid addvalue command, length was %zd\n", len); continue; } size_t datalen = (size_t) (len - keylen - 1); unsigned char *key = buf + 1; unsigned char *data = key + keylen; int result = addvalue(state, key, keylen, data, datalen); if (result < 0) { write_reply(NULL, 0, 4); } else { unsigned char result_byte = (unsigned char) result; write_reply(&result_byte, 1, 4); } #if DEBUG_PORT fprintf(stderr, "add reply\n"); #endif } else if (buf[0] == 2) { #if DEBUG_PORT fprintf(stderr, "commit\n"); #endif if (len != 1) { write_reply(NULL, 0, 4); fprintf(stderr, "invalid commit command, length was %zd\n", len); continue; } int result = committree(state); if (result < 0) { write_reply(NULL, 0, 4); continue; } else { unsigned char result_byte = (unsigned char) result; write_reply(&result_byte, 1, 4); } #if DEBUG_PORT fprintf(stderr, "commit reply\n"); #endif } else { #if DEBUG_PORT fprintf(stderr, "unknown command\n"); #endif write_reply(NULL, 0, 4); } } if (len == -1) { fprintf(stderr, "read command failed\n"); } permdb_free(state); }