#!/usr/bin/python # -*- coding: utf-8 -*- import time import datetime import base64 import argparse import errno from certtools import * NAGIOS_OK = 0 NAGIOS_WARN = 1 NAGIOS_CRIT = 2 NAGIOS_UNKNOWN = 3 parser = argparse.ArgumentParser(description="") parser.add_argument('--audit', action='store_true', help="run lightweight auditor verifying consistency in STH") parser.add_argument('--build-sth', action='store_true', help="get all entries and construct STH") parser.add_argument('--baseurl', required=True, help="Base URL for CT log") parser.add_argument('--sthfile', required=True, metavar='file', help="File containing current STH") parser.add_argument('--keyfile', metavar='file', required=True, help="File containing current STH") class UTC(datetime.tzinfo): def utcoffset(self, dt): return datetime.timedelta(hours=0) def dst(self, dt): return datetime.timedelta(0) def reduce_layer(layer): new_layer = [] while len(layer) > 1: e1 = layer.pop(0) e2 = layer.pop(0) new_layer.append(internal_hash((e1,e2))) return new_layer def reduce_tree(entries, layers): if len(entries) == 0 and layers is []: return [[hashlib.sha256().digest()]] layer_idx = 0 layers[layer_idx] += entries while len(layers[layer_idx]) > 1: if len(layers) == layer_idx + 1: layers.append([]) layers[layer_idx + 1] += reduce_layer(layers[layer_idx]) layer_idx += 1 return layers def reduce_subtree_to_root(layers): while len(layers) > 1: if len(layers[1]) == 0: layers[1] = layers[0] else: layers[1] += next_merkle_layer(layers[0]) del layers[0] if len(layers[0]) > 1: return next_merkle_layer(layers[0]) return layers[0] def get_and_verify_sth(url, key): try: sth = get_sth(url) except: print time.strftime('%H:%M:%S') + " ERROR: Failed to retrieve STH from " + url sys.exit(NAGIOS_CRIT) # Check signature on the STH try: check_sth_signature(url, sth, key) # write_file("plausible-sth.json", tmp_sth) except: error_str = time.strftime('%H:%M:%S') + " ERROR: Could not verify signature from " + url print error_str sys.exit(NAGIOS_CRIT) return sth def fetch_all_sth(): sths = {} for base_url in base_urls: # Fetch STH try: sths[base_url] = get_sth(base_url) except: sths[base_url] = None error_str = time.strftime('%H:%M:%S') + " ERROR: Failed to retrieve STH from " + base_url print error_str errors.append(error_str) continue # Check signature on the STH try: check_sth_signature(base_url, sths[base_url], logkeys[base_url]) except: error_str = time.strftime('%H:%M:%S') + " ERROR: Could not verify signature from " + base_url print error_str errors.append(error_str) continue # Add timing info # try: # if base_url not in timings: # timings[base_url] = {"last":sths[base_url]["timestamp"], "longest":0} # else: # then = datetime.datetime.fromtimestamp(int(timings[base_url]["last"])/1000) # now = datetime.datetime.fromtimestamp(int(sths[base_url]["timestamp"])/1000) # tdelta = now - then # timings[base_url]["last"] = sths[base_url]["timestamp"] # if tdelta.total_seconds() > timings[base_url]["longest"]: # timings[base_url]["longest"] = tdelta.total_seconds() # except Exception, err: # print Exception, err # print time.strftime('%H:%M:%S') + "ERROR: Failed to set TIME info for STH" return sths def verify_progress(url, old, new): if old and new: if new["tree_size"] == old["tree_size"]: if old["sha256_root_hash"] != new["sha256_root_hash"]: print time.strftime('%H:%M:%S') + " CRITICAL: root hash is different for same tree size in " + url sys.exit(NAGIOS_CRIT) elif new["tree_size"] < old["tree_size"]: print time.strftime('%H:%M:%S') + " CRITICAL: new tree smaller than previous tree (%d < %d)" % \ (new["tree_size"], old["tree_size"]) sys.exit(NAGIOS_WARN) if new: age = time.time() - new["timestamp"]/1000 sth_time = datetime.datetime.fromtimestamp(new['timestamp'] / 1000, UTC()).strftime("%Y-%m-%d %H:%M:%S") roothash = new['sha256_root_hash'] if age > 24 * 3600: print time.strftime('%H:%M:%S') + " CRITICAL: %s is older than 24h: %s UTC" % (url, sth_time) sys.exit(NAGIOS_CRIT) elif age > 12 * 3600: print time.strftime('%H:%M:%S') + " WARNING: %s is older than 12h: %s UTC" % (url, sth_time) sys.exit(NAGIOS_WARN) elif age > 6 * 3600: print time.strftime('%H:%M:%S') + " WARNING: %s is older than 6h: %s UTC" % (url, sth_time) sys.exit(NAGIOS_WARN) def verify_consistency(url, old, new): if old and new: try: if old["tree_size"]!= new["tree_size"]: consistency_proof = get_consistency_proof(url, old["tree_size"], new["tree_size"]) decoded_consistency_proof = [] for item in consistency_proof: decoded_consistency_proof.append(base64.b64decode(item)) res = verify_consistency_proof(decoded_consistency_proof, old["tree_size"], new["tree_size"], old["sha256_root_hash"]) if old["sha256_root_hash"] != str(base64.b64encode(res[0])): print time.strftime('%H:%M:%S') + " Verification of old hash failed! " + old["sha256_root_hash"] + str(base64.b64encode(res[0])) sys.exit(NAGIOS_CRIT) # errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to verify consistency for " + url + ", tree size " + old[url]["tree_size"]) elif new[url]["sha256_root_hash"] != str(base64.b64encode(res[1])): print time.strftime('%H:%M:%S') + " Verification of new hash failed! " + new["sha256_root_hash"] + str(base64.b64encode(res[1])) sys.exit(NAGIOS_CRIT) # errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to verify consistency for " + url + ", tree size " + new[url]["tree_size"]) else: print time.strftime("%H:%M:%S") + " New STH from " + url + ", timestamp: " + \ str(new["timestamp"]) + ", size: " + str(new["tree_size"]) + "...OK." except: print time.strftime('%H:%M:%S') + " ERROR: Could not verify consistency for " + url sys.exit(NAGIOS_CRIT) def verify_inclusion_all(url, old, new): if old and new: try: if old["tree_size"]!= new["tree_size"]: entries = get_entries(url, old["tree_size"], new["tree_size"] -1)["entries"] success = True for i in entries: h = get_leaf_hash(base64.b64decode(i["leaf_input"])) if not verify_inclusion_by_hash(url, h): success = False if not success: # print time.strftime("%H:%M:%S") + " Verifying inclusion for " + str(len(entries)) + " new entries in " + url + " ...OK" # else: print time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url sys.exit(NAGIOS_CRIT) # errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url) except: print time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url # errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url) sys.exit(NAGIOS_CRIT) def fetch_and_build_tree(old_sth, base_url): sth = old_sth[base_url] subtree = [[]] idx = 0 res_strings = [""] print time.strftime('%H:%M:%S') + " Getting all entries from " + base_url while idx < sth["tree_size"]: pre_size = idx entries = get_entries(base_url, idx, sth["tree_size"]-1)["entries"] new_leafs = [] for item in entries: new_leafs.append(get_leaf_hash(base64.b64decode(item["leaf_input"]))) idx += len(new_leafs) print time.strftime('%H:%M:%S') + " Got entries " + str(pre_size) + " to " + str(idx) + " from " + base_url subtree = reduce_tree(new_leafs, subtree) root = base64.b64encode(reduce_subtree_to_root(subtree)[0]) if root == sth["sha256_root_hash"]: print time.strftime('%H:%M:%S') + " Verifying root hashes for " + base_url + "...OK." res_strings.append("STH for " + base_url + " built successfully.") else: print time.strftime('%H:%M:%S') + " ERROR: Failed to verify root hashes! STH root: " + sth["sha256_root_hash"] + ", Tree root: " + root res_strings.append(time.strftime('%H:%M:%S') + " " + base_url + " Failed! STH root: " + sth["sha256_root_hash"] + " Calculated root: " + root) errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to verify root hash for " + base_url + ", tre size " + sth["tree_size"]) for item in res_strings: print item + "\n" def verify_inclusion_by_hash(base_url, leaf_hash): try: tmp_sth = get_sth(base_url) proof = get_proof_by_hash(base_url, leaf_hash, tmp_sth["tree_size"]) decoded_inclusion_proof = [] for item in proof["audit_path"]: decoded_inclusion_proof.append(base64.b64decode(item)) root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof, proof["leaf_index"], tmp_sth["tree_size"], leaf_hash)) if tmp_sth["sha256_root_hash"] == root: # print "Verifying inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url + "...OK." return True else: print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url) return False except: print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for hashed entry in " + base_url errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for hashed entry in " + base_url) return False def verify_inclusion_by_index(base_url, index): try: tmp_sth = get_sth(base_url) proof = get_proof_by_index(base_url, index, tmp_sth["tree_size"]) decoded_inclusion_proof = [] for item in proof["audit_path"]: decoded_inclusion_proof.append(base64.b64decode(item)) root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof, index, tmp_sth["tree_size"], get_leaf_hash(base64.b64decode(proof["leaf_input"])))) if tmp_sth["sha256_root_hash"] == root: print time.strftime('%H:%M:%S') + " Verifying inclusion for entry " + str(index) + " in " + base_url + "...OK." else: print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url) except: print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url) def get_proof_by_index(baseurl, index, tree_size): try: params = urllib.urlencode({"leaf_index":index, "tree_size":tree_size}) result = \ urlopen(baseurl + "ct/v1/get-entry-and-proof?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR:", e.read() sys.exit(0) def get_all_roots(base_url): # print "Fetching roots from " + base_url result = urlopen(base_url + "ct/v1/get-roots").read() certs = json.loads(result)["certificates"] print time.strftime('%H:%M:%S') + " Received " + str(len(certs)) + " certs from " + base_url for accepted_cert in certs: subject = get_cert_info(base64.decodestring(accepted_cert))["subject"] issuer = get_cert_info(base64.decodestring(accepted_cert))["issuer"] if subject == issuer: root_cert = base64.decodestring(accepted_cert) print get_cert_info(root_cert)["subject"] def read_sth(fn): try: f = open(fn) except IOError, e: if e.errno == errno.ENOENT: return None raise e return json.loads(f.read()) def write_file(fn, sth): tempname = fn + ".new" open(tempname, 'w').write(json.dumps(sth)) mv_file(tempname, fn) def main(args): try: log_key = get_public_key_from_file(args.keyfile) except: print time.strftime('%H:%M:%S') + " ERROR: Failed to load keyfile " + args.keyfile sys.exit(NAGIOS_WARN) old_sth = read_sth(args.sthfile) new_sth = get_and_verify_sth(args.baseurl, log_key) write_file(args.sthfile, new_sth) verify_progress(args.baseurl, old_sth, new_sth) verify_consistency(args.baseurl, old_sth, new_sth) verify_inclusion_all(args.baseurl, old_sth, new_sth) print "Everything OK from " + args.baseurl sys.exit(NAGIOS_OK) if __name__ == '__main__': main(parser.parse_args())