diff options
Diffstat (limited to 'tools/josef_experimental_auditor.py')
-rwxr-xr-x | tools/josef_experimental_auditor.py | 64 |
1 files changed, 8 insertions, 56 deletions
diff --git a/tools/josef_experimental_auditor.py b/tools/josef_experimental_auditor.py index 57ef9cb..9268d03 100755 --- a/tools/josef_experimental_auditor.py +++ b/tools/josef_experimental_auditor.py @@ -16,36 +16,24 @@ NAGIOS_UNKNOWN = 3 DEFAULT_CUR_FILE = 'all-sth.json' base_urls = [ - "https://plausible.ct.nordu.net/", - "https://ct1.digicert-ct.com/log/", - "https://ct.izenpe.com/", - "https://log.certly.io/", - "https://ct.googleapis.com/aviator/", - "https://ct.googleapis.com/pilot/", - "https://ct.googleapis.com/rocketeer/", + # "https://plausible.ct.nordu.net/", + # "https://ct1.digicert-ct.com/log/", + # "https://ct.izenpe.com/", + # "https://log.certly.io/", + # "https://ct.googleapis.com/aviator/", + # "https://ct.googleapis.com/pilot/", + # "https://ct.googleapis.com/rocketeer/", "https://ct.ws.symantec.com/", # "https://ctlog.api.venafi.com/", ] - -# logkeys = {} -# logkeys["https://plausible.ct.nordu.net/"] = get_public_key_from_file("../../plausible-logkey.pem") -# logkeys["https://ct.googleapis.com/rocketeer/"] = get_public_key_from_file("../../rocketeer-logkey.pem") -# logkeys["https://ct.googleapis.com/aviator/"] = get_public_key_from_file("../../aviator-logkey.pem") -# logkeys["https://ct.googleapis.com/pilot/"] = get_public_key_from_file("../../pilot-logkey.pem") -# logkeys["https://log.certly.io/"] = get_public_key_from_file("../../certly-logkey.pem") -# logkeys["https://ct.izenpe.com/"] = get_public_key_from_file("../../izenpe-logkey.pem") -# logkeys["https://ct.ws.symantec.com/"] = get_public_key_from_file("../../symantec-logkey.pem") -# logkeys["https://ctlog.api.venafi.com/"] = get_public_key_from_file("../../venafi-logkey.pem") -# logkeys["https://ct1.digicert-ct.com/log/"] = get_public_key_from_file("../../digicert-logkey.pem") - parser = argparse.ArgumentParser(description="") parser.add_argument('--audit', action='store_true', help="run lightweight auditor verifying consistency in STH") +parser.add_argument('--monitor', action='store_true', help="run full monitoring for all logs") parser.add_argument('--audit2', action='store_true', help="run medium-weight auditor verifying consistency in STH and inclusion proofs of new entries") parser.add_argument('--audit3', action='store_true', help="continously run medium-weight auditor verifying consistency in STH and inclusion proofs of new entries") parser.add_argument('--audit4', action='store_true', help="run one check on one server") parser.add_argument('--build-sth', action='store_true', help="get all entries and construct STH") parser.add_argument('--verify-index', default=None, help="Verify a specific index in all logs" ) -# parser.add_argument('--verify-hash', action='store_true', help="Verify an entry hash in all logs" ) parser.add_argument('--host', default=None, help="Base URL for CT log") parser.add_argument('--roots', action='store_true', help="Check accepted root certificates for all logs" ) parser.add_argument('--cur-sth', @@ -120,24 +108,6 @@ def fetch_all_sth(): errors.append(error_str) continue - # Add timing info - # try: - # if base_url not in timings: - # timings[base_url] = {"last":sths[base_url]["timestamp"], "longest":0} - # else: - # then = datetime.datetime.fromtimestamp(int(timings[base_url]["last"])/1000) - # now = datetime.datetime.fromtimestamp(int(sths[base_url]["timestamp"])/1000) - # tdelta = now - then - - # timings[base_url]["last"] = sths[base_url]["timestamp"] - - # if tdelta.total_seconds() > timings[base_url]["longest"]: - # timings[base_url]["longest"] = tdelta.total_seconds() - - # except Exception, err: - # print Exception, err - # print time.strftime('%H:%M:%S') + "ERROR: Failed to set TIME info for STH" - return sths def verify_progress(old, new): @@ -148,20 +118,12 @@ def verify_progress(old, new): if new[url]["tree_size"] == old[url]["tree_size"]: if old[url]["sha256_root_hash"] != new[url]["sha256_root_hash"]: errors.append(time.strftime('%H:%M:%S') + " CRITICAL: root hash is different for same tree size in " + url) - # print "tree size:", newsth["tree_size"], - # print "old hash:", b64_to_b16(oldsth["sha256_root_hash"]) - # print "new hash:", b64_to_b16(newsth["sha256_root_hash"]) - # sys.exit(NAGIOS_CRIT) - # TODO elif new[url]["tree_size"] < old[url]["tree_size"]: - # if not args.allow_lag: errors.append(time.strftime('%H:%M:%S') + " CRITICAL: new tree smaller than previous tree (%d < %d)" % \ (new[url]["tree_size"], old[url]["tree_size"])) - # sys.exit(NAGIOS_CRIT) if new[url]: age = time.time() - new[url]["timestamp"]/1000 sth_time = datetime.datetime.fromtimestamp(new[url]['timestamp'] / 1000, UTC()).strftime("%Y-%m-%d %H:%M:%S") - # roothash = b64_to_b16(sth['sha256_root_hash']) roothash = new[url]['sha256_root_hash'] if age > 24 * 3600: errors.append(time.strftime('%H:%M:%S') + " CRITICAL: %s is older than 24h: %s UTC" % (url, sth_time)) @@ -273,7 +235,6 @@ def verify_inclusion_by_hash(base_url, leaf_hash): root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof, proof["leaf_index"], tmp_sth["tree_size"], leaf_hash)) if tmp_sth["sha256_root_hash"] == root: - # print "Verifying inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url + "...OK." return True else: print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url @@ -316,7 +277,6 @@ def get_proof_by_index(baseurl, index, tree_size): sys.exit(0) def get_all_roots(base_url): - # print "Fetching roots from " + base_url result = urlopen(base_url + "ct/v1/get-roots").read() certs = json.loads(result)["certificates"] print time.strftime('%H:%M:%S') + " Received " + str(len(certs)) + " certs from " + base_url @@ -374,13 +334,6 @@ def main(args): for url in base_urls: verify_inclusion_by_index(url, int(args.verify_index)) - # if args.verify_hash: - # idx = 1337 - # url = base_urls[0] - # entries = get_entries(url, idx, idx)["entries"] - # h = get_leaf_hash(base64.b64decode(entries[0]["leaf_input"])) - # verify_inclusion_by_hash(url, h) - if args.roots: print time.strftime('%H:%M:%S') + " Getting accepted Root Certs from all logs..." for url in base_urls: @@ -432,7 +385,6 @@ def main(args): try: tmp_sth = get_sth(base_url) except: - # sths[base_url] = None error_str = time.strftime('%H:%M:%S') + " ERROR: Failed to retrieve STH from " + base_url print error_str errors.append(error_str) |