1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
import os
import time
import pytest
import random
import ipaddress
from main import app
from fastapi import FastAPI
from fastapi import testclient
client = testclient.TestClient(app)
JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJ0eXBlIjoiYWNjZXNzIiwi' + \
'ZG9tYWlucyI6WyJzdW5ldC5zZSJdfQ._emWyVw-6qer5u65SitS8bZJas7l8bw4almnI1' + \
'TB7DBnzNsch8ctU4btlgBpfJ2jbrvXZTIl8jXIcykO4crUrQ'
JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'}
def test_001():
doc_port = random.randint(1, 65536)
doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
doc_asn = str(doc_ip) + '_' + str(doc_port)
json_data = {
'ip': doc_ip,
'port': doc_port,
'whois_description': 'unittest',
'asn': doc_asn,
'asn_country_code': 'SE',
'ptr': 'unittest.example.com',
'abuse_mail': 'unittest@example.com',
'domain': 'sunet.se',
'timestamp_in_utc': '2021-06-21T14:06UTC',
'producer_unique_keys': {
'subject_cn': 'unittest',
'subject_o': 'unittest',
'full_name': 'unittest',
'end_of_general_support': False,
'cve_2021_21972': 'unittest',
'cve_2021_21974': 'unittest',
'cve_2021_21985': 'unittest'
}
}
response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data)
assert(response.status_code == 200)
assert(response.json()['status'] == 'success')
response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER)
assert(response.status_code == 200)
assert(response.json()['status'] == 'success')
assert(len(response.json()['docs']) == 1)
assert(response.json()['docs'][0]['port'] == doc_port)
response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER)
assert(response.status_code == 200)
assert(response.json()['status'] == 'success')
assert(len(response.json()['docs']) == 1)
assert(response.json()['docs'][0]['asn'] == doc_asn)
response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER)
assert(response.status_code == 200)
assert(response.json()['status'] == 'success')
assert(len(response.json()['docs']) == 1)
assert(response.json()['docs'][0]['ip'] == doc_ip)
def test_002():
response = client.get("/sc/v0/get", headers=JWT_HEADER)
assert(response.status_code == 200)
for doc in response.json()['docs']:
doc_id = doc['_id']
response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
assert(response_doc.status_code == 200)
assert(response_doc.json()['status'] == 'success')
assert(type(response_doc.json()['docs']) == type(dict()))
assert(response_doc.json()['docs']['domain'] == 'sunet.se')
|