summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/certtools.py2
-rwxr-xr-xtools/comparecert.py35
-rwxr-xr-xtools/fetchallcerts.py10
-rwxr-xr-xtools/submitcert.py31
-rwxr-xr-xtools/testcase1.py55
5 files changed, 41 insertions, 92 deletions
diff --git a/tools/certtools.py b/tools/certtools.py
index 6cb4f55..2165781 100644
--- a/tools/certtools.py
+++ b/tools/certtools.py
@@ -179,7 +179,7 @@ def unpack_tls_array(packed_data, length_len):
def add_chain(baseurl, submission):
try:
- result = urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read()
+ result = urlopen(baseurl + "ct/v1/add-blob", json.dumps(submission)).read()
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR", e.code,":", e.read()
diff --git a/tools/comparecert.py b/tools/comparecert.py
index 81893f7..e6864b6 100755
--- a/tools/comparecert.py
+++ b/tools/comparecert.py
@@ -20,31 +20,18 @@ import signal
import select
import zipfile
-def readfile(filename):
- contents = open(filename).read()
- certchain = get_certs_from_string(contents)
- precerts = get_precerts_from_string(contents)
- return (certchain, precerts)
+def readfile(filename, filetype):
+ if filetype == 'raw':
+ return open(filename, 'r').read()
+ else:
+ return get_pemlike(filename, filetype)
def testcerts(template, test):
- (certchain1, precerts1) = template
- (certchain2, precerts2) = test
+ blob1 = template
+ blob2 = test
- if precerts1 != precerts2:
- return (False, "precerts are different")
-
- if certchain1 == certchain2:
- return (True, "")
-
- if len(certchain2) == len(certchain1) + 1:
- if certchain2[:-1] != certchain1:
- return (False, "certchains are different")
- last_issuer = get_cert_info(certchain1[-1])["issuer"]
- root_subject = get_cert_info(certchain2[-1])["subject"]
- if last_issuer == root_subject:
- return (True, "fetched chain has an appended root cert")
- else:
- return (False, "fetched chain has an extra entry")
+ if blob1 == blob2:
+ return (True, "equal")
return (False, "certchains are different")
@@ -53,9 +40,9 @@ parser.add_argument('templates', help="Test templates, separated with colon")
parser.add_argument('test', help="Files to test, separated with colon")
args = parser.parse_args()
-templates = [readfile(filename) for filename in args.templates.split(":")]
+templates = [readfile(filename, 'raw') for filename in args.templates.split(":")]
-tests = [readfile(filename) for filename in args.test.split(":")]
+tests = [readfile(filename, 'BLOB')[0] for filename in args.test.split(":")]
for test in tests:
diff --git a/tools/fetchallcerts.py b/tools/fetchallcerts.py
index 66fde74..169764f 100755
--- a/tools/fetchallcerts.py
+++ b/tools/fetchallcerts.py
@@ -129,16 +129,10 @@ else:
leaf_input = base64.decodestring(entry["leaf_input"])
leaf_hash = get_leaf_hash(leaf_input)
s += "Leafhash: %s\n" % base64.b16encode(leaf_hash)
- if issuer_key_hash:
- s += "-----BEGIN PRECERTIFICATE-----\n"
- s += base64.encodestring(chain[0]).rstrip() + "\n"
- s += "-----END PRECERTIFICATE-----\n"
- s += "\n"
- chain = chain[1:]
for cert in chain:
- s += "-----BEGIN CERTIFICATE-----\n"
+ s += "-----BEGIN BLOB-----\n"
s += base64.encodestring(cert).rstrip() + "\n"
- s += "-----END CERTIFICATE-----\n"
+ s += "-----END BLOB-----\n"
s += "\n"
zf.writestr("%08d" % i, s)
except AssertionError, e:
diff --git a/tools/submitcert.py b/tools/submitcert.py
index 3b14912..91d2111 100755
--- a/tools/submitcert.py
+++ b/tools/submitcert.py
@@ -49,31 +49,14 @@ else:
sth = get_sth(baseurl)
-def submitcert((certfile, cert)):
+def submitcert((certfile, blob)):
timing = timing_point()
- certchain = get_certs_from_string(cert)
- precerts = get_precerts_from_string(cert)
- assert len(precerts) == 0 or len(precerts) == 1
- precert = precerts[0] if precerts else None
timing_point(timing, "readcerts")
try:
- if precert:
- if ext_key_usage_precert_signing_cert in get_ext_key_usage(certchain[0]):
- issuer_key_hash = get_cert_key_hash(certchain[1])
- issuer = certchain[1]
- else:
- issuer_key_hash = get_cert_key_hash(certchain[0])
- issuer = None
- cleanedcert = cleanprecert(precert, issuer=issuer)
- signed_entry = pack_precert(cleanedcert, issuer_key_hash)
- leafcert = cleanedcert
- result = add_prechain(baseurl, {"chain":map(base64.b64encode, [precert] + certchain)})
- else:
- signed_entry = pack_cert(certchain[0])
- leafcert = certchain[0]
- issuer_key_hash = None
- result = add_chain(baseurl, {"chain":map(base64.b64encode, certchain)})
+ signed_entry = pack_cert(blob)
+ issuer_key_hash = None
+ result = add_chain(baseurl, {"blob":base64.b64encode(blob)})
except SystemExit:
print "EXIT:", certfile
select.select([], [], [], 1.0)
@@ -87,7 +70,7 @@ def submitcert((certfile, cert)):
try:
if args.check_sct:
- check_sct_signature(baseurl, signed_entry, result, precert=precert, publickey=logpublickey)
+ check_sct_signature(baseurl, signed_entry, result, publickey=logpublickey)
timing_point(timing, "checksig")
except AssertionError, e:
print "ERROR:", certfile, e
@@ -101,7 +84,7 @@ def submitcert((certfile, cert)):
if lookup_in_log:
- merkle_tree_leaf = pack_mtl(result["timestamp"], leafcert)
+ merkle_tree_leaf = pack_mtl(result["timestamp"], blob)
leaf_hash = get_leaf_hash(merkle_tree_leaf)
@@ -139,7 +122,7 @@ def submitcert((certfile, cert)):
print "and submitted chain has length", len(submittedcertchain)
timing_point(timing, "lookup")
- return ((leafcert, issuer_key_hash, result), timing["deltatimes"])
+ return ((blob, issuer_key_hash, result), timing["deltatimes"])
def get_ncerts(certfiles):
n = 0
diff --git a/tools/testcase1.py b/tools/testcase1.py
index 697cc99..1a294d9 100755
--- a/tools/testcase1.py
+++ b/tools/testcase1.py
@@ -22,11 +22,14 @@ certfiles = ["../tools/testcerts/cert1.txt", "../tools/testcerts/cert2.txt",
"../tools/testcerts/cert3.txt", "../tools/testcerts/cert4.txt",
"../tools/testcerts/cert5.txt"]
-cc1 = get_certs_from_file(certfiles[0])
-cc2 = get_certs_from_file(certfiles[1])
-cc3 = get_certs_from_file(certfiles[2])
-cc4 = get_certs_from_file(certfiles[3])
-cc5 = get_certs_from_file(certfiles[4])
+def get_blob_from_file(filename):
+ return [open(filename, 'r').read()]
+
+cc1 = get_blob_from_file(certfiles[0])
+cc2 = get_blob_from_file(certfiles[1])
+cc3 = get_blob_from_file(certfiles[2])
+cc4 = get_blob_from_file(certfiles[3])
+cc5 = get_blob_from_file(certfiles[4])
create_ssl_context(cafile=cacertfile)
@@ -54,7 +57,8 @@ def assert_equal(actual, expected, name, quiet=False, nodata=False, fatal=False)
if nodata:
print_error("%s differs", name)
else:
- print_error("%s expected %s got %s", name, expected, actual)
+ print_error("%s expected %s got %s", name, repr(expected),
+ repr(actual))
if fatal:
sys.exit(1)
elif not quiet:
@@ -74,12 +78,13 @@ def print_and_check_tree_size(expected, baseurl):
def do_add_chain(chain, baseurl):
global failures
+ blob = ''.join(chain)
try:
- result = add_chain(baseurl, {"chain":map(base64.b64encode, chain)})
+ result = add_chain(baseurl, {"blob":base64.b64encode(blob)})
except ValueError, e:
print_error("%s", e)
try:
- signed_entry = pack_cert(chain[0])
+ signed_entry = pack_cert(blob)
check_sct_signature(baseurl, signed_entry, result, publickey=logpublickey)
print_success("signature check succeeded")
except AssertionError, e:
@@ -90,8 +95,8 @@ def do_add_chain(chain, baseurl):
return result
def get_and_validate_proof(timestamp, chain, leaf_index, nentries, baseurl):
- cert = chain[0]
- merkle_tree_leaf = pack_mtl(timestamp, cert)
+ blob = ''.join(chain)
+ merkle_tree_leaf = pack_mtl(timestamp, blob)
leaf_hash = get_leaf_hash(merkle_tree_leaf)
sth = get_sth(baseurl)
proof = get_proof_by_hash(baseurl, leaf_hash, sth["tree_size"])
@@ -104,7 +109,7 @@ def get_and_validate_proof(timestamp, chain, leaf_index, nentries, baseurl):
root_hash = base64.b64decode(sth["sha256_root_hash"])
assert_equal(root_hash, calc_root_hash, "verified root hash", nodata=True, quiet=True)
- get_and_check_entry(timestamp, chain, leaf_index, baseurl)
+ get_and_check_entry(timestamp, blob, leaf_index, baseurl)
def get_and_validate_consistency_proof(sth1, sth2, size1, size2, baseurl):
consistency_proof = [base64.decodestring(entry) for entry in get_consistency_proof(baseurl, size1, size2)]
@@ -116,35 +121,15 @@ def get_and_validate_consistency_proof(sth1, sth2, size1, size2, baseurl):
def get_and_check_entry(timestamp, chain, leaf_index, baseurl):
+ blob = ''.join(chain)
entries = get_entries(baseurl, leaf_index, leaf_index)
assert_equal(len(entries), 1, "get_entries", quiet=True)
fetched_entry = entries["entries"][0]
- merkle_tree_leaf = pack_mtl(timestamp, chain[0])
+ merkle_tree_leaf = pack_mtl(timestamp, blob)
leaf_input = base64.decodestring(fetched_entry["leaf_input"])
- assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True)
extra_data = base64.decodestring(fetched_entry["extra_data"])
- certchain = decode_certificate_chain(extra_data)
-
- submittedcertchain = chain[1:]
-
- for (submittedcert, fetchedcert, i) in zip(submittedcertchain,
- certchain, itertools.count(1)):
- assert_equal(fetchedcert, submittedcert, "cert %d in chain" % (i,), quiet=True)
-
- if len(certchain) == len(submittedcertchain) + 1:
- last_issuer = get_cert_info(submittedcertchain[-1])["issuer"]
- root_subject = get_cert_info(certchain[-1])["subject"]
- if last_issuer == root_subject:
- print_success("fetched chain has an appended root cert")
- else:
- print_error("fetched chain has an extra entry")
- elif len(certchain) == len(submittedcertchain):
- print_success("cert chains are the same length")
- else:
- print_error("cert chain length %d expected %d or %d",
- len(certchain),
- len(submittedcertchain),
- len(submittedcertchain))
+ assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True)
+ assert_equal(extra_data, '\x00\x00\x00', "extra_data", quiet=True)
def merge():
return subprocess.call(["../tools/merge", "--config", "../test/catlfish-test.cfg",