diff options
Diffstat (limited to 'src/collector/db.py')
-rwxr-xr-x | src/collector/db.py | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/src/collector/db.py b/src/collector/db.py new file mode 100755 index 0000000..0bfa014 --- /dev/null +++ b/src/collector/db.py @@ -0,0 +1,148 @@ +# A database storing dictionaries, keyed on a timestamp. value = A +# dict which will be stored as a JSON object encoded in UTF-8. Note +# that dict keys of type integer or float will become strings while +# values will keep their type. + +# Note that there's a (slim) chance that you'd stomp on the previous +# value if you're too quick with generating the timestamps, ie +# invoking time.time() several times quickly enough. + +from typing import Dict, List, Tuple, Union, Any +import os +import sys +import time + +from src import couch +from .schema import as_index_list, validate_collector_data + + +class DictDB: + def __init__(self) -> None: + """ + Check if the database exists, otherwise we will create it together + with the indexes specified in index.py. + """ + + print(os.environ) + + try: + self.database = os.environ["COUCHDB_NAME"] + self.hostname = os.environ["COUCHDB_HOSTNAME"] + self.username = os.environ["COUCHDB_USER"] + self.password = os.environ["COUCHDB_PASSWORD"] + except KeyError: + print( + "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME," + + " COUCHDB_USER and COUCHDB_PASSWORD must be set." + ) + sys.exit(-1) + + if "COUCHDB_PORT" in os.environ: + couchdb_port = os.environ["COUCHDB_PORT"] + else: + couchdb_port = "5984" + + self.server = couch.client.Server(f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/") + + try: + self.couchdb = self.server.database(self.database) + print("Database already exists") + except couch.exceptions.NotFound: + print("Creating database and indexes.") + self.couchdb = self.server.create(self.database) + + for i in as_index_list(): + self.couchdb.index(i) + + self._ts = time.time() + + def unique_key(self) -> int: + """ + Create a unique key based on the current time. We will use this as + the ID for any new documents we store in CouchDB. + """ + + ts = time.time() + while round(ts * 1000) == self._ts: + ts = time.time() + self._ts = round(ts * 1000) + + return self._ts + + # Why batch_write??? + def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]: + """ + Store a document in CouchDB. + """ + + if isinstance(data, List): + for item in data: + error = validate_collector_data(item) + if error != "": + return error + item["_id"] = str(self.unique_key()) + ret: Tuple[str, str] = self.couchdb.save_bulk(data) + else: + error = validate_collector_data(data) + if error != "": + return error + data["_id"] = str(self.unique_key()) + ret = self.couchdb.save(data) + + return ret + + def get(self, key: int) -> Dict[str, Any]: + """ + Get a document based on its ID, return an empty dict if not found. + """ + + try: + doc: Dict[str, Any] = self.couchdb.get(key) + except couch.exceptions.NotFound: + doc = {} + + return doc + + # + # def slice(self, key_from=None, key_to=None): + # pass + + def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]: + """ + Execute a Mango query, ideally we should have an index matching + the query otherwise things will be slow. + """ + + data: List[Dict[str, Any]] = [] + selector: Dict[str, Any] = {} + + try: + limit = int(limit) + skip = int(skip) + except ValueError: + limit = 25 + skip = 0 + + if kwargs: + selector = {"limit": limit, "skip": skip, "selector": {}} + + for key in kwargs: + if kwargs[key] and kwargs[key].isnumeric(): + kwargs[key] = int(kwargs[key]) + selector["selector"][key] = {"$eq": kwargs[key]} + + for doc in self.couchdb.find(selector, wrapper=None, limit=5): + data.append(doc) + + return data + + def delete(self, key: int) -> Union[int, None]: + """ + Delete a document based on its ID. + """ + try: + self.couchdb.delete(key) + except couch.exceptions.NotFound: + return None + + return key |