# A database storing dictionaries, keyed on a timestamp.
#
# value = A dict which will be stored as a JSON object encoded in UTF-8.
# Note that dict keys of type integer or float will become strings while
# values will keep their type.
#
# Note that there's a (slim) chance that you'd stomp on the previous
# value if you're too quick with generating the timestamps, i.e.
# invoking time.time() several times quickly enough.

from typing import Dict, List, Tuple, Union, Any
import os
import sys
import time

from src import couch
from .schema import as_index_list, validate_collector_data


class DictDB:
    """
    Thin wrapper around a CouchDB database that stores dicts under
    millisecond-timestamp IDs.
    """

    def __init__(self) -> None:
        """
        Connect to CouchDB using the COUCHDB_* environment variables.

        If the database does not exist it is created together with the
        indexes returned by as_index_list(). The process exits with
        status -1 when a required environment variable is missing.
        """

        # The environment is deliberately not printed here: it contains
        # COUCHDB_PASSWORD and would end up in stdout/logs.
        try:
            self.database = os.environ["COUCHDB_NAME"]
            self.hostname = os.environ["COUCHDB_HOSTNAME"]
            self.username = os.environ["COUCHDB_USER"]
            self.password = os.environ["COUCHDB_PASSWORD"]
        except KeyError:
            print(
                "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME,"
                + " COUCHDB_USER and COUCHDB_PASSWORD must be set."
            )
            sys.exit(-1)

        # COUCHDB_PORT is optional; fall back to the "5984" default.
        couchdb_port = os.environ.get("COUCHDB_PORT", "5984")

        # NOTE(review): the credentials are interpolated into the URL
        # verbatim; characters such as "@" or "/" in the password would
        # presumably need escaping -- confirm against the couch client.
        self.server = couch.client.Server(
            f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/"
        )

        try:
            self.couchdb = self.server.database(self.database)
            print("Database already exists")
        except couch.exceptions.NotFound:
            print("Creating database and indexes.")
            self.couchdb = self.server.create(self.database)

            for index in as_index_list():
                self.couchdb.index(index)

        # Last key handed out by unique_key(), in integer milliseconds.
        # Zero means "none yet", so the first key is simply the current time.
        self._ts: int = 0

    def unique_key(self) -> int:
        """
        Create a unique key based on the current time in milliseconds.

        We will use this as the ID for any new documents we store in
        CouchDB. Keys returned by one DictDB instance are strictly
        increasing: if the clock has not advanced since the previous
        call (or has stepped backwards) the previous key plus one is
        returned instead of waiting for the clock to move.
        """

        now_ms = round(time.time() * 1000)
        self._ts = max(now_ms, self._ts + 1)

        return self._ts

    # Why batch_write???
    def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]:
        """
        Store a document, or a list of documents, in CouchDB.

        Every document is checked with validate_collector_data() first.
        If one is rejected, the validation error string is returned and
        nothing is saved or modified. Otherwise each document gets a
        fresh "_id" and the result of the CouchDB save call is returned.
        """

        if isinstance(data, list):
            # Validate the whole batch before touching any item, so a
            # rejected batch does not leave "_id" set on the items that
            # happened to precede the bad one.
            for item in data:
                error = validate_collector_data(item)
                if error != "":
                    return error

            for item in data:
                item["_id"] = str(self.unique_key())

            ret: Tuple[str, str] = self.couchdb.save_bulk(data)
        else:
            error = validate_collector_data(data)
            if error != "":
                return error

            data["_id"] = str(self.unique_key())
            ret = self.couchdb.save(data)

        return ret

    def get(self, key: int) -> Dict[str, Any]:
        """
        Get a document based on its ID, return an empty dict if not found.
        """

        try:
            # add() stores IDs as strings, so look up with the same
            # representation.
            doc: Dict[str, Any] = self.couchdb.get(str(key))
        except couch.exceptions.NotFound:
            doc = {}

        return doc

    #
    # def slice(self, key_from=None, key_to=None):
    #     pass

    def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]:
        """
        Execute a Mango query, ideally we should have an index matching
        the query otherwise things will be slow.

        Each keyword argument becomes an "$eq" condition on the field of
        the same name. String values made up only of decimal digits are
        converted to int first. limit and skip fall back to 25 and 0
        when they cannot be converted to int.

        NOTE(review): when no keyword arguments are given an empty query
        is passed to find(), so limit and skip are not forwarded.
        NOTE(review): find() is always called with a hard-coded limit=5;
        how the client combines that with the "limit" inside the query
        is not visible here -- confirm which one wins.
        """

        try:
            limit = int(limit)
            skip = int(skip)
        except (TypeError, ValueError):
            # TypeError covers None and other non-numeric objects.
            limit = 25
            skip = 0

        selector: Dict[str, Any] = {}

        if kwargs:
            criteria: Dict[str, Any] = {}

            for key, value in kwargs.items():
                # Only strings can be digit strings. isdecimal() accepts
                # exactly what int() can parse, unlike isnumeric().
                if isinstance(value, str) and value.isdecimal():
                    value = int(value)
                criteria[key] = {"$eq": value}

            selector = {"limit": limit, "skip": skip, "selector": criteria}

        return list(self.couchdb.find(selector, wrapper=None, limit=5))

    def delete(self, key: int) -> Union[int, None]:
        """
        Delete a document based on its ID.

        Returns the key on success, or None when no such document exists.
        """

        try:
            # IDs are stored as strings by add().
            self.couchdb.delete(str(key))
        except couch.exceptions.NotFound:
            return None

        return key