From 15d65c756fe89aca6cbcc754dc648853ca334095 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Wed, 9 Mar 2016 06:58:08 +0100 Subject: Use python requests package instead of urllib2 --- tools/certtools.py | 194 ++++++++++++++++++++++++----------------------------- 1 file changed, 88 insertions(+), 106 deletions(-) (limited to 'tools/certtools.py') diff --git a/tools/certtools.py b/tools/certtools.py index 6cb4f55..919460e 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -4,7 +4,6 @@ import subprocess import json import base64 -import urllib import urllib2 import ssl import urlparse @@ -16,6 +15,9 @@ import datetime import cStringIO import zipfile import shutil +import requests +import warnings + from certkeys import publickeys def get_cert_info(s): @@ -90,76 +92,48 @@ def get_root_cert(issuer): return root_cert class sslparameters: - sslcontext = None + cafile = None def create_ssl_context(cafile=None): try: - sslparameters.sslcontext = ssl.create_default_context(cafile=cafile) + sslparameters.cafile = cafile except AttributeError: - sslparameters.sslcontext = None + sslparameters.cafile = None -def get_opener(): - try: - opener = urllib2.build_opener(urllib2.HTTPSHandler(context=sslparameters.sslcontext)) - except TypeError: - opener = urllib2.build_opener(urllib2.HTTPSHandler()) - return opener - -def urlopen(url, data=None): - return get_opener().open(url, data) - -def pyopenssl_https_get(url): - """ - HTTPS GET-function to use when running old Python < 2.7 - """ - from OpenSSL import SSL - import socket - - # TLSv1 is the best we can get on Python 2.6 - context = SSL.Context(SSL.TLSv1_METHOD) - sock = SSL.Connection(context, socket.socket(socket.AF_INET, socket.SOCK_STREAM)) - - url_without_scheme = url.split('https://')[-1] - host = url_without_scheme.split('/')[0] - path = url_without_scheme.split('/', 1)[1] - http_get_request = ("GET /{path} HTTP/1.1\r\n" - "Host: {host}\r\n" - "\r\n" - ).format(path=path, host=host) - - sock.connect((host, 443)) - sock.write(http_get_request) - response = sock.recv(1024) - response_lines = response.rsplit('\n') - - # We are only interested in the actual response, - # without headers, contained in the last line. - return response_lines[len(response_lines) - 1] +def urlget(url, params=None): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=requests.packages.urllib3.exceptions.SubjectAltNameWarning) + return requests.get(url, verify=sslparameters.cafile, params=params) + +def urlpost(url, data): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=requests.packages.urllib3.exceptions.SubjectAltNameWarning) + return requests.post(url, data=data, verify=sslparameters.cafile) def get_sth(baseurl): - result = urlopen(baseurl + "ct/v1/get-sth").read() - return json.loads(result) + result = urlget(baseurl + "ct/v1/get-sth") + result.raise_for_status() + return result.json() def get_proof_by_hash(baseurl, hash, tree_size): - try: - params = urllib.urlencode({"hash":base64.b64encode(hash), - "tree_size":tree_size}) - result = \ - urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read() - return json.loads(result) - except urllib2.HTTPError, e: - print "ERROR:", e.read() + params = {"hash":base64.b64encode(hash), + "tree_size":tree_size} + result = \ + urlget(baseurl + "ct/v1/get-proof-by-hash", params=params) + if result.status_code == requests.codes.ok: + return result.json() + else: + print "ERROR:", result.status_code, result.text sys.exit(1) def get_consistency_proof(baseurl, tree_size1, tree_size2): - try: - params = urllib.urlencode({"first":tree_size1, - "second":tree_size2}) - result = \ - urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read() - return json.loads(result)["consistency"] - except urllib2.HTTPError, e: - print "ERROR:", e.read() + params = {"first":tree_size1, + "second":tree_size2} + result = urlget(baseurl + "ct/v1/get-sth-consistency", params=params) + if result.status_code == requests.codes.ok: + return result.json()["consistency"] + else: + print "ERROR:", result.status_code, result.text sys.exit(1) def tls_array(data, length_len): @@ -179,13 +153,14 @@ def unpack_tls_array(packed_data, length_len): def add_chain(baseurl, submission): try: - result = urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read() - return json.loads(result) - except urllib2.HTTPError, e: - print "ERROR", e.code,":", e.read() - if e.code == 400: - return None - sys.exit(1) + result = urlpost(baseurl + "ct/v1/add-chain", json.dumps(submission)) + if result.status_code == requests.codes.ok: + return result.json() + else: + print "ERROR:", result.status_code, result.text + if result.status_code == 400: + return None + sys.exit(1) except ValueError, e: print "==== FAILED REQUEST ====" print submission @@ -196,14 +171,16 @@ def add_chain(baseurl, submission): def add_prechain(baseurl, submission): try: - result = urlopen(baseurl + "ct/v1/add-pre-chain", - json.dumps(submission)).read() - return json.loads(result) - except urllib2.HTTPError, e: - print "ERROR", e.code,":", e.read() - if e.code == 400: - return None - sys.exit(1) + result = urlpost(baseurl + "ct/v1/add-pre-chain", + json.dumps(submission)) + + if result.status_code == requests.codes.ok: + return result.json() + else: + print "ERROR:", result.status_code, result.text + if result.status_code == 400: + return None + sys.exit(1) except ValueError, e: print "==== FAILED REQUEST ====" print submission @@ -213,12 +190,12 @@ def add_prechain(baseurl, submission): raise e def get_entries(baseurl, start, end): - params = urllib.urlencode({"start":start, "end":end}) - try: - result = urlopen(baseurl + "ct/v1/get-entries?" + params).read() - return json.loads(result) - except urllib2.HTTPError, e: - print "ERROR:", e.read() + params = {"start":start, "end":end} + result = urlget(baseurl + "ct/v1/get-entries", params=params) + if result.status_code == requests.codes.ok: + return result.json() + else: + print "ERROR:", result.status_code, result.text sys.exit(1) def extract_precertificate(precert_chain_entry): @@ -283,27 +260,35 @@ def check_auth_header(authheader, expected_key, publickeydir, data, path): sigdecode=ecdsa.util.sigdecode_der) return True -def http_request(url, data=None, key=None, verifynode=None, publickeydir="."): - opener = get_opener() - - (keyname, keyfile) = key - privatekey = get_eckey_from_file(keyfile) - sk = ecdsa.SigningKey.from_der(privatekey) - parsed_url = urlparse.urlparse(url) - if data == None: - data_to_sign = parsed_url.query - method = "GET" - else: - data_to_sign = data - method = "POST" - signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data_to_sign), hashfunc=hashlib.sha256, - sigencode=ecdsa.util.sigencode_der) - opener.addheaders = [('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname)] - result = opener.open(url, data) - authheader = result.info().get('X-Catlfish-Auth') - data = result.read() - check_auth_header(authheader, verifynode, publickeydir, data, parsed_url.path) - return data +def http_request(url, data=None, key=None, verifynode=None, publickeydir=".", params=None): + with requests.sessions.Session() as session: + (keyname, keyfile) = key + privatekey = get_eckey_from_file(keyfile) + sk = ecdsa.SigningKey.from_der(privatekey) + if data == None: + method = "GET" + else: + method = "POST" + assert(params == None) + req = requests.Request(method, url, params=params, data=data) + prepared_req = session.prepare_request(req) + parsed_url = urlparse.urlparse(prepared_req.url) + if data == None: + data_to_sign = parsed_url.query + else: + data_to_sign = data + url_to_sign = parsed_url.path + signature = sk.sign("%s\0%s\0%s" % (method, url_to_sign, data_to_sign), hashfunc=hashlib.sha256, + sigencode=ecdsa.util.sigencode_der) + prepared_req.headers['X-Catlfish-Auth'] = base64.b64encode(signature) + ";key=" + keyname + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=requests.packages.urllib3.exceptions.SubjectAltNameWarning) + result = session.send(prepared_req, verify=sslparameters.cafile) + result.raise_for_status() + authheader = result.headers.get('X-Catlfish-Auth') + data = result.text + check_auth_header(authheader, verifynode, publickeydir, data, url_to_sign) + return data def get_signature(baseurl, data, key=None): try: @@ -311,11 +296,8 @@ def get_signature(baseurl, data, key=None): result = http_request(baseurl + "plop/v1/signing/sth", params, key=key) parsed_result = json.loads(result) return base64.b64decode(parsed_result.get(u"result")) - except urllib2.URLError, e: - print >>sys.stderr, "ERROR: get_signature", e.reason - sys.exit(1) - except urllib2.HTTPError, e: - print "ERROR: get_signature", e.read() + except requests.exceptions.HTTPError, e: + print "ERROR: get_signature", e.response raise e def create_signature(baseurl, data, key=None): -- cgit v1.1