# -*- coding: utf-8 -*-
# Based on py-couchdb (https://github.com/histrio/py-couchdb)

from __future__ import annotations
from __future__ import unicode_literals

from typing import Union, Tuple, Dict, Any
import json

import requests

from .utils import (
    urljoin,
    as_json,
    force_bytes,
)
from .exceptions import (
    GenericError,
    NotFound,
    BadRequest,
    Conflict,
    AuthenticationFailed,
)


class Resource:
    """Thin HTTP helper bound to a base URL and a ``requests`` session.

    Calling an instance with path segments returns a new ``Resource`` for
    the joined URL that shares the same underlying session.
    """

    def __init__(self, base_url: str, full_commit: bool = True,
                 session: Union[requests.sessions.Session, None] = None,
                 credentials: Union[Tuple[str, str], None] = None,
                 authmethod: str = "session",
                 verify: bool = False) -> None:
        """Create a resource rooted at *base_url*.

        Args:
            base_url: URL every request path is joined onto.
            full_commit: When false, a freshly created session sends
                ``X-Couch-Full-Commit: false`` with every request.
            session: Existing session to reuse. When omitted, a new session
                is created, given JSON ``accept``/``content-type`` headers
                and authenticated with *credentials*.
            credentials: ``(name, password)`` pair; only used when a new
                session is created.
            authmethod: ``"session"`` or ``"basic"``; see ``_authenticate``.
            verify: Value assigned to ``session.verify``. It is applied to
                a caller-supplied session as well.

        Raises:
            AuthenticationFailed: If session authentication is rejected.
            RuntimeError: If *authmethod* is not a supported method.
        """
        self.base_url = base_url

        if session is None:
            self.session = requests.session()
            # Must be set before authenticating so that the login request
            # itself honours the caller's TLS verification choice.
            self.session.verify = verify
            self.session.headers.update({"accept": "application/json",
                                         "content-type": "application/json"})
            self._authenticate(credentials, authmethod)

            if not full_commit:
                self.session.headers.update({'X-Couch-Full-Commit': 'false'})
        else:
            self.session = session
            self.session.verify = verify

    def _authenticate(self, credentials: Union[Tuple[str, str], None],
                      method: str) -> None:
        """Authenticate ``self.session`` using *credentials*.

        Does nothing when *credentials* is empty. With ``"session"`` the
        name and password are POSTed as JSON to ``<base_url>/_session``;
        with ``"basic"`` they are stored as the session's ``auth``.

        Raises:
            AuthenticationFailed: If the ``_session`` POST does not answer
                with HTTP 200.
            RuntimeError: If *method* is neither ``"session"`` nor
                ``"basic"``.
        """
        if not credentials:
            return

        if method == "session":
            data = {"name": credentials[0], "password": credentials[1]}
            post_url = urljoin(self.base_url, "_session")
            r = self.session.post(post_url,
                                  data=force_bytes(json.dumps(data)))
            # Do not test the truthiness of `r`: a requests Response is
            # falsy for every status >= 400, which would hide a failed login.
            if r.status_code != 200:
                raise AuthenticationFailed()
        elif method == "basic":
            self.session.auth = credentials
        else:
            raise RuntimeError("Invalid authentication method")

    def __call__(self, *path: str) -> Resource:
        """Return a child resource for *path*, sharing this session."""
        base_url = urljoin(self.base_url, *path)
        # Forward the current verify setting; otherwise __init__ would reset
        # the shared session to its default and silently change TLS checks.
        return self.__class__(base_url, session=self.session,
                              verify=self.session.verify)

    def _check_result(self, response, result) -> None:
        """Raise the exception matching an error reported by the server.

        Args:
            response: The HTTP response; only ``status_code`` is read.
            result: One decoded result item. Anything without a ``get``
                method (for example ``None``) is treated as carrying no
                error information.

        Raises:
            Conflict: If ``error`` is ``"conflict"`` or ``"file_exists"``,
                whatever the HTTP status is.
            NotFound: If the status is above 205 and is 404 or ``error``
                is ``"not_found"``.
            BadRequest: If the status is above 205 and ``error`` is
                ``"bad_request"``.
            GenericError: For any other status above 205.
        """
        try:
            error = result.get('error', None)
            reason = result.get('reason', None)
        except AttributeError:
            error = None
            reason = ''

        # This is here because couchdb can return http 201
        # but containing a list of conflict errors
        if error == 'conflict' or error == "file_exists":
            raise Conflict(reason or "Conflict")

        if response.status_code > 205:
            if response.status_code == 404 or error == 'not_found':
                raise NotFound(reason or 'Not found')
            elif error == 'bad_request':
                raise BadRequest(reason or "Bad request")
            raise GenericError(result)

    def request(self, method, path: Union[str, None], params=None,
                data=None, headers=None, stream=False,
                **kwargs) -> Tuple[requests.models.Response,
                                   Union[Dict[str, Any], None]]:
        """Send a request and validate the decoded result.

        Args:
            method: HTTP verb handed to ``session.request``.
            path: A single segment, a list/tuple of segments, or a falsy
                value to address ``base_url`` itself.
            params: Forwarded to ``session.request``.
            data: Forwarded to ``session.request``.
            headers: Extra headers; ``Accept: application/json`` is added
                when no ``Accept`` key is present. The mapping passed in
                is not modified.
            stream: Forwarded to ``session.request``. When true and the
                status is below 400, the body is not decoded.
            **kwargs: Forwarded to ``session.request``.

        Returns:
            ``(response, result)``. ``result`` is ``None`` for a streamed
            response with a status below 400; otherwise it is whatever
            ``as_json(response)`` returned.

        Raises:
            Conflict, NotFound, BadRequest, GenericError: As raised by
                ``_check_result``.
        """
        # Work on a copy so the caller's mapping is never mutated.
        headers = dict(headers) if headers else {}
        headers.setdefault('Accept', 'application/json')

        if path:
            if not isinstance(path, (list, tuple)):
                path = [path]
            url = urljoin(self.base_url, *path)
        else:
            url = self.base_url

        response = self.session.request(method, url, stream=stream,
                                        data=data, params=params,
                                        headers=headers, **kwargs)

        # Ignore result validation if request is with stream mode;
        # only the status code is checked in that case.
        if stream and response.status_code < 400:
            self._check_result(response, None)
            return response, None

        result = as_json(response)
        if result is None:
            return response, result

        if isinstance(result, list):
            # Every item of a list result is validated individually.
            for res in result:
                self._check_result(response, res)
        else:
            self._check_result(response, result)

        return response, result

    def get(self, path: Union[str, None] = None,
            **kwargs: Any) -> Tuple[requests.models.Response,
                                    Union[Dict[str, Any], None]]:
        """Issue a GET request; see ``request`` for arguments and result."""
        return self.request("GET", path, **kwargs)

    def put(self, path: Union[str, None] = None,
            **kwargs: Any) -> Tuple[requests.models.Response,
                                    Union[Dict[str, Any], None]]:
        """Issue a PUT request; see ``request`` for arguments and result."""
        return self.request("PUT", path, **kwargs)

    def post(self, path: Union[str, None] = None,
             **kwargs: Any) -> Tuple[requests.models.Response,
                                     Union[Dict[str, Any], None]]:
        """Issue a POST request; see ``request`` for arguments and result."""
        return self.request("POST", path, **kwargs)

    def delete(self, path: Union[str, None] = None,
               **kwargs: Any) -> Tuple[requests.models.Response,
                                       Union[Dict[str, Any], None]]:
        """Issue a DELETE request; see ``request`` for arguments and result."""
        return self.request("DELETE", path, **kwargs)

    def head(self, path: Union[str, None] = None,
             **kwargs: Any) -> Tuple[requests.models.Response,
                                     Union[Dict[str, Any], None]]:
        """Issue a HEAD request; see ``request`` for arguments and result."""
        return self.request("HEAD", path, **kwargs)