From 8baecf339e8061160bee519e87ffe837d1525c18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20N=C3=A4slund?= Date: Wed, 2 Nov 2022 15:31:23 +0100 Subject: more freshup --- src/collector/__init__.py | 4 + src/collector/db.py | 148 +++++++++++++++++++++++++ src/collector/main.py | 267 ++++++++++++++++++++++++++++++++++++++++++++++ src/collector/py.typed | 0 src/collector/schema.py | 136 +++++++++++++++++++++++ src/couch/__init__.py | 2 +- src/couch/client.py | 82 ++++++++------ src/couch/feedreader.py | 8 +- src/couch/resource.py | 60 ++++++----- src/couch/utils.py | 6 +- src/db.py | 148 ------------------------- src/main.py | 258 -------------------------------------------- src/schema.py | 136 ----------------------- src/test/__init__.py | 0 src/test/test_api.py | 232 ---------------------------------------- 15 files changed, 647 insertions(+), 840 deletions(-) create mode 100644 src/collector/__init__.py create mode 100755 src/collector/db.py create mode 100755 src/collector/main.py create mode 100644 src/collector/py.typed create mode 100644 src/collector/schema.py delete mode 100755 src/db.py delete mode 100755 src/main.py delete mode 100644 src/schema.py delete mode 100644 src/test/__init__.py delete mode 100644 src/test/test_api.py (limited to 'src') diff --git a/src/collector/__init__.py b/src/collector/__init__.py new file mode 100644 index 0000000..6530fdd --- /dev/null +++ b/src/collector/__init__.py @@ -0,0 +1,4 @@ +"""Collector +""" + +__version__ = "1.03" diff --git a/src/collector/db.py b/src/collector/db.py new file mode 100755 index 0000000..0bfa014 --- /dev/null +++ b/src/collector/db.py @@ -0,0 +1,148 @@ +# A database storing dictionaries, keyed on a timestamp. value = A +# dict which will be stored as a JSON object encoded in UTF-8. Note +# that dict keys of type integer or float will become strings while +# values will keep their type. + +# Note that there's a (slim) chance that you'd stomp on the previous +# value if you're too quick with generating the timestamps, ie +# invoking time.time() several times quickly enough. + +from typing import Dict, List, Tuple, Union, Any +import os +import sys +import time + +from src import couch +from .schema import as_index_list, validate_collector_data + + +class DictDB: + def __init__(self) -> None: + """ + Check if the database exists, otherwise we will create it together + with the indexes specified in index.py. + """ + + print(os.environ) + + try: + self.database = os.environ["COUCHDB_NAME"] + self.hostname = os.environ["COUCHDB_HOSTNAME"] + self.username = os.environ["COUCHDB_USER"] + self.password = os.environ["COUCHDB_PASSWORD"] + except KeyError: + print( + "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME," + + " COUCHDB_USER and COUCHDB_PASSWORD must be set." + ) + sys.exit(-1) + + if "COUCHDB_PORT" in os.environ: + couchdb_port = os.environ["COUCHDB_PORT"] + else: + couchdb_port = "5984" + + self.server = couch.client.Server(f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/") + + try: + self.couchdb = self.server.database(self.database) + print("Database already exists") + except couch.exceptions.NotFound: + print("Creating database and indexes.") + self.couchdb = self.server.create(self.database) + + for i in as_index_list(): + self.couchdb.index(i) + + self._ts = time.time() + + def unique_key(self) -> int: + """ + Create a unique key based on the current time. We will use this as + the ID for any new documents we store in CouchDB. + """ + + ts = time.time() + while round(ts * 1000) == self._ts: + ts = time.time() + self._ts = round(ts * 1000) + + return self._ts + + # Why batch_write??? + def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]: + """ + Store a document in CouchDB. + """ + + if isinstance(data, List): + for item in data: + error = validate_collector_data(item) + if error != "": + return error + item["_id"] = str(self.unique_key()) + ret: Tuple[str, str] = self.couchdb.save_bulk(data) + else: + error = validate_collector_data(data) + if error != "": + return error + data["_id"] = str(self.unique_key()) + ret = self.couchdb.save(data) + + return ret + + def get(self, key: int) -> Dict[str, Any]: + """ + Get a document based on its ID, return an empty dict if not found. + """ + + try: + doc: Dict[str, Any] = self.couchdb.get(key) + except couch.exceptions.NotFound: + doc = {} + + return doc + + # + # def slice(self, key_from=None, key_to=None): + # pass + + def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]: + """ + Execute a Mango query, ideally we should have an index matching + the query otherwise things will be slow. + """ + + data: List[Dict[str, Any]] = [] + selector: Dict[str, Any] = {} + + try: + limit = int(limit) + skip = int(skip) + except ValueError: + limit = 25 + skip = 0 + + if kwargs: + selector = {"limit": limit, "skip": skip, "selector": {}} + + for key in kwargs: + if kwargs[key] and kwargs[key].isnumeric(): + kwargs[key] = int(kwargs[key]) + selector["selector"][key] = {"$eq": kwargs[key]} + + for doc in self.couchdb.find(selector, wrapper=None, limit=5): + data.append(doc) + + return data + + def delete(self, key: int) -> Union[int, None]: + """ + Delete a document based on its ID. + """ + try: + self.couchdb.delete(key) + except couch.exceptions.NotFound: + return None + + return key diff --git a/src/collector/main.py b/src/collector/main.py new file mode 100755 index 0000000..c363885 --- /dev/null +++ b/src/collector/main.py @@ -0,0 +1,267 @@ +from typing import Dict, Union, List, Callable, Awaitable, Any +import json +import os +import sys +import time + +import uvicorn +from fastapi import Depends, FastAPI, Request, Response +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from fastapi_jwt_auth import AuthJWT +from fastapi_jwt_auth.auth_config import AuthConfig +from fastapi_jwt_auth.exceptions import AuthJWTException +from pydantic import BaseModel + +from .db import DictDB +from .schema import get_index_keys, validate_collector_data + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["http://localhost:8001"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["X-Total-Count"], +) + +# TODO: X-Total-Count + + +@app.middleware("http") +async def mock_x_total_count_header(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response: + + print(type(call_next)) + + response: Response = await call_next(request) + response.headers["X-Total-Count"] = "100" + return response + + +for i in range(10): + try: + db = DictDB() + except Exception as e: + print(f"Database not responding, will try again soon. Attempt {i + 1} of 10.") + else: + break + time.sleep(1) +else: + print("Database did not respond after 10 attempts, quitting.") + sys.exit(-1) + + +def get_pubkey() -> str: + try: + if "JWT_PUBKEY_PATH" in os.environ: + keypath = os.environ["JWT_PUBKEY_PATH"] + else: + keypath = "/opt/certs/public.pem" + + with open(keypath, "r") as fd: + pubkey = fd.read() + except FileNotFoundError: + print(f"Could not find JWT certificate in {keypath}") + sys.exit(-1) + + return pubkey + + +def get_data( + key: Union[int, None] = None, + limit: int = 25, + skip: int = 0, + ip: Union[str, None] = None, + port: Union[int, None] = None, + asn: Union[str, None] = None, + domain: Union[str, None] = None, +) -> List[Dict[str, Any]]: + if key: + return [db.get(key)] + + selectors: Dict[str, Any] = {} + indexes = get_index_keys() + selectors["domain"] = domain + + if ip and "ip" in indexes: + selectors["ip"] = ip + if port and "port" in indexes: + selectors["port"] = port + if asn and "asn" in indexes: + selectors["asn"] = asn + + data: List[Dict[str, Any]] = db.search(**selectors, limit=limit, skip=skip) + + return data + + +class JWTConfig(BaseModel): + authjwt_algorithm: str = "ES256" + authjwt_public_key: str = get_pubkey() + + +@AuthJWT.load_config # type: ignore +def jwt_config(): + return JWTConfig() + + +@app.exception_handler(AuthJWTException) +def authjwt_exception_handler(request: Request, exc: AuthJWTException) -> JSONResponse: + return JSONResponse(content={"status": "error", "message": exc.message}, status_code=400) + + +@app.exception_handler(RuntimeError) +def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse: + return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400) + + +@app.get("/sc/v0/get") +async def get( + key: Union[int, None] = None, + limit: int = 25, + skip: int = 0, + ip: Union[str, None] = None, + port: Union[int, None] = None, + asn: Union[str, None] = None, + Authorize: AuthJWT = Depends(), +) -> JSONResponse: + + Authorize.jwt_required() + + data = [] + raw_jwt = Authorize.get_raw_jwt() + + if "read" not in raw_jwt: + return JSONResponse( + content={ + "status": "error", + "message": "Could not find read claim in JWT token", + }, + status_code=400, + ) + else: + domains = raw_jwt["read"] + + for domain in domains: + data.extend(get_data(key, limit, skip, ip, port, asn, domain)) + + return JSONResponse(content={"status": "success", "docs": data}) + + +@app.get("/sc/v0/get/{key}") +async def get_key(key: Union[int, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse: + + Authorize.jwt_required() + + raw_jwt = Authorize.get_raw_jwt() + + if "read" not in raw_jwt: + return JSONResponse( + content={ + "status": "error", + "message": "Could not find read claim in JWT token", + }, + status_code=400, + ) + else: + allowed_domains = raw_jwt["read"] + + data_list = get_data(key) + + # Handle if missing + data = data_list[0] + + if data and data["domain"] not in allowed_domains: + return JSONResponse( + content={ + "status": "error", + "message": "User not authorized to view this object", + }, + status_code=400, + ) + + return JSONResponse(content={"status": "success", "docs": data}) + + +# WHY IS AUTH OUTCOMMENTED??? +@app.post("/sc/v0/add") +async def add(data: Request, Authorize: AuthJWT = Depends()) -> JSONResponse: + # Authorize.jwt_required() + + try: + json_data = await data.json() + except json.decoder.JSONDecodeError: + return JSONResponse( + content={ + "status": "error", + "message": "Invalid JSON.", + }, + status_code=400, + ) + + key = db.add(json_data) + + if isinstance(key, str): + return JSONResponse( + content={ + "status": "error", + "message": key, + }, + status_code=400, + ) + + return JSONResponse(content={"status": "success", "docs": key}) + + +@app.delete("/sc/v0/delete/{key}") +async def delete(key: int, Authorize: AuthJWT = Depends()) -> JSONResponse: + + Authorize.jwt_required() + + raw_jwt = Authorize.get_raw_jwt() + + if "write" not in raw_jwt: + return JSONResponse( + content={ + "status": "error", + "message": "Could not find write claim in JWT token", + }, + status_code=400, + ) + else: + allowed_domains = raw_jwt["write"] + + data_list = get_data(key) + + # Handle if missing + data = data_list[0] + + if data and data["domain"] not in allowed_domains: + return JSONResponse( + content={ + "status": "error", + "message": "User not authorized to delete this object", + }, + status_code=400, + ) + + if db.delete(key) is None: + return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) + + return JSONResponse(content={"status": "success", "docs": data}) + + +# def main(standalone: bool = False): +# print(type(app)) +# if not standalone: +# return app + +# uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug") + + +# if __name__ == "__main__": +# main(standalone=True) +# else: +# app = main() diff --git a/src/collector/py.typed b/src/collector/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/collector/schema.py b/src/collector/schema.py new file mode 100644 index 0000000..e291f10 --- /dev/null +++ b/src/collector/schema.py @@ -0,0 +1,136 @@ +from typing import List, Any, Dict +import json +import sys +import traceback + +import jsonschema + +# fmt:off +# NOTE: Commented out properties are left intentionally, so it is easier to see +# what properties are optional. +schema = { + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "document_version": {"type": "integer"}, + "ip": {"type": "string"}, + "port": {"type": "integer"}, + "whois_description": {"type": "string"}, + "asn": {"type": "string"}, + "asn_country_code": {"type": "string"}, + "ptr": {"type": "string"}, + "abuse_mail": {"type": "string"}, + "domain": {"type": "string"}, + "timestamp": {"type": "string", "format": "date-time"}, + "display_name": {"type": "string"}, + "description": {"type": "string"}, + "custom_data": { + "type": "object", + "patternProperties": { + ".*": { + "type": "object", + "properties": { + "display_name": {"type": "string"}, + "data": {"type": ["string", "boolean", "integer"]}, + "description": {"type": "string"}, + }, + "required": [ + "display_name", + "data", + # "description" + ] + }, + }, + }, + "result": { + "type": "object", + "patternProperties": { + ".*": { + "type": "object", + "properties": { + "display_name": {"type": "string"}, + "vulnerable": {"type": "boolean"}, + "investigation_needed": {"type": "boolean"}, + "reliability": {"type": "integer"}, + "description": {"type": "string"}, + }, + "oneOf": [ + { + "required": [ + "display_name", + "vulnerable", + # "reliability", # TODO: reliability is required if vulnerable = true + # "description", + ] + }, + { + "required": [ + "display_name", + "investigation_needed", + # "reliability", # TODO: reliability is required if investigation_needed = true + # "description", + ] + }, + ] + }, + }, + }, + }, + "required": [ + "document_version", + "ip", + "port", + "whois_description", + "asn", + "asn_country_code", + "ptr", + "abuse_mail", + "domain", + "timestamp", + "display_name", + # "description", + # "custom_data", + "result", + ], +} +# fmt:on + + +def get_index_keys() -> List[str]: + keys: List[str] = [] + for key in schema["properties"]: + keys.append(key) + return keys + + +def as_index_list() -> List[Dict[str, Any]]: + index_list: List[Dict[str, Any]] = [] + for key in schema["properties"]: + name = f"{key}-json-index" + index = { + "index": { + "fields": [ + key, + ] + }, + "name": name, + "type": "json", + } + index_list.append(index) + + return index_list + + +def validate_collector_data(json_blob: Dict[str, Any]) -> str: + try: + jsonschema.validate(json_blob, schema, format_checker=jsonschema.FormatChecker()) + except jsonschema.exceptions.ValidationError as e: + return f"Validation failed with error: {e.message}" + return "" + + +if __name__ == "__main__": + with open(sys.argv[1]) as fd: + json_data = json.loads(fd.read()) + + print(validate_collector_data(json_data)) diff --git a/src/couch/__init__.py b/src/couch/__init__.py index a7537bc..64e0252 100644 --- a/src/couch/__init__.py +++ b/src/couch/__init__.py @@ -8,4 +8,4 @@ __email__ = "rinat.sabitov@gmail.com" __status__ = "Development" -from couch.client import Server # noqa: F401 +from .client import Server # noqa: F401 diff --git a/src/couch/client.py b/src/couch/client.py index 52477be..96dc78a 100644 --- a/src/couch/client.py +++ b/src/couch/client.py @@ -8,10 +8,24 @@ import copy import mimetypes import warnings -from couch import utils -from couch import feedreader -from couch import exceptions as exp -from couch.resource import Resource +from .utils import ( + force_bytes, + force_text, + encode_view_options, + extract_credentials, +) +from .feedreader import ( + SimpleFeedReader, + BaseFeedReader, +) + +from .exceptions import ( + Conflict, + NotFound, + FeedReaderExited, + UnexpectedError, +) +from .resource import Resource DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/') @@ -25,16 +39,16 @@ def _id_to_path(_id: str) -> str: def _listen_feed(object, node, feed_reader, **kwargs): if not callable(feed_reader): - raise exp.UnexpectedError("feed_reader must be callable or class") + raise UnexpectedError("feed_reader must be callable or class") - if isinstance(feed_reader, feedreader.BaseFeedReader): + if isinstance(feed_reader, BaseFeedReader): reader = feed_reader(object) else: - reader = feedreader.SimpleFeedReader()(object, feed_reader) + reader = SimpleFeedReader()(object, feed_reader) # Possible options: "continuous", "longpoll" kwargs.setdefault("feed", "continuous") - data = utils.force_bytes(json.dumps(kwargs.pop('data', {}))) + data = force_bytes(json.dumps(kwargs.pop('data', {}))) (resp, result) = object.resource(node).post( params=kwargs, data=data, stream=True) @@ -44,8 +58,8 @@ def _listen_feed(object, node, feed_reader, **kwargs): if not line: reader.on_heartbeat() else: - reader.on_message(json.loads(utils.force_text(line))) - except exp.FeedReaderExited: + reader.on_message(json.loads(force_text(line))) + except FeedReaderExited: reader.on_close() @@ -100,7 +114,7 @@ class Server(object): def __init__(self, base_url=DEFAULT_BASE_URL, full_commit=True, authmethod="basic", verify=False): - self.base_url, credentials = utils.extract_credentials(base_url) + self.base_url, credentials = extract_credentials(base_url) self.resource = Resource(self.base_url, full_commit, credentials=credentials, authmethod=authmethod, @@ -112,7 +126,7 @@ class Server(object): def __contains__(self, name): try: self.resource.head(name) - except exp.NotFound: + except NotFound: return False else: return True @@ -158,7 +172,7 @@ class Server(object): """ (r, result) = self.resource.head(name) if r.status_code == 404: - raise exp.NotFound("Database '{0}' does not exists".format(name)) + raise NotFound("Database '{0}' does not exists".format(name)) db = Database(self.resource(name), name) return db @@ -206,7 +220,7 @@ class Server(object): data = {'source': source, 'target': target} data.update(kwargs) - data = utils.force_bytes(json.dumps(data)) + data = force_bytes(json.dumps(data)) (resp, result) = self.resource.post('_replicate', data=data) return result @@ -244,7 +258,7 @@ class Database(object): try: (resp, result) = self.resource.head(_id_to_path(doc_id)) return resp.status_code < 206 - except exp.NotFound: + except NotFound: return False def config(self): @@ -308,14 +322,14 @@ class Database(object): if "_deleted" not in doc: doc["_deleted"] = True - data = utils.force_bytes(json.dumps({"docs": _docs})) + data = force_bytes(json.dumps({"docs": _docs})) params = {"all_or_nothing": "true" if transaction else "false"} (resp, results) = self.resource.post( "_bulk_docs", data=data, params=params) for result, doc in zip(results, _docs): if "error" in result: - raise exp.Conflict("one or more docs are not saved") + raise Conflict("one or more docs are not saved") return results @@ -370,7 +384,7 @@ class Database(object): else: params = {} - data = utils.force_bytes(json.dumps(_doc)) + data = force_bytes(json.dumps(_doc)) print("gg1", flush=True) print(data, flush=True) @@ -390,7 +404,7 @@ class Database(object): print("vv2", flush=True) if resp.status_code == 409: - raise exp.Conflict(result['reason']) + raise Conflict(result['reason']) if "rev" in result and result["rev"] is not None: _doc["_rev"] = result["rev"] @@ -420,7 +434,7 @@ class Database(object): if "_id" not in doc: doc["_id"] = uuid.uuid4().hex - data = utils.force_bytes(json.dumps({"docs": _docs})) + data = orce_bytes(json.dumps({"docs": _docs})) params = {"all_or_nothing": "true" if transaction else "false"} (resp, results) = self.resource.post("_bulk_docs", data=data, @@ -456,9 +470,9 @@ class Database(object): if "keys" in params: data = {"keys": params.pop("keys")} - data = utils.force_bytes(json.dumps(data)) + data = force_bytes(json.dumps(data)) - params = utils.encode_view_options(params) + params = encode_view_options(params) if data: (resp, result) = self.resource.post( "_all_docs", params=params, data=data) @@ -538,7 +552,7 @@ class Database(object): resource = self.resource(doc_id) (resp, result) = resource.get(params=params) if resp.status_code == 404: - raise exp.NotFound("Document id `{0}` not found".format(doc_id)) + raise NotFound("Document id `{0}` not found".format(doc_id)) for rev in result['_revs_info']: if status and rev['status'] == status: @@ -566,10 +580,10 @@ class Database(object): (resp, result) = resource.delete( filename, params={'rev': _doc['_rev']}) if resp.status_code == 404: - raise exp.NotFound("filename {0} not found".format(filename)) + raise NotFound("filename {0} not found".format(filename)) if resp.status_code > 205: - raise exp.Conflict(result['reason']) + raise Conflict(result['reason']) _doc['_rev'] = result['rev'] try: @@ -645,7 +659,7 @@ class Database(object): if resp.status_code < 206: return self.get(doc["_id"]) - raise exp.Conflict(result['reason']) + raise Conflict(result['reason']) def one(self, name, flat=None, wrapper=None, **kwargs): """ @@ -665,16 +679,16 @@ class Database(object): params = {"limit": 1} params.update(kwargs) - path = utils._path_from_name(name, '_view') + path = _path_from_name(name, '_view') data = None if "keys" in params: data = {"keys": params.pop('keys')} if data: - data = utils.force_bytes(json.dumps(data)) + data = force_bytes(json.dumps(data)) - params = utils.encode_view_options(params) + params = encode_view_options(params) result = list(self._query(self.resource(*path), wrapper=wrapper, flat=flat, params=params, data=data)) @@ -716,16 +730,16 @@ class Database(object): :returns: generator object """ params = copy.copy(kwargs) - path = utils._path_from_name(name, '_view') + path = _path_from_name(name, '_view') data = None if "keys" in params: data = {"keys": params.pop('keys')} if data: - data = utils.force_bytes(json.dumps(data)) + data = force_bytes(json.dumps(data)) - params = utils.encode_view_options(params) + params = encode_view_options(params) result = self._query(self.resource(*path), wrapper=wrapper, flat=flat, params=params, data=data) @@ -768,7 +782,7 @@ class Database(object): """ path = '_find' - data = utils.force_bytes(json.dumps(selector)) + data = force_bytes(json.dumps(selector)) (resp, result) = self.resource.post(path, data=data, params=kwargs) @@ -780,7 +794,7 @@ class Database(object): def index(self, index_doc, **kwargs): path = '_index' - data = utils.force_bytes(json.dumps(index_doc)) + data = force_bytes(json.dumps(index_doc)) (resp, result) = self.resource.post(path, data=data, params=kwargs) diff --git a/src/couch/feedreader.py b/src/couch/feedreader.py index 98401ab..aac51d3 100644 --- a/src/couch/feedreader.py +++ b/src/couch/feedreader.py @@ -11,7 +11,7 @@ class BaseFeedReader: self.db = db return self - def on_message(self, message): + def on_message(self, message: str) -> None: """ Callback method that is called when change message is received from couchdb. @@ -22,14 +22,14 @@ class BaseFeedReader: raise NotImplementedError() - def on_close(self): + def on_close(self) -> None: """ Callback method that is received when connection is closed with a server. By default, does nothing. """ pass - def on_heartbeat(self): + def on_heartbeat(self) -> None: """ Callback method invoked when a hearbeat (empty line) is received from the _changes stream. Override this to purge the reader's internal @@ -48,5 +48,5 @@ class SimpleFeedReader(BaseFeedReader): self.callback = callback return super(SimpleFeedReader, self).__call__(db) - def on_message(self, message) -> None: + def on_message(self, message: str) -> None: self.callback(message, db=self.db) diff --git a/src/couch/resource.py b/src/couch/resource.py index 364bff4..f110c8d 100644 --- a/src/couch/resource.py +++ b/src/couch/resource.py @@ -1,17 +1,25 @@ # -*- coding: utf-8 -*- # Based on py-couchdb (https://github.com/histrio/py-couchdb) - +from __future__ import annotations from __future__ import unicode_literals -from typing import Union, Tuple +from typing import Union, Tuple, Dict, Any import json import requests - - -from couch import utils -from couch import exceptions +from .utils import ( + urljoin, + as_json, + force_bytes, +) +from .exceptions import ( + GenericError, + NotFound, + BadRequest, + Conflict, + AuthenticationFailed, +) class Resource: @@ -40,12 +48,11 @@ class Resource: if method == "session": data = {"name": credentials[0], "password": credentials[1]} - data = utils.force_bytes(json.dumps(data)) - post_url = utils.urljoin(self.base_url, "_session") - r = self.session.post(post_url, data=data) - if r.status_code != 200: - raise exceptions.AuthenticationFailed() + post_url = urljoin(self.base_url, "_session") + r = self.session.post(post_url, data=force_bytes(json.dumps(data))) + if r and r.status_code != 200: + raise AuthenticationFailed() elif method == "basic": self.session.auth = credentials @@ -53,8 +60,8 @@ class Resource: else: raise RuntimeError("Invalid authentication method") - def __call__(self, *path: str): - base_url = utils.urljoin(self.base_url, *path) + def __call__(self, *path: str) -> Resource: + base_url = urljoin(self.base_url, *path) return self.__class__(base_url, session=self.session) def _check_result(self, response, result) -> None: @@ -68,17 +75,18 @@ class Resource: # This is here because couchdb can return http 201 # but containing a list of conflict errors if error == 'conflict' or error == "file_exists": - raise exceptions.Conflict(reason or "Conflict") + raise Conflict(reason or "Conflict") if response.status_code > 205: if response.status_code == 404 or error == 'not_found': - raise exceptions.NotFound(reason or 'Not found') + raise NotFound(reason or 'Not found') elif error == 'bad_request': - raise exceptions.BadRequest(reason or "Bad request") - raise exceptions.GenericError(result) + raise BadRequest(reason or "Bad request") + raise GenericError(result) - def request(self, method, path: str, params=None, data=None, - headers=None, stream=False, **kwargs): + + def request(self, method, path: Union[str, None], params=None, data=None, + headers=None, stream=False, **kwargs) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: if headers is None: headers = {} @@ -88,7 +96,7 @@ class Resource: if path: if not isinstance(path, (list, tuple)): path = [path] - url = utils.urljoin(self.base_url, *path) + url = urljoin(self.base_url, *path) else: url = self.base_url @@ -102,7 +110,7 @@ class Resource: result = None self._check_result(response, result) else: - result = utils.as_json(response) + result = as_json(response) if result is None: return response, result @@ -115,17 +123,17 @@ class Resource: return response, result - def get(self, path: Union[str, None] = None, **kwargs): + def get(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: return self.request("GET", path, **kwargs) - def put(self, path: Union[str, None] = None, **kwargs): + def put(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: return self.request("PUT", path, **kwargs) - def post(self, path: Union[str, None] = None, **kwargs): + def post(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: return self.request("POST", path, **kwargs) - def delete(self, path: Union[str, None] = None, **kwargs): + def delete(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: return self.request("DELETE", path, **kwargs) - def head(self, path: Union[str, None] = None, **kwargs): + def head(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: return self.request("HEAD", path, **kwargs) diff --git a/src/couch/utils.py b/src/couch/utils.py index f0883a6..b3e5aa3 100644 --- a/src/couch/utils.py +++ b/src/couch/utils.py @@ -78,13 +78,17 @@ def urljoin(base: str, *path: str) -> str: return reduce(_join, path, base) # Probably bugs here -def as_json(response: requests.models.Response) -> Union[Dict[str, Any], None]: +def as_json(response: requests.models.Response) -> Union[Dict[str, Any], None, str]: if "application/json" in response.headers['content-type']: response_src = response.content.decode('utf-8') + print(response.content) if response.content != b'': ret: Dict[str, Any] = json.loads(response_src) return ret else: + print("fff") + print("fff") + print(type(response_src)) return response_src return None diff --git a/src/db.py b/src/db.py deleted file mode 100755 index 5173dda..0000000 --- a/src/db.py +++ /dev/null @@ -1,148 +0,0 @@ -# A database storing dictionaries, keyed on a timestamp. value = A -# dict which will be stored as a JSON object encoded in UTF-8. Note -# that dict keys of type integer or float will become strings while -# values will keep their type. - -# Note that there's a (slim) chance that you'd stomp on the previous -# value if you're too quick with generating the timestamps, ie -# invoking time.time() several times quickly enough. - -from typing import Dict, List, Tuple, Union, Any -import os -import sys -import time - -import couch -from schema import as_index_list, validate_collector_data - - -class DictDB(): - def __init__(self) -> None: - """ - Check if the database exists, otherwise we will create it together - with the indexes specified in index.py. - """ - - try: - self.database = os.environ['COUCHDB_NAME'] - self.hostname = os.environ['COUCHDB_HOSTNAME'] - self.username = os.environ['COUCHDB_USER'] - self.password = os.environ['COUCHDB_PASSWORD'] - except KeyError: - print('The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME,' + - ' COUCHDB_USER and COUCHDB_PASSWORD must be set.') - sys.exit(-1) - - if 'COUCHDB_PORT' in os.environ: - couchdb_port = os.environ['COUCHDB_PORT'] - else: - couchdb_port = "5984" - - self.server = couch.client.Server( - f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/") - - try: - self.couchdb = self.server.database(self.database) - print("Database already exists") - except couch.exceptions.NotFound: - print("Creating database and indexes.") - self.couchdb = self.server.create(self.database) - - for i in as_index_list(): - self.couchdb.index(i) - - self._ts = time.time() - - def unique_key(self) -> int: - """ - Create a unique key based on the current time. We will use this as - the ID for any new documents we store in CouchDB. - """ - - ts = time.time() - while round(ts * 1000) == self._ts: - ts = time.time() - self._ts = round(ts * 1000) - - return self._ts - - # Why batch_write??? - def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]: - """ - Store a document in CouchDB. - """ - - if isinstance(data, List): - for item in data: - error = validate_collector_data(item) - if error != "": - return error - item['_id'] = str(self.unique_key()) - ret: Tuple[str, str] = self.couchdb.save_bulk(data) - else: - error = validate_collector_data(data) - if error != "": - return error - data['_id'] = str(self.unique_key()) - ret = self.couchdb.save(data) - - return ret - - def get(self, key: int) -> Dict[str, Any]: - """ - Get a document based on its ID, return an empty dict if not found. - """ - - try: - doc: Dict[str, Any] = self.couchdb.get(key) - except couch.exceptions.NotFound: - doc = {} - - return doc - - def slice(self, key_from=None, key_to=None): - pass - - def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]: - """ - Execute a Mango query, ideally we should have an index matching - the query otherwise things will be slow. - """ - - data = list() - selector = dict() - - try: - limit = int(limit) - skip = int(skip) - except ValueError: - limit = 25 - skip = 0 - - if kwargs: - selector = { - "limit": limit, - "skip": skip, - "selector": {} - } - - for key in kwargs: - if kwargs[key] and kwargs[key].isnumeric(): - kwargs[key] = int(kwargs[key]) - selector['selector'][key] = {'$eq': kwargs[key]} - - for doc in self.couchdb.find(selector, wrapper=None, limit=5): - data.append(doc) - - return data - - def delete(self, key: int) -> Union[int, None]: - """ - Delete a document based on its ID. - """ - try: - self.couchdb.delete(key) - except couch.exceptions.NotFound: - return None - - return key diff --git a/src/main.py b/src/main.py deleted file mode 100755 index 2730b83..0000000 --- a/src/main.py +++ /dev/null @@ -1,258 +0,0 @@ -from typing import Dict, Union, List, Any -import json -import os -import sys -import time - -import uvicorn -from fastapi import Depends, FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -from fastapi_jwt_auth import AuthJWT -from fastapi_jwt_auth.exceptions import AuthJWTException -from pydantic import BaseModel - -from db import DictDB -from schema import get_index_keys, validate_collector_data - -app = FastAPI() - -app.add_middleware( - CORSMiddleware, - allow_origins=["http://localhost:8001"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - expose_headers=["X-Total-Count"], -) - -# TODO: X-Total-Count - - -@app.middleware("http") -async def mock_x_total_count_header(request: Request, call_next): - response = await call_next(request) - response.headers["X-Total-Count"] = "100" - return response - -for i in range(10): - try: - db = DictDB() - except Exception: - print( - f'Database not responding, will try again soon. Attempt {i + 1} of 10.') - else: - break - time.sleep(1) -else: - print('Database did not respond after 10 attempts, quitting.') - sys.exit(-1) - - -def get_pubkey() -> str: - try: - if 'JWT_PUBKEY_PATH' in os.environ: - keypath = os.environ['JWT_PUBKEY_PATH'] - else: - keypath = '/opt/certs/public.pem' - - with open(keypath, "r") as fd: - pubkey = fd.read() - except FileNotFoundError: - print(f"Could not find JWT certificate in {keypath}") - sys.exit(-1) - - return pubkey - - - -def get_data(key: Union[int, None] = None, - limit: int = 25, - skip: int = 0, - ip: Union[str, None] = None, - port: Union[int, None] = None, - asn: Union[str, None] = None, - domain: Union[str, None] = None) -> List[Dict[str, Any]]: - if key: - return [db.get(key)] - - selectors: Dict[str, Any] = {} - indexes = get_index_keys() - selectors['domain'] = domain - - if ip and 'ip' in indexes: - selectors['ip'] = ip - if port and 'port' in indexes: - selectors['port'] = port - if asn and 'asn' in indexes: - selectors['asn'] = asn - - data: List[Dict[str, Any]] = db.search(**selectors, limit=limit, skip=skip) - - return data - - -class JWTConfig(BaseModel): - authjwt_algorithm: str = "ES256" - authjwt_public_key: str = get_pubkey() - - -@AuthJWT.load_config -def jwt_config(): - return JWTConfig() - - -@app.exception_handler(AuthJWTException) -def authjwt_exception_handler(request: Request, exc: AuthJWTException) -> JSONResponse: - return JSONResponse(content={"status": "error", "message": - exc.message}, status_code=400) - - -@app.exception_handler(RuntimeError) -def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse: - return JSONResponse(content={"status": "error", "message": - str(exc.with_traceback(None))}, - status_code=400) - - -@app.get('/sc/v0/get') -async def get(key: Union[int, None] = None, limit: int = 25, skip: int = 0, ip: Union[str, None] = None, port: Union[int, None] = None, asn: Union[str, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse: - - Authorize.jwt_required() - - data = [] - raw_jwt = Authorize.get_raw_jwt() - - if "read" not in raw_jwt: - return JSONResponse( - content={ - "status": "error", - "message": "Could not find read claim in JWT token", - }, - status_code=400, - ) - else: - domains = raw_jwt["read"] - - for domain in domains: - data.extend(get_data(key, limit, skip, ip, port, asn, domain)) - - return JSONResponse(content={"status": "success", "docs": data}) - - -@app.get('/sc/v0/get/{key}') -async def get_key(key: Union[int, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse: - - Authorize.jwt_required() - - raw_jwt = Authorize.get_raw_jwt() - - if "read" not in raw_jwt: - return JSONResponse( - content={ - "status": "error", - "message": "Could not find read claim in JWT token", - }, - status_code=400, - ) - else: - allowed_domains = raw_jwt["read"] - - data_list = get_data(key) - - # Handle if missing - data = data_list[0] - - if data and data["domain"] not in allowed_domains: - return JSONResponse( - content={ - "status": "error", - "message": "User not authorized to view this object", - }, - status_code=400, - ) - - return JSONResponse(content={"status": "success", "docs": data}) - - -# WHY IS AUTH OUTCOMMENTED??? -@app.post('/sc/v0/add') -async def add(data: Request, Authorize: AuthJWT = Depends()) -> JSONResponse: - # Authorize.jwt_required() - - try: - json_data = await data.json() - except json.decoder.JSONDecodeError: - return JSONResponse( - content={ - "status": "error", - "message": "Invalid JSON.", - }, - status_code=400, - ) - - key = db.add(json_data) - - if isinstance(key, str): - return JSONResponse( - content={ - "status": "error", - "message": key, - }, - status_code=400, - ) - - return JSONResponse(content={"status": "success", "docs": key}) - - -@app.delete('/sc/v0/delete/{key}') -async def delete(key: int, Authorize: AuthJWT = Depends()) -> JSONResponse: - - Authorize.jwt_required() - - raw_jwt = Authorize.get_raw_jwt() - - if "write" not in raw_jwt: - return JSONResponse( - content={ - "status": "error", - "message": "Could not find write claim in JWT token", - }, - status_code=400, - ) - else: - allowed_domains = raw_jwt["write"] - - data_list = get_data(key) - - # Handle if missing - data = data_list[0] - - if data and data["domain"] not in allowed_domains: - return JSONResponse( - content={ - "status": "error", - "message": "User not authorized to delete this object", - }, - status_code=400, - ) - - if db.delete(key) is None: - return JSONResponse(content={"status": "error", - "message": "Document not found"}, - status_code=400) - - return JSONResponse(content={"status": "success", "docs": data}) - - -def main(standalone: bool = False): - if not standalone: - return app - - uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug") - - -if __name__ == '__main__': - main(standalone=True) -else: - app = main() diff --git a/src/schema.py b/src/schema.py deleted file mode 100644 index 2b479d2..0000000 --- a/src/schema.py +++ /dev/null @@ -1,136 +0,0 @@ -from typing import List, Any, Dict -import json -import sys -import traceback - -import jsonschema - -# fmt:off -# NOTE: Commented out properties are left intentionally, so it is easier to see -# what properties are optional. -schema = { - "$schema": "http://json-schema.org/schema#", - "type": "object", - "properties": { - "document_version": {"type": "integer"}, - "ip": {"type": "string"}, - "port": {"type": "integer"}, - "whois_description": {"type": "string"}, - "asn": {"type": "string"}, - "asn_country_code": {"type": "string"}, - "ptr": {"type": "string"}, - "abuse_mail": {"type": "string"}, - "domain": {"type": "string"}, - "timestamp": {"type": "string", "format": "date-time"}, - "display_name": {"type": "string"}, - "description": {"type": "string"}, - "custom_data": { - "type": "object", - "patternProperties": { - ".*": { - "type": "object", - "properties": { - "display_name": {"type": "string"}, - "data": {"type": ["string", "boolean", "integer"]}, - "description": {"type": "string"}, - }, - "required": [ - "display_name", - "data", - # "description" - ] - }, - }, - }, - "result": { - "type": "object", - "patternProperties": { - ".*": { - "type": "object", - "properties": { - "display_name": {"type": "string"}, - "vulnerable": {"type": "boolean"}, - "investigation_needed": {"type": "boolean"}, - "reliability": {"type": "integer"}, - "description": {"type": "string"}, - }, - "oneOf": [ - { - "required": [ - "display_name", - "vulnerable", - # "reliability", # TODO: reliability is required if vulnerable = true - # "description", - ] - }, - { - "required": [ - "display_name", - "investigation_needed", - # "reliability", # TODO: reliability is required if investigation_needed = true - # "description", - ] - }, - ] - }, - }, - }, - }, - "required": [ - "document_version", - "ip", - "port", - "whois_description", - "asn", - "asn_country_code", - "ptr", - "abuse_mail", - "domain", - "timestamp", - "display_name", - # "description", - # "custom_data", - "result", - ], -} -# fmt:on - - -def get_index_keys() -> List[str]: - keys: List[str] = [] - for key in schema["properties"]: - keys.append(key) - return keys - - -def as_index_list() -> List[Dict[str, Any]]: - index_list: List[Dict[str, Any]] = [] - for key in schema["properties"]: - name = f"{key}-json-index" - index = { - "index": { - "fields": [ - key, - ] - }, - "name": name, - "type": "json" - } - index_list.append(index) - - return index_list - - -def validate_collector_data(json_blob: Dict[str, Any]) -> str: - try: - jsonschema.validate(json_blob, schema, format_checker=jsonschema.FormatChecker()) - except jsonschema.exceptions.ValidationError as e: - return f"Validation failed with error: {e.message}" - return "" - - -if __name__ == "__main__": - with open(sys.argv[1]) as fd: - json_data = json.loads(fd.read()) - - print(validate_collector_data(json_data)) diff --git a/src/test/__init__.py b/src/test/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/test/test_api.py b/src/test/test_api.py deleted file mode 100644 index 371fcf2..0000000 --- a/src/test/test_api.py +++ /dev/null @@ -1,232 +0,0 @@ -import os -import time -import pytest -import random -import ipaddress - -from main import app -from fastapi import FastAPI -from fastapi import testclient - -client = testclient.TestClient(app) -JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY0MjE2ODkyMCwianRpIjoiNjM0NGFiNjEtMTIzZC00YWMyLTk3YjMtYmVlYTE2M2JiMWMwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6InVzZXIxIiwibmJmIjoxNjQyMTY4OTIwLCJyZWFkIjpbInN1bmV0LnNlIl0sIndyaXRlIjpbInN1bmV0LnNlIl19._bX9EHI9h0Vjw75UvYvypqaH3AmsgaATFSUSOT-cYLZHrfMlxios3emr7cyKw-OV_BN5h_XNyrMBV1gIoqAk3A' -JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'} - - -def test_001(): - print("*** Adding document.") - - doc_port = random.randint(1, 65536) - doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) - doc_asn = str(doc_ip) + '_' + str(doc_port) - - json_data = { - 'ip': doc_ip, - 'port': doc_port, - 'whois_description': 'unittest', - 'asn': doc_asn, - 'asn_country_code': 'SE', - 'ptr': 'unittest.example.com', - 'abuse_mail': 'unittest@example.com', - 'domain': 'sunet.se', - 'timestamp_in_utc': '2021-06-21T14:06UTC', - 'producer_unique_keys': { - 'subject_cn': 'unittest', - 'subject_o': 'unittest', - 'full_name': 'unittest', - 'end_of_general_support': False, - 'cve_2021_21972': 'unittest', - 'cve_2021_21974': 'unittest', - 'cve_2021_21985': 'unittest' - } - } - - response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['port'] == doc_port) - - response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['asn'] == doc_asn) - - response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['ip'] == doc_ip) - - -def test_002(): - nr_documents = 100 - starttime = time.time() - - for i in range(nr_documents): - doc_port = random.randint(1, 65536) - doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) - doc_asn = str(doc_ip) + '_' + str(doc_port) - - json_data = { - 'ip': doc_ip, - 'port': doc_port, - 'whois_description': 'unittest', - 'asn': doc_asn, - 'asn_country_code': 'SE', - 'ptr': 'unittest.example.com', - 'abuse_mail': 'unittest@example.com', - 'domain': 'sunet.se', - 'timestamp_in_utc': '2021-06-21T14:06UTC', - 'producer_unique_keys': { - 'subject_cn': 'unittest', - 'subject_o': 'unittest', - 'full_name': 'unittest', - 'end_of_general_support': False, - 'cve_2021_21972': 'unittest', - 'cve_2021_21974': 'unittest', - 'cve_2021_21985': 'unittest' - } - } - - response = client.post( - "/sc/v0/add", headers=JWT_HEADER, json=json_data) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - response = client.get( - f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['port'] == doc_port) - - response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['asn'] == doc_asn) - - response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['ip'] == doc_ip) - - stop_time = str(time.time() - starttime) - print(f"*** Adding {nr_documents} documents took {stop_time} seconds.") - - -def test_003(): - response = client.get("/sc/v0/get", headers=JWT_HEADER) - assert(response.status_code == 200) - - for doc in response.json()['docs']: - doc_id = doc['_id'] - - response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) - assert(response_doc.status_code == 200) - assert(response_doc.json()['status'] == 'success') - assert(type(response_doc.json()['docs']) == type(dict())) - assert(response_doc.json()['docs']['domain'] == 'sunet.se') - - -def test_004(): - response = client.get("/sc/v0/get?limit=1000", headers=JWT_HEADER) - assert(response.status_code == 200) - - nr_documents = len(response.json()['docs']) - starttime = time.time() - - for doc in response.json()['docs']: - doc_id = doc['_id'] - response_doc = client.delete( - f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER) - assert(response_doc.status_code == 200) - assert(response_doc.json()['status'] == 'success') - response_doc = client.get( - f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) - assert(response_doc.status_code == 200) - assert(response_doc.json()['status'] == 'success') - assert(response_doc.json()['docs'] == {}) - - stop_time = str(time.time() - starttime) - print(f"*** Removing {nr_documents} documents took {stop_time} seconds.") - - print("*** Removing document with invalid ID.") - response = client.delete( - "/sc/v0/delete/nonexistent", headers=JWT_HEADER) - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - - -def test_005(): - print("*** Accessing endpoints without JWT token...") - - response = client.get("/sc/v0/get?limit=1000") - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - - response = client.get("/sc/v0/get/unittest") - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - - response = client.post("/sc/v0/add", json={"data": "nothing"}) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - response = client.delete("/sc/v0/delete/unittest") - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - -def test_006(): - print("*** Add doc for unauthorized domain (this is allowed, currently)") - - doc_port = random.randint(1, 65536) - doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) - doc_asn = str(doc_ip) + '_' + str(doc_port) - - json_data = { - 'ip': doc_ip, - 'port': doc_port, - 'whois_description': 'unittest', - 'asn': doc_asn, - 'asn_country_code': 'SE', - 'ptr': 'unittest.example.com', - 'abuse_mail': 'unittest@example.com', - 'domain': 'sunet.se', - 'timestamp_in_utc': '2021-06-21T14:06UTC', - 'producer_unique_keys': { - 'subject_cn': 'unittest', - 'subject_o': 'unittest', - 'full_name': 'unittest', - 'end_of_general_support': False, - 'cve_2021_21972': 'unittest', - 'cve_2021_21974': 'unittest', - 'cve_2021_21985': 'unittest' - } - } - - response = client.post( - "/sc/v0/add", headers=JWT_HEADER, json=dict(json_data, domain="example.com") - ) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - print("*** Get doc for unauthorized domain (not allowed)") - doc_id = response.json()['docs']['_id'] - response = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - assert(response.json()['message'] == 'User not authorized to view this object') - - print("*** Delete doc for unauthorized domain (not allowed)") - response = client.delete(f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER) - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - assert(response.json()['message'] == 'User not authorized to delete this object') -- cgit v1.1