# -*- coding: utf-8 -*- __author__ = 'lundberg' import sys import argparse import logging from socket import gethostbyname, gethostbyaddr, gaierror, herror import dns.resolver logger = logging.getLogger('dnscheck_nsd') logger.setLevel(logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) VERBOSE = False def get_hostname(addr): hostname = addr try: return gethostbyaddr(addr)[0] except herror: logger.error('No hostname could be found for %s.' % addr) return hostname def get_resolver(nameservers=None, lifetime=30): resolver = dns.resolver.Resolver() resolver.lifetime = lifetime if nameservers: resolver.nameservers = [] for nameserver in nameservers: try: resolver.nameservers.append(gethostbyname(nameserver)) except gaierror: try: resolver.nameservers.append(nameserver) # It is an IP address except gaierror: logger.error('Could not find nameserver: %s' % nameserver) sys.exit(1) logger.debug('Resolver instance with nameserver %s.' % resolver.nameservers) return resolver def compare_soa(zone, resolvers): answers = [] for resolver in resolvers: answer = None try: answer = resolver.query(zone, 'SOA')[0] if VERBOSE: if resolver.nameservers[0] == '127.0.0.1' or resolver.nameservers[0] == '::1': logger.info('NS %s: %s' % (resolver.nameservers[0], answer)) else: try: logger.info('NS %s (%s): %s' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], answer)) except herror: logger.info('NS %s: %s' % (resolver.nameservers[0], answer)) except dns.exception.Timeout: logger.error('%s (%s) timed out. SOA request for %s failed.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) except dns.resolver.NoAnswer: logger.error('%s (%s) returned no answer for %s.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) except dns.resolver.NXDOMAIN: logger.error('NS %s (%s) responded domain not found (NXDOMAIN) for %s.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) except dns.resolver.NoNameservers: logger.error('No non-broken nameservers are available to answer the query for %s.' % zone) if answer: answers.append(answer) if len(set(answers)) == 1: return 'match' return 'no match' def print_soa(zone, resolvers): for resolver in resolvers: try: answer = resolver.query(zone, 'SOA')[0] if resolver.nameservers[0] == '127.0.0.1' or resolver.nameservers[0] == '::1': print 'NS %s: %s' % (resolver.nameservers[0], answer) else: print 'NS %s: %s' % (get_hostname(resolver.nameservers[0]), answer) except dns.exception.Timeout: logger.error('%s (%s) timed out. SOA request for %s failed.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) except dns.resolver.NoAnswer: logger.error('%s (%s) returned no answer for %s.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) except dns.resolver.NXDOMAIN: logger.error('NS %s (%s) responded domain not found (NXDOMAIN) for %s.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) except dns.resolver.NoNameservers: logger.error('No non-broken nameservers are available to answer the query for %s.' % zone) def check_auth(zone, resolver): try: nameserver = get_hostname(resolver.nameservers[0]) answer = resolver.query(zone, 'NS') if VERBOSE: logger.info('Checking if NS %s authoritative for %s...' % (nameserver, zone)) ns_match = '%s.' % nameserver # hostname. for auth in answer: if ns_match == auth.to_text(): if VERBOSE: logger.info('NS %s is authoritative for %s...' % (nameserver, zone)) return True except dns.exception.Timeout: logger.error('%s timed out. NS request for %s (%s) failed.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) return None except dns.resolver.NoAnswer: logger.error('%s (%s) returned no answer for %s.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) return None except dns.resolver.NXDOMAIN: logger.error('NS %s (%s) responded domain not found (NXDOMAIN) for %s.' % (get_hostname(resolver.nameservers[0]), resolver.nameservers[0], zone)) return None except dns.resolver.NoNameservers: logger.error('No non-broken nameservers are available to answer the query for %s.' % zone) return None except gaierror: logger.error('Could not find nameserver: %s' % resolver.nameservers[0]) return None if VERBOSE: logger.info('NS %s is not authoritative for %s...' % (nameserver, zone)) return False def parse_file(f): result = [] in_zone, domain, ns_addresses = False, '', [] for line in f: if not line.startswith('#'): if line.strip() == '': if in_zone and domain and ns_addresses: result.append({ 'domain': domain.strip('"'), 'ns_addresses': ns_addresses }) logger.debug('Added %s to zones that should be checked.' % result[-1]) in_zone, domain, ns_addresses = False, '', [] if line.startswith('zone'): if in_zone: # Zones should be separated by a blank line if domain: logger.error('Misconfigured zone: %s in %s.' % (domain, f.name)) if ns_addresses: logger.error('Misconfigured zone with NS address: %s in %s.' % (ns_addresses, f.name)) in_zone, domain, ns_addresses = False, '', [] in_zone = True if in_zone and line.find('name') != -1: domain = line.split()[1] if in_zone and line.find('request-xfr') != -1: ns_addresses.append(line.split()[1]) return result def main(): # User friendly usage output parser = argparse.ArgumentParser() parser.add_argument('--nameserver', '-ns', default='localhost', help="IP address or hostname of reference NS, default localhosts resolver") parser.add_argument('--timeout', '-t', type=float, default=5, help="timeout in seconds, default 5") parser.add_argument('--nochecksoa', action='store_true', default=False) parser.add_argument('--nocheckauth', action='store_true', default=False) parser.add_argument('--verbose', '-v', action='store_true', default=False) parser.add_argument('--debug', action='store_true', default=False) parser.add_argument( '--exclude', '-x', type=argparse.FileType('r'), default=None, help="list of zones to exclude from check" ) parser.add_argument( 'file', nargs='?', type=argparse.FileType('r'), default=sys.stdin, help="NSD configuration file" ) args = parser.parse_args() if args.verbose: global VERBOSE VERBOSE = True if args.debug: logger.setLevel(logging.DEBUG) if args.exclude: exclude_list = [line.strip() for line in args.exclude] print '-- Exclude List --' print exclude_list print '-- Exclude List --' else: exclude_list = [] ref_resolver = get_resolver(nameservers=[args.nameserver], lifetime=args.timeout) try: for item in parse_file(args.file): if not item['domain'] in exclude_list: if not args.nochecksoa: resolvers = [ref_resolver] for address in item['ns_addresses']: resolvers.append(get_resolver(nameservers=[address], lifetime=args.timeout)) soa_result = compare_soa(item['domain'], resolvers) if soa_result == 'match' and VERBOSE: print 'SOA check complete for zone %s.\n' % item['domain'] elif not soa_result: print 'SOA check for zone %s failed.\n' % item['domain'] elif soa_result == 'no match': print 'SOA did not match for zone %s:' % item['domain'] print_soa(item['domain'], resolvers) print '' if not args.nocheckauth: auth_result = check_auth(item['domain'], ref_resolver) if auth_result and VERBOSE: print 'Authority check complete for %s.\n' % item['domain'] elif auth_result is None: print 'Authoritative check failed for %s.\n' % item['domain'] elif not auth_result: print 'Reference NS is not authoritative for %s.\n' % item['domain'] else: if VERBOSE: logger.info('Zone %s found in exclude list, skipping...' % item['domain']) except KeyboardInterrupt: sys.exit(0) return 0 if __name__ == '__main__': main()