"""Integration tests for the /sc/v0 document API (add, get, delete, auth).

The tests run against the FastAPI application object imported from ``main``
through ``fastapi.testclient.TestClient`` and therefore exercise whatever
storage backend that application is configured with.
"""
import ipaddress
import os
import random
import time

import pytest
from fastapi import FastAPI
from fastapi import testclient

from main import app

client = testclient.TestClient(app)

JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY0MjE2ODkyMCwianRpIjoiNjM0NGFiNjEtMTIzZC00YWMyLTk3YjMtYmVlYTE2M2JiMWMwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6InVzZXIxIiwibmJmIjoxNjQyMTY4OTIwLCJyZWFkIjpbInN1bmV0LnNlIl0sIndyaXRlIjpbInN1bmV0LnNlIl19._bX9EHI9h0Vjw75UvYvypqaH3AmsgaATFSUSOT-cYLZHrfMlxios3emr7cyKw-OV_BN5h_XNyrMBV1gIoqAk3A'
JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'}

# Highest valid TCP/UDP port number. random.randint() includes both
# endpoints, so the upper bound must be 65535, not 65536.
_MAX_PORT = 65535


def _random_document():
    """Return a document payload with a random port and IPv4 address.

    The ``asn`` field is built as ``"<ip>_<port>"`` so that it can be used
    as a lookup key for the freshly added document.
    """
    doc_port = random.randint(1, _MAX_PORT)
    doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
    doc_asn = f'{doc_ip}_{doc_port}'

    return {
        'ip': doc_ip,
        'port': doc_port,
        'whois_description': 'unittest',
        'asn': doc_asn,
        'asn_country_code': 'SE',
        'ptr': 'unittest.example.com',
        'abuse_mail': 'unittest@example.com',
        'domain': 'sunet.se',
        'timestamp_in_utc': '2021-06-21T14:06UTC',
        'producer_unique_keys': {
            'subject_cn': 'unittest',
            'subject_o': 'unittest',
            'full_name': 'unittest',
            'end_of_general_support': False,
            'cve_2021_21972': 'unittest',
            'cve_2021_21974': 'unittest',
            'cve_2021_21985': 'unittest',
        },
    }


def _assert_single_match(field, value):
    """Query /sc/v0/get filtered on ``field`` and expect exactly one hit.

    Asserts that the request succeeds, that exactly one document is
    returned and that the document's ``field`` equals ``value``.
    """
    response = client.get(f"/sc/v0/get?{field}={value}", headers=JWT_HEADER)
    assert response.status_code == 200

    body = response.json()
    assert body['status'] == 'success'
    assert len(body['docs']) == 1
    assert body['docs'][0][field] == value


def _add_and_verify_document():
    """Add one random document and verify it is retrievable by port, asn and ip."""
    json_data = _random_document()

    response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data)
    assert response.status_code == 200
    assert response.json()['status'] == 'success'

    # Same lookup order as the requests were originally issued in.
    for field in ('port', 'asn', 'ip'):
        _assert_single_match(field, json_data[field])


def test_001():
    """Add a single document and read it back through each filter."""
    print("*** Adding document.")
    _add_and_verify_document()


def test_002():
    """Add 100 documents, verifying each one, and report the elapsed time."""
    nr_documents = 100
    starttime = time.time()

    for _ in range(nr_documents):
        _add_and_verify_document()

    stop_time = str(time.time() - starttime)
    print(f"*** Adding {nr_documents} documents took {stop_time} seconds.")


def test_003():
    """Fetch the document list and retrieve every listed document by its ID."""
    response = client.get("/sc/v0/get", headers=JWT_HEADER)
    assert response.status_code == 200

    for doc in response.json()['docs']:
        doc_id = doc['_id']
        response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
        assert response_doc.status_code == 200

        body = response_doc.json()
        assert body['status'] == 'success'
        # A lookup by ID returns a single document object, not a list.
        assert isinstance(body['docs'], dict)
        assert body['docs']['domain'] == 'sunet.se'


def test_004():
    """Delete up to 1000 documents, then try to delete an invalid ID."""
    response = client.get("/sc/v0/get?limit=1000", headers=JWT_HEADER)
    assert response.status_code == 200

    docs = response.json()['docs']
    nr_documents = len(docs)
    starttime = time.time()

    for doc in docs:
        doc_id = doc['_id']

        response_doc = client.delete(
            f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER)
        assert response_doc.status_code == 200
        assert response_doc.json()['status'] == 'success'

        # A deleted document must come back as an empty object.
        response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
        assert response_doc.status_code == 200
        body = response_doc.json()
        assert body['status'] == 'success'
        assert body['docs'] == {}

    stop_time = str(time.time() - starttime)
    print(f"*** Removing {nr_documents} documents took {stop_time} seconds.")

    print("*** Removing document with invalid ID.")
    response = client.delete("/sc/v0/delete/nonexistent", headers=JWT_HEADER)
    assert response.status_code == 400
    assert response.json()['status'] == 'error'


def test_005():
    """Call the endpoints without an Authorization header."""
    print("*** Accessing endpoints without JWT token...")

    response = client.get("/sc/v0/get?limit=1000")
    assert response.status_code == 400
    assert response.json()['status'] == 'error'

    response = client.get("/sc/v0/get/unittest")
    assert response.status_code == 400
    assert response.json()['status'] == 'error'

    # NOTE(review): unlike the other endpoints, an unauthenticated POST to
    # /add is expected to succeed here - confirm this is intended behaviour.
    response = client.post("/sc/v0/add", json={"data": "nothing"})
    assert response.status_code == 200
    assert response.json()['status'] == 'success'

    response = client.delete("/sc/v0/delete/unittest")
    assert response.status_code == 400
    assert response.json()['status'] == 'error'