1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
# -*- coding: utf-8 -*-
__author__ = 'lundberg'
import sys
import argparse
import logging
from socket import gethostbyname, gethostbyaddr, gaierror, herror
import dns.resolver
logger = logging.getLogger('dnscheck_nsd')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
VERBOSE = False
def get_hostname(addr):
hostname = addr
try:
return gethostbyaddr(addr)[0]
except herror:
logger.error('No hostname could be found for %s.' % addr)
return hostname
def get_resolver(nameservers=None, lifetime=30):
resolver = dns.resolver.Resolver()
resolver.lifetime = lifetime
if nameservers:
resolver.nameservers = []
for nameserver in nameservers:
try:
resolver.nameservers.append(gethostbyname(nameserver))
except gaierror:
try:
resolver.nameservers.append(nameserver) # It is an IP address
except gaierror:
logger.error('Could not find nameserver: %s' % nameserver)
sys.exit(1)
logger.debug('Resolver instance with nameserver %s.' % resolver.nameservers)
return resolver
def compare_soa(zone, resolvers):
answers = []
for resolver in resolvers:
answer = None
try:
answer = resolver.query(zone, 'SOA')[0]
if VERBOSE:
if resolver.nameservers[0] == '127.0.0.1' or resolver.nameservers[0] == '::1':
logger.info('NS %s: %s' % (resolver.nameservers[0], answer))
else:
try:
logger.info('NS %s (%s): %s' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], answer))
except herror:
logger.info('NS %s: %s' % (resolver.nameservers[0], answer))
except dns.exception.Timeout:
logger.error('%s (%s) timed out. SOA request for %s failed.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
except dns.resolver.NoAnswer:
logger.error('%s (%s) returned no answer for %s.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
except dns.resolver.NXDOMAIN:
logger.error('NS %s (%s) responded domain not found (NXDOMAIN) for %s.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
except dns.resolver.NoNameservers:
logger.error('No non-broken nameservers are available to answer the query for %s.' % zone)
if answer:
answers.append(answer)
if len(set(answers)) == 1:
return 'match'
return 'no match'
def print_soa(zone, resolvers):
for resolver in resolvers:
try:
answer = resolver.query(zone, 'SOA')[0]
if resolver.nameservers[0] == '127.0.0.1' or resolver.nameservers[0] == '::1':
print 'NS %s: %s' % (resolver.nameservers[0], answer)
else:
print 'NS %s: %s' % (get_hostname(resolver.nameservers[0]), answer)
except dns.exception.Timeout:
logger.error('%s (%s) timed out. SOA request for %s failed.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
except dns.resolver.NoAnswer:
logger.error('%s (%s) returned no answer for %s.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
except dns.resolver.NXDOMAIN:
logger.error('NS %s (%s) responded domain not found (NXDOMAIN) for %s.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
except dns.resolver.NoNameservers:
logger.error('No non-broken nameservers are available to answer the query for %s.' % zone)
def check_auth(zone, resolver):
try:
nameserver = get_hostname(resolver.nameservers[0])
answer = resolver.query(zone, 'NS')
if VERBOSE:
logger.info('Checking if NS %s authoritative for %s...' % (nameserver, zone))
ns_match = '%s.' % nameserver # hostname.
for auth in answer:
if ns_match == auth.to_text():
if VERBOSE:
logger.info('NS %s is authoritative for %s...' % (nameserver, zone))
return True
except dns.exception.Timeout:
logger.error('%s timed out. NS request for %s (%s) failed.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
return None
except dns.resolver.NoAnswer:
logger.error('%s (%s) returned no answer for %s.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
return None
except dns.resolver.NXDOMAIN:
logger.error('NS %s (%s) responded domain not found (NXDOMAIN) for %s.' % (get_hostname(resolver.nameservers[0]),
resolver.nameservers[0], zone))
return None
except dns.resolver.NoNameservers:
logger.error('No non-broken nameservers are available to answer the query for %s.' % zone)
return None
except gaierror:
logger.error('Could not find nameserver: %s' % resolver.nameservers[0])
return None
if VERBOSE:
logger.info('NS %s is not authoritative for %s...' % (nameserver, zone))
return False
def parse_file(f):
result = []
in_zone, domain, ns_addresses = False, '', []
for line in f:
if not line.startswith('#'):
if line.strip() == '':
if in_zone and domain and ns_addresses:
result.append({
'domain': domain.strip('"'),
'ns_addresses': ns_addresses
})
logger.debug('Added %s to zones that should be checked.' % result[-1])
in_zone, domain, ns_addresses = False, '', []
if line.startswith('zone'):
if in_zone: # Zones should be separated by a blank line
if domain:
logger.error('Misconfigured zone: %s in %s.' % (domain, f.name))
if ns_addresses:
logger.error('Misconfigured zone with NS address: %s in %s.' % (ns_addresses, f.name))
in_zone, domain, ns_addresses = False, '', []
in_zone = True
if in_zone and line.find('name') != -1:
domain = line.split()[1]
if in_zone and line.find('request-xfr') != -1:
ns_addresses.append(line.split()[1])
return result
def main():
# User friendly usage output
parser = argparse.ArgumentParser()
parser.add_argument('--nameserver', '-ns', default='localhost',
help="IP address or hostname of reference NS, default localhosts resolver")
parser.add_argument('--timeout', '-t', type=float, default=5, help="timeout in seconds, default 5")
parser.add_argument('--nochecksoa', action='store_true', default=False)
parser.add_argument('--nocheckauth', action='store_true', default=False)
parser.add_argument('--verbose', '-v', action='store_true', default=False)
parser.add_argument('--debug', action='store_true', default=False)
parser.add_argument(
'--exclude',
'-x',
type=argparse.FileType('r'),
default=None,
help="list of zones to exclude from check"
)
parser.add_argument(
'file',
nargs='?',
type=argparse.FileType('r'),
default=sys.stdin,
help="NSD configuration file"
)
args = parser.parse_args()
if args.verbose:
global VERBOSE
VERBOSE = True
if args.debug:
logger.setLevel(logging.DEBUG)
if args.exclude:
exclude_list = [line.strip() for line in args.exclude]
print '-- Exclude List --'
print exclude_list
print '-- Exclude List --'
else:
exclude_list = []
ref_resolver = get_resolver(nameservers=[args.nameserver], lifetime=args.timeout)
try:
for item in parse_file(args.file):
if not item['domain'] in exclude_list:
if not args.nochecksoa:
resolvers = [ref_resolver]
for address in item['ns_addresses']:
resolvers.append(get_resolver(nameservers=[address], lifetime=args.timeout))
soa_result = compare_soa(item['domain'], resolvers)
if soa_result == 'match' and VERBOSE:
print 'SOA check complete for zone %s.\n' % item['domain']
elif not soa_result:
print 'SOA check for zone %s failed.\n' % item['domain']
elif soa_result == 'no match':
print 'SOA did not match for zone %s:' % item['domain']
print_soa(item['domain'], resolvers)
print ''
if not args.nocheckauth:
auth_result = check_auth(item['domain'], ref_resolver)
if auth_result and VERBOSE:
print 'Authority check complete for %s.\n' % item['domain']
elif auth_result is None:
print 'Authoritative check failed for %s.\n' % item['domain']
elif not auth_result:
print 'Reference NS is not authoritative for %s.\n' % item['domain']
else:
if VERBOSE:
logger.info('Zone %s found in exclude list, skipping...' % item['domain'])
except KeyboardInterrupt:
sys.exit(0)
return 0
if __name__ == '__main__':
main()
|