From 6b62ebbf1de5b9e55b04e9cfafd0620f1374c2d4 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Tue, 31 Mar 2015 14:27:23 +0200 Subject: Cleanup tests and use urllib2.build_opener Remove unused files Generate test config files directly in release directory Move test database files to "tests" directory Generate log key when preparing tests Report error when STH not found in v1.erl Make merge, fetchallcerts, submitcert, verifysct, and testcase1 take log key as argument --- tools/certtools.py | 50 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 18 deletions(-) (limited to 'tools/certtools.py') diff --git a/tools/certtools.py b/tools/certtools.py index 2c97dfb..da5021a 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -88,8 +88,15 @@ def get_root_cert(issuer): return root_cert +def urlopen(url, data=None): + try: + opener = urllib2.build_opener(urllib2.HTTPSHandler(context=None)) + except TypeError: + opener = urllib2.build_opener(urllib2.HTTPSHandler()) + return opener.open(url, data) + def get_sth(baseurl): - result = urllib2.urlopen(baseurl + "ct/v1/get-sth").read() + result = urlopen(baseurl + "ct/v1/get-sth").read() return json.loads(result) def get_proof_by_hash(baseurl, hash, tree_size): @@ -97,7 +104,7 @@ def get_proof_by_hash(baseurl, hash, tree_size): params = urllib.urlencode({"hash":base64.b64encode(hash), "tree_size":tree_size}) result = \ - urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read() + urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR:", e.read() @@ -108,7 +115,7 @@ def get_consistency_proof(baseurl, tree_size1, tree_size2): params = urllib.urlencode({"first":tree_size1, "second":tree_size2}) result = \ - urllib2.urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read() + urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read() return json.loads(result)["consistency"] except urllib2.HTTPError, e: print "ERROR:", e.read() @@ -131,7 +138,7 @@ def unpack_tls_array(packed_data, length_len): def add_chain(baseurl, submission): try: - result = urllib2.urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read() + result = urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR", e.code,":", e.read() @@ -148,7 +155,7 @@ def add_chain(baseurl, submission): def add_prechain(baseurl, submission): try: - result = urllib2.urlopen(baseurl + "ct/v1/add-pre-chain", + result = urlopen(baseurl + "ct/v1/add-pre-chain", json.dumps(submission)).read() return json.loads(result) except urllib2.HTTPError, e: @@ -167,7 +174,7 @@ def add_prechain(baseurl, submission): def get_entries(baseurl, start, end): try: params = urllib.urlencode({"start":start, "end":end}) - result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params).read() + result = urlopen(baseurl + "ct/v1/get-entries?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR:", e.read() @@ -198,8 +205,9 @@ def encode_signature(hash_alg, signature_alg, unpacked_signature): signature += tls_array(unpacked_signature, 2) return signature -def check_signature(baseurl, signature, data): - publickey = base64.decodestring(publickeys[baseurl]) +def check_signature(baseurl, signature, data, publickey=None): + if publickey == None: + publickey = base64.decodestring(publickeys[baseurl]) (hash_alg, signature_alg, unpacked_signature) = decode_signature(signature) assert hash_alg == 4, \ "hash_alg is %d, expected 4" % (hash_alg,) # sha256 @@ -230,20 +238,25 @@ def check_auth_header(authheader, expected_key, publickeydir, data, path): return True def http_request(url, data=None, key=None, verifynode=None, publickeydir="."): - req = urllib2.Request(url, data) + try: + opener = urllib2.build_opener(urllib2.HTTPSHandler(context=None)) + except TypeError: + opener = urllib2.build_opener(urllib2.HTTPSHandler()) + (keyname, keyfile) = key privatekey = get_eckey_from_file(keyfile) sk = ecdsa.SigningKey.from_der(privatekey) parsed_url = urlparse.urlparse(url) if data == None: - data = parsed_url.query + data_to_sign = parsed_url.query method = "GET" else: + data_to_sign = data method = "POST" - signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data), hashfunc=hashlib.sha256, + signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data_to_sign), hashfunc=hashlib.sha256, sigencode=ecdsa.util.sigencode_der) - req.add_header('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname) - result = urllib2.urlopen(req) + opener.addheaders = [('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname)] + result = opener.open(url, data) authheader = result.info().get('X-Catlfish-Auth') data = result.read() check_auth_header(authheader, verifynode, publickeydir, data, parsed_url.path) @@ -263,7 +276,7 @@ def create_signature(baseurl, data, key=None): unpacked_signature = get_signature(baseurl, data, key) return encode_signature(4, 3, unpacked_signature) -def check_sth_signature(baseurl, sth): +def check_sth_signature(baseurl, sth, publickey=None): signature = base64.decodestring(sth["tree_head_signature"]) version = struct.pack(">b", 0) @@ -273,7 +286,7 @@ def check_sth_signature(baseurl, sth): hash = base64.decodestring(sth["sha256_root_hash"]) tree_head = version + signature_type + timestamp + tree_size + hash - check_signature(baseurl, signature, tree_head) + check_signature(baseurl, signature, tree_head, publickey=publickey) def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None): version = struct.pack(">b", 0) @@ -284,8 +297,9 @@ def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None): return create_signature(baseurl, tree_head, key=key) -def check_sct_signature(baseurl, signed_entry, sct, precert=False): - publickey = base64.decodestring(publickeys[baseurl]) +def check_sct_signature(baseurl, signed_entry, sct, precert=False, publickey=None): + if publickey == None: + publickey = base64.decodestring(publickeys[baseurl]) calculated_logid = hashlib.sha256(publickey).digest() received_logid = base64.decodestring(sct["id"]) assert calculated_logid == received_logid, \ @@ -306,7 +320,7 @@ def check_sct_signature(baseurl, signed_entry, sct, precert=False): entry_type + signed_entry + \ tls_array(base64.decodestring(sct["extensions"]), 2) - check_signature(baseurl, signature, signed_struct) + check_signature(baseurl, signature, signed_struct, publickey=publickey) def pack_mtl(timestamp, leafcert): entry_type = struct.pack(">H", 0) -- cgit v1.1