# -*- coding: utf-8 -*- """ Created on Mon May 9 14:20:31 2011 @author: lundberg Used in django-changepw (http://git.nordu.net/?p=django-changepw.git;a=summary). """ from subprocess import call, Popen, PIPE import pexpect import ldap from django.conf import settings SUFFIXES = ['', '/ppp', '/net', '/vpn'] def _normalize_whitespace(s): """ Removes leading and ending whitespace from a string. """ return ' '.join(s.split()) def check_kerberos_password(username, password): """ Tries to kinit with the username and password. Returns True and kdestroys the ticket if the kinit succeded and returns False otherwise. """ child = pexpect.spawn('kinit %s' % username) result = child.expect(['Password', 'not found']) if result is 0: child.sendline(password) result = child.expect([ 'kinit: Password incorrect', 'kinit: krb5_get_init_creds: salt type 3 not supported', # Missmatch of kerberos version between client and server 'unknown', pexpect.EOF]) if result == 3: call('kdestroy') return True return False def duplicated_kerberos_password(suffix, _username, password): """ Checks all suffixes except the one provided, if the password can unlock any pricipal True is returned else False. """ kerberos_uid = _username.split('@') kerberos_uid[1] = kerberos_uid[1].upper() suffixes = list(SUFFIXES) suffixes.remove(suffix) for suff in suffixes: username = '%s@' % suff if check_kerberos_password(username.join(kerberos_uid), password): return True return False def change_nordunet_sso_pw(user, new_password): """ Changes the Kerberos and LDAP password for the user. """ ret = _change_kerberos_pw('', user.username, new_password) if not ret: ret = set_nordunet_ldap_pw_sasl(user) return ret def set_nordunet_ldap_pw_sasl(user): """ Sets the users ldap password to a pointer to a Kerberos principal. """ username = user.username.split('@')[0] ldap_dn = 'uid=%s,ou=People,dc=nordu,dc=net' % username l = _connect_ldap(user=settings.LDAP_USER, password=settings.LDAP_PASSWORD) if l: try: mod_attrs = [(ldap.MOD_REPLACE, 'userPassword', str('{SASL}%s@NORDU.NET' % username))] l.modify_s(ldap_dn, mod_attrs) except ldap.LDAPError, e: l.unbind() return e.message l.unbind() else: return 'Invalid LDAP credentials in settings.' return 0 def change_nordunet_ppp_pw(user, new_password): """ Uses a third party script to change a Kerberos password. Returns the return value from the third party script. User needs to be employee at NORDUnet to run this. User has affiliation employee@nordu.net. """ if user.is_staff: return _change_kerberos_pw('/ppp', user.username, new_password) else: return 'You need to be a NORDUnet employee or member to use this.' def change_nordunet_net_pw(user, new_password): """ Uses a third party script to change a Kerberos password. Returns the return value from the third party script. User needs to be employee at NORDUnet to run this. If user has affiliation employee@nordu.net is_staff flag is True. """ if user.is_staff: return _change_kerberos_pw('/net', user.username, new_password) else: return 'You need to be a NORDUnet employee to use this.' def change_nordunet_vpn_pw(user, new_password): """ Uses a third party script to change a Kerberos password. Returns the return value from the third party script. User needs to be employee at NORDUnet to run this. If user has affiliation employee@nordu.net is_staff flag is True. """ if user.is_staff: return _change_kerberos_pw('/vpn', user.username, new_password) else: return 'You need to be a NORDUnet employee to use this.' def _change_kerberos_pw(suffix, username, new_password): kerberos_uid = username.split('@') kerberos_uid[1] = kerberos_uid[1].upper() if not duplicated_kerberos_password(suffix, username, new_password): kerberos_uid = '%s%s@%s' % (kerberos_uid[0], suffix, kerberos_uid[1]) p = Popen([settings.KERBEROS_SCRIPT], stdin=PIPE) p.communicate('%s %s' % (kerberos_uid, new_password)) return p.wait() return 'You can\'t set the same password as your %s password.' % _pretty_suffixes(without=suffix) def _pretty_suffixes(without=None): if without is None: suffixes = [s for s in SUFFIXES if s is not without] else: suffixes = list(SUFFIXES) if '' in suffixes: suffixes.remove('') suffixes.append('SSO') return ', '.join([s.upper().replace('/', '') for s in suffixes]) def _validate_ssh_key(s): """ Tries to validate a string against the public ssh key format as in RFC4253 and RFC4716. Checks that the string is in three parts separated by whitespace and that the first part is in public_key_formats and the second part is a base64 encoded string. Returns True if the string validates. """ import base64 public_key_formats = ['ssh-dss', 'ssh-rsa', 'pgp-sign-rsa', 'pgp-sign-dss', 'ssh-ed25519'] three_parts = s.split() if three_parts[0] in public_key_formats and len(three_parts) in [2,3]: try: base64.b64decode(three_parts[1]) except TypeError: return False else: return False return True def _connect_ldap(server=None, user=None, password=None): """ Connects to an ldap server and binds with supplied user and password. """ _server = server or settings.LDAP_URL l = ldap.initialize(_server) l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) if not _server.startswith("ldaps"): l.start_tls_s() try: if user is None: l.simple_bind_s() else: l.bind_s(user, password) except ldap.INVALID_CREDENTIALS: return False return l def set_public_ssh_key(user, ssh_keys): """ Sets the provided string(s) as the sshPublicKey attribute for the user. User need to have affiliation employee@nordu.net to use this function. """ if user.is_staff: valid_keys = [] for ssh_key in ssh_keys.split('\n'): ssh_key = _normalize_whitespace(ssh_key) if ssh_key: if _validate_ssh_key(ssh_key): valid_keys.append(ssh_key) else: return '%s is not a valid SSH key.' % ssh_key if valid_keys: ldap_dn = user.username.split('@') ldap_dn = 'uid=%s,ou=People,dc=nordu,dc=net' % ldap_dn[0] l = _connect_ldap(user=settings.LDAP_USER, password=settings.LDAP_PASSWORD) if l: try: # Ensure that objectClass ldapPublicKey is added to the user mod_attrs = [(ldap.MOD_ADD, 'objectClass', 'ldapPublicKey')] l.modify_s(ldap_dn, mod_attrs) except ldap.TYPE_OR_VALUE_EXISTS: pass try: # Add the new ssh keys for key in valid_keys: mod_attrs = [(ldap.MOD_ADD, 'sshPublicKey', str(key))] l.modify_s(ldap_dn, mod_attrs) except ldap.LDAPError, e: l.unbind() return e.message l.unbind() else: return 'Invalid LDAP credentials in settings.' else: return 'You need to be a NORDUnet employee to use this.' return 0 def get_public_ssh_keys(user): l = _connect_ldap() if l: uid = user.username.split('@')[0] dn = "uid=%s,ou=People,dc=nordu,dc=net" % uid try: res = l.search_s(dn, ldap.SCOPE_SUBTREE, "(objectClass=person)")[0][1] return res.get('sshPublicKey') except (ldap.LDAPError, TypeError): pass return None def del_public_ssh_key(user, ssh_key): """ Sets the provided string(s) as the sshPublicKey attribute for the user. User need to have affiliation employee@nordu.net to use this function. """ if user.is_staff: ldap_dn = user.username.split('@') ldap_dn = 'uid=%s,ou=People,dc=nordu,dc=net' % ldap_dn[0] l = _connect_ldap(user=settings.LDAP_USER, password=settings.LDAP_PASSWORD) if l: try: # Remove all previous ssh keys try: mod_attrs = [(ldap.MOD_DELETE, 'sshPublicKey', ssh_key)] l.modify_s(ldap_dn, mod_attrs) except ldap.NO_SUCH_ATTRIBUTE: pass except ldap.LDAPError, e: l.unbind() return e.message l.unbind() else: return 'Invalid LDAP credentials in settings.' else: return 'You need to be a NORDUnet employee to use this.' return 0