summaryrefslogtreecommitdiff
path: root/src/couch/resource.py
blob: f110c8d25cf260543ea5058009ba8e0d0662e038 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*- coding: utf-8 -*-
# Based on py-couchdb (https://github.com/histrio/py-couchdb)

from __future__ import annotations
from __future__ import unicode_literals
from typing import Union, Tuple, Dict, Any

import json
import requests

from .utils import (
    urljoin,
    as_json,
    force_bytes,
)
from .exceptions import (
    GenericError,
    NotFound,
    BadRequest,
    Conflict,
    AuthenticationFailed,
)


class Resource:
    def __init__(self, base_url: str, full_commit: bool = True, session: Union[requests.sessions.Session, None] = None,
                 credentials: Union[Tuple[str, str], None] = None, authmethod: str = "session", verify: bool = False) -> None:

        self.base_url = base_url
#        self.verify = verify

        if not session:
            self.session = requests.session()

            self.session.headers.update({"accept": "application/json",
                                         "content-type": "application/json"})
            self._authenticate(credentials, authmethod)

            if not full_commit:
                self.session.headers.update({'X-Couch-Full-Commit': 'false'})
        else:
            self.session = session
        self.session.verify = verify

    def _authenticate(self, credentials: Union[Tuple[str, str], None], method: str) ->  None:
        if not credentials:
            return

        if method == "session":
            data = {"name": credentials[0], "password": credentials[1]}

            post_url = urljoin(self.base_url, "_session")
            r = self.session.post(post_url, data=force_bytes(json.dumps(data)))
            if r and r.status_code != 200:
                raise AuthenticationFailed()

        elif method == "basic":
            self.session.auth = credentials

        else:
            raise RuntimeError("Invalid authentication method")

    def __call__(self, *path: str) -> Resource:
        base_url = urljoin(self.base_url, *path)
        return self.__class__(base_url, session=self.session)

    def _check_result(self, response, result) -> None:
        try:
            error = result.get('error', None)
            reason = result.get('reason', None)
        except AttributeError:
            error = None
            reason = ''

        # This is here because couchdb can return http 201
        # but containing a list of conflict errors
        if error == 'conflict' or error == "file_exists":
            raise Conflict(reason or "Conflict")

        if response.status_code > 205:
            if response.status_code == 404 or error == 'not_found':
                raise NotFound(reason or 'Not found')
            elif error == 'bad_request':
                raise BadRequest(reason or "Bad request")
            raise GenericError(result)

        
    def request(self, method, path: Union[str, None], params=None, data=None,
                headers=None, stream=False, **kwargs) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:

        if headers is None:
            headers = {}

        headers.setdefault('Accept', 'application/json')

        if path:
            if not isinstance(path, (list, tuple)):
                path = [path]
            url = urljoin(self.base_url, *path)
        else:
            url = self.base_url

        response = self.session.request(method, url, stream=stream,
                                        data=data, params=params,
                                        headers=headers, **kwargs)
        # Ignore result validation if
        # request is with stream mode

        if stream and response.status_code < 400:
            result = None
            self._check_result(response, result)
        else:
            result = as_json(response)

        if result is None:
            return response, result

        if isinstance(result, list):
            for res in result:
                self._check_result(response, res)
        else:
            self._check_result(response, result)

        return response, result

    def get(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
        return self.request("GET", path, **kwargs)

    def put(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
        return self.request("PUT", path, **kwargs)

    def post(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
        return self.request("POST", path, **kwargs)

    def delete(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
        return self.request("DELETE", path, **kwargs)

    def head(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
        return self.request("HEAD", path, **kwargs)