1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
import uvicorn
from fastapi import FastAPI, Depends, Request
from fastapi.responses import JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel
from index import CouchIindex
from db import DictDB
app = FastAPI()
db = DictDB()
public_key = """-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEPW8bkkVIq4BX8eWwlUOUYbJhiGDv
K/6xY5T0BsvV6pbMoIUfgeThVOq5I3CmXxLt+qyPska6ol9fTN7woZLsCg==
-----END PUBLIC KEY-----"""
def get_data(key=None, limit=25, skip=0, ip=None,
port=None, asn=None):
selectors = dict()
indexes = CouchIindex().dict()
selectors['domain'] = 'sunet.se'
if ip and 'ip' in indexes:
selectors['ip'] = ip
if port and 'port' in indexes:
selectors['port'] = port
if asn and 'asn' in indexes:
selectors['asn'] = asn
data = db.search(**selectors, limit=limit, skip=skip)
return JSONResponse(content={"status": "success", "data": data})
class JWTConfig(BaseModel):
authjwt_algorithm: str = "ES256"
authjwt_public_key: str = public_key
@AuthJWT.load_config
def jwt_config():
return JWTConfig()
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
return JSONResponse(content={"status": "error", "message":
exc.message}, status_code=400)
@app.exception_handler(RuntimeError)
def app_exception_handler(request: Request, exc: RuntimeError):
return JSONResponse(content={"status": "error", "message":
str(exc.with_traceback(None))},
status_code=400)
@app.get('/sc/v0/get')
async def get(key=None, limit=25, skip=0, ip=None, port=None,
asn=None, Authorize: AuthJWT = Depends()):
Authorize.jwt_required()
return get_data(key, limit, skip, ip, port, asn)
@app.get('/sc/v0/get/{key}')
async def get_key(key=None, Authorize: AuthJWT = Depends()):
Authorize.jwt_required()
return get_data(key)
@app.post('/sc/v0/add')
async def add(data: Request, Authorize: AuthJWT = Depends()):
Authorize.jwt_required()
orgs = ['sunet.se']
if not orgs:
return JSONResponse(content={"status": "error", "message":
"Could not find an organization"}, status_code=400)
json_data = await data.json()
key = db.add(json_data)
return JSONResponse(content={"status": "success", "docs": key})
def main(standalone=False):
if not standalone:
return app
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
if __name__ == '__main__':
main(standalone=True)
else:
app = main()
|