summaryrefslogtreecommitdiff
path: root/src/test/test_api.py
blob: 07863637cfa12ffe765796d89a8db0991684124f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
import time
import pytest
import random
import ipaddress

from main import app
from fastapi import FastAPI
from fastapi import testclient

client = testclient.TestClient(app)
JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJ0eXBlIjoiYWNjZXNzIiwi' + \
    'ZG9tYWlucyI6WyJzdW5ldC5zZSJdfQ._emWyVw-6qer5u65SitS8bZJas7l8bw4almnI1' + \
    'TB7DBnzNsch8ctU4btlgBpfJ2jbrvXZTIl8jXIcykO4crUrQ'
JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'}


def test_001():
    doc_port = random.randint(1, 65536)
    doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
    doc_asn = str(doc_ip) + '_' + str(doc_port)

    json_data = {
        'ip': doc_ip,
        'port': doc_port,
        'whois_description': 'unittest',
        'asn': doc_asn,
        'asn_country_code': 'SE',
        'ptr': 'unittest.example.com',
        'abuse_mail': 'unittest@example.com',
        'domain': 'sunet.se',
        'timestamp_in_utc': '2021-06-21T14:06UTC',
        'producer_unique_keys': {
            'subject_cn': 'unittest',
            'subject_o': 'unittest',
            'full_name': 'unittest',
            'end_of_general_support': False,
            'cve_2021_21972': 'unittest',
            'cve_2021_21974': 'unittest',
            'cve_2021_21985': 'unittest'
        }
    }

    response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data)
    assert(response.status_code == 200)
    assert(response.json()['status'] == 'success')

    response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER)
    assert(response.status_code == 200)
    assert(response.json()['status'] == 'success')
    assert(len(response.json()['docs']) == 1)
    assert(response.json()['docs'][0]['port'] == doc_port)

    response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER)
    assert(response.status_code == 200)
    assert(response.json()['status'] == 'success')
    assert(len(response.json()['docs']) == 1)
    assert(response.json()['docs'][0]['asn'] == doc_asn)

    response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER)
    assert(response.status_code == 200)
    assert(response.json()['status'] == 'success')
    assert(len(response.json()['docs']) == 1)
    assert(response.json()['docs'][0]['ip'] == doc_ip)


def test_002():
    response = client.get("/sc/v0/get", headers=JWT_HEADER)
    assert(response.status_code == 200)

    for doc in response.json()['docs']:
        doc_id = doc['_id']
        response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
        assert(response_doc.status_code == 200)
        assert(response_doc.json()['status'] == 'success')
        assert(type(response_doc.json()['docs']) == type(dict()))
        assert(response_doc.json()['docs']['domain'] == 'sunet.se')