#!/usr/local/bin/python

import os
import sys
import re
import random
import string
import subprocess
import threading
import pickle
import tempfile

from util import *
from fedid import fedid
from remote_service import xmlrpc_handler, soap_handler, service_caller
from service_error import service_error
import logging
from xml.sax.saxutils import escape as xml_escape


# Configure loggers to dump to /dev/null which avoids errors if calling classes
# don't configure them.
class nullHandler(logging.Handler):
    """Logging handler that silently discards every record."""

    def emit(self, record):
        pass


fl = logging.getLogger("fedd.allocate.local")
fl.addHandler(nullHandler())
fl = logging.getLogger("fedd.allocate.remote")
fl.addHandler(nullHandler())


class allocate_project_local:
    """
    Allocate projects on this machine in response to an access request.
    """
    # Allocation levels, from most to least permissive.  A request is allowed
    # when the configured level is >= the level the operation needs.
    dynamic_projects = 4
    dynamic_keys = 2
    confirm_keys = 1
    none = 0

    levels = {
        'dynamic_projects': dynamic_projects,
        'dynamic_keys': dynamic_keys,
        'confirm_keys': confirm_keys,
        'none': none,
    }

    def __init__(self, config, auth=None):
        """
        Initializer.  Reads the "allocate" section of config, loads any saved
        allocation state and access database, and registers the SOAP
        services.

        Raises service_error (internal) if the access database cannot be read
        or parsed.
        """
        self.debug = config.getboolean("allocate", "debug", False)
        self.wap = config.get('allocate', 'wap', '/usr/testbed/sbin/wap')
        self.newproj = config.get('allocate', 'newproj',
                '/usr/testbed/sbin/newproj')
        self.mkproj = config.get('allocate', 'mkproj',
                '/usr/testbed/sbin/mkproj')
        self.rmproj = config.get('allocate', 'rmproj',
                '/usr/testbed/sbin/rmproj')
        self.rmuser = config.get('allocate', 'rmuser',
                '/usr/testbed/sbin/rmuser')
        self.newuser = config.get('allocate', 'newuser',
                '/usr/testbed/sbin/newuser')
        self.addpubkey = config.get('allocate', 'addpubkey',
                '/usr/testbed/sbin/addpubkey')
        self.grantnodetype = config.get('allocate', 'grantnodetype',
                '/usr/testbed/sbin/grantnodetype')
        self.confirmkey = config.get('allocate', 'confirmkey',
                '/usr/testbed/sbin/taddpubkey')
        self.user_to_project = config.get("allocate", 'user_to_project',
                '/usr/local/bin/user_to_project.py')
        self.allocation_level = config.get("allocate", "allocation_level",
                "none")
        self.log = logging.getLogger("fedd.allocate.local")
        set_log_level(config, "allocate", self.log)

        if auth:
            self.auth = auth
        else:
            # NOTE(review): authorizer is not imported by name in this file;
            # presumably it arrives through "from util import *" - confirm.
            self.auth = authorizer()
            self.log.warning(
                    "[allocate] No authorizer passed in, using local one")

        try:
            self.allocation_level = \
                    self.levels[self.allocation_level.strip().lower()]
        except KeyError:
            self.log.error("Bad allocation_level %s. Defaulting to none" % \
                    self.allocation_level)
            self.allocation_level = self.none

        self.state = {
            'keys': set(),
            'types': set(),
            'projects': set(),
            'users': set(),
        }
        self.state_filename = config.get('allocate', 'allocation_state')
        self.state_lock = threading.Lock()
        self.read_state()

        access_db = config.get("allocate", "accessdb")
        if access_db:
            try:
                read_simple_accessdb(access_db, self.auth, 'allocate')
            except EnvironmentError as e:
                raise service_error(service_error.internal,
                        "Error reading accessDB %s: %s" % (access_db, e))
            except ValueError as e:
                raise service_error(service_error.internal, "%s" % e)

        # Internal services are SOAP only
        self.soap_services = {
            "AllocateProject": soap_handler("AllocateProject",
                self.dynamic_project),
            "StaticProject": soap_handler("StaticProject",
                self.static_project),
            "ReleaseProject": soap_handler("ReleaseProject",
                self.release_project),
        }
        self.xmlrpc_services = { }

    def read_state(self):
        """
        Read a new copy of access state.  Old state is overwritten.

        State format is a simple pickling of the state dictionary.  Failures
        to open or decode the file are logged and leave the current state in
        place.
        """
        if self.state_filename:
            try:
                # The state file is written only by write_state(); pickle must
                # never be pointed at a file an untrusted party can modify.
                f = open(self.state_filename, "rb")
                try:
                    self.state = pickle.load(f)
                finally:
                    f.close()
                self.log.debug("[allocation]: Read state from %s" % \
                        self.state_filename)
            except EnvironmentError as e:
                self.log.warning(("[allocation]: No saved state: " +\
                        "Can't open %s: %s") % (self.state_filename, e))
            except EOFError:
                self.log.warning(("[allocation]: " +\
                        "Empty or damaged state file: %s:") % \
                        self.state_filename)
            except pickle.UnpicklingError as e:
                self.log.warning(("[allocation]: No saved state: " + \
                        "Unpickling failed: %s") % e)

        # These should all be in the pickled representation, but make sure
        for k in ('keys', 'types', 'projects', 'users'):
            self.state.setdefault(k, set())

    def write_state(self):
        """
        Pickle the state dictionary to the configured state file, if any.
        Errors are logged rather than raised.
        """
        if not self.state_filename:
            return
        try:
            f = open(self.state_filename, 'wb')
            try:
                pickle.dump(self.state, f)
            finally:
                f.close()
        except EnvironmentError as e:
            self.log.error("Can't write file %s: %s" % \
                    (self.state_filename, e))
        except pickle.PicklingError as e:
            self.log.error("Pickling problem: %s" % e)
        except TypeError as e:
            self.log.error("Pickling problem (TypeError): %s" % e)

    def random_string(self, s, n=3):
        """Append n random ASCII letters to s and return the string"""
        return s + "".join([random.choice(string.ascii_letters)
            for i in range(n)])

    def write_attr_xml(self, file, root, lines):
        """
        Write an emulab config file for a dynamic project.

        file is an open OS-level file descriptor (as returned by
        tempfile.mkstemp); it is closed before returning.  lines is a
        sequence of (name, value) pairs.  Format is
        <root><attribute name=lines[0]><value>lines[1]</value></attribute>
        </root>, one attribute per line, with names and values XML-escaped.
        """
        # Convert a pair to an attribute line
        def out_attr(a, v):
            return '<attribute name="%s"><value>%s</value></attribute>' % \
                    (xml_escape(a, {'"': '&quot;'}), xml_escape(v))

        f = os.fdopen(file, "w")
        try:
            f.write("<%s>\n" % root)
            f.write("\n".join([out_attr(*l) for l in lines]))
            f.write("\n</%s>\n" % root)
        finally:
            f.close()

    def run_cmd(self, cmd, log_prefix='allocate'):
        """
        Run the command passed in.  Cmd is a list containing the words of the
        command.  Return the exit value from the subprocess - that is 0 on
        success.  On an error running the command - python or OS error, raise
        a service exception.

        In debug mode the command is only logged and 0 is returned.
        """
        self.log.debug("[%s]: %s" % (log_prefix, ' '.join(cmd)))
        if self.debug:
            return 0

        try:
            dnull = open("/dev/null", "w")
        except EnvironmentError:
            self.log.warning(
                    "[run_cmd]: failed to open /dev/null for redirect")
            dnull = None

        try:
            return subprocess.call(cmd, stdout=dnull, stderr=dnull)
        except OSError as e:
            raise service_error(service_error.internal,
                    "Static project subprocess creation error "+ \
                            "[%s] (%s)" % (cmd[0], e.strerror))
        finally:
            # Do not leak one descriptor per command
            if dnull is not None:
                dnull.close()

    def confirm_key(self, user, key):
        """
        Call run_cmd to confirm the key.  Return a boolean rather
        than the subprocess code.
        """
        return self.run_cmd((self.wap, self.confirmkey, '-C',
            '-u', user, '-k', key)) == 0

    def add_key(self, user, key):
        """
        Call run_cmd to add the key.  Return a boolean rather
        than the subprocess code.
        """
        return self.run_cmd((self.wap, self.addpubkey, '-u', user,
            '-k', key)) == 0

    def remove_key(self, user, key):
        """
        Call run_cmd to remove the key.  Return a boolean rather
        than the subprocess code.
        """
        return self.run_cmd((self.wap, self.addpubkey, '-R', '-u', user,
            '-k', key)) == 0

    def confirm_access(self, project, type):
        """
        Call run_cmd to confirm the project's access to the node type.
        Return a boolean rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.grantnodetype, '-C',
            '-p', project, type)) == 0

    def add_access(self, project, type):
        """
        Call run_cmd to grant the project access to the node type.  Return a
        boolean rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.grantnodetype,
            '-p', project, type)) == 0

    def remove_access(self, project, type):
        """
        Call run_cmd to revoke the project's access to the node type.  Return
        a boolean rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.grantnodetype, '-R',
            '-p', project, type)) == 0

    def add_project(self, project, projfile):
        """
        Create a project using run_cmd.  This is two steps, and assumes that
        the relevant XML files are in place and correct.  Make the return
        value boolean.  Note that if a new user is specified in the XML, that
        user is created on success.
        """
        if self.run_cmd((self.wap, self.newproj, projfile)) == 0:
            return self.run_cmd((self.wap, self.mkproj, project)) == 0
        else:
            return False

    def remove_project(self, project):
        """
        Call run_cmd to remove the project.  Make the return value boolean.
        """
        return self.run_cmd((self.wap, self.rmproj, project)) == 0

    def add_user(self, name, param_file, project):
        """
        Create a user and link them to the given project.  Similar to
        add_project, this requires a two step approach.  Returns True on
        success False on failure.
        """
        if self.run_cmd((self.wap, self.newuser, param_file)) == 0:
            return self.run_cmd((self.wap, self.user_to_project,
                name, project)) == 0
        else:
            return False

    def remove_user(self, user):
        """
        Call run_cmd to remove the user.  Make the return value boolean.
        """
        return self.run_cmd((self.wap, self.rmuser, user)) == 0

    @staticmethod
    def _hardware_types(nodes):
        """Flatten the 'hardware' lists of the node entries into one list."""
        return [h for n in nodes if 'hardware' in n for h in n['hardware']]

    def _try_undo(self, action, *args):
        """Run one rollback step; log, but never raise, if it fails."""
        try:
            if not action(*args):
                self.log.warning("[allocate] rollback step %s failed" % \
                        action.__name__)
        except service_error as e:
            self.log.warning("[allocate] rollback step %s failed: %s" % \
                    (action.__name__, e))

    def _rollback(self, keys=(), types=(), users=(), projects=()):
        """
        Back out a partial allocation as completely as possible: forget each
        item in the allocation state and try to undo it on the testbed.
        """
        for u, k in keys:
            self.state['keys'].discard((u, k))
            self._try_undo(self.remove_key, u, k)
        for p, t in types:
            self.state['types'].discard((p, t))
            self._try_undo(self.remove_access, p, t)
        for u in users:
            self.state['users'].discard(u)
            self._try_undo(self.remove_user, u)
        for p in projects:
            self.state['projects'].discard(p)
            self._try_undo(self.remove_project, p)

    def _collect_users(self, users):
        """
        Pull a login name and the first ssh key for each role out of the
        request's user list.  Returns (uname, ssh), both keyed by role.
        Raises service_error (req) for users with no ID or no ssh key.
        """
        uname = { }
        ssh = { }
        for u in users:
            role = u.get('role', None)
            if not role:
                continue
            if 'userID' in u:
                uid = u['userID']
                uname[role] = uid.get('localname', None) or \
                        uid.get('kerberosUsername', None) or \
                        uid.get('uri', None)
                if not uname[role]:
                    raise service_error(service_error.req, "No ID for user")
            else:
                uname[role] = self.random_string("user", 3)

            access = u.get('access', None)
            if not access:
                raise service_error(service_error.req,
                        "No access mechanisms for for user %s" % uname[role])
            # XXX collect and call addpubkey later, for now use first one.
            for a in access:
                ssh[role] = a.get('sshPubkey', None)
                if ssh[role]:
                    break
            else:
                raise service_error(service_error.req,
                        "No SSH key for user %s" % uname[role])
        return uname, ssh

    def _user_fields(self, login, pubkey):
        """Build the emulab new-user attribute list for a federation user."""
        return [
            ("name", "Federation User %s" % login),
            ("email", "%s-fed@isi.deterlab.net" % login),
            ("password", self.random_string("", 8)),
            ("login", login),
            ("address", "4676 Admiralty"),
            ("city", "Marina del Rey"),
            ("state", "CA"),
            ("zip", "90292"),
            ("country", "USA"),
            ("phone", "310-448-9190"),
            ("title", "None"),
            ("affiliation", "USC/ISI"),
            ("pubkey", pubkey),
        ]

    def _write_param_file(self, prefix, root, fields):
        """
        Write fields to a fresh temporary XML parameter file and return its
        path.  The file is removed again if writing fails.
        """
        fd, path = tempfile.mkstemp(prefix=prefix, suffix=".xml", dir="/tmp")
        try:
            self.write_attr_xml(fd, root, fields)
        except Exception:
            os.unlink(path)
            raise
        return path

    def dynamic_project(self, req, fedid=None):
        """
        Create a dynamic project with ssh access

        Req includes the project and resources as a dictionary.  Returns a
        dictionary describing the created project and its two users.  Raises
        service_error on access denial, malformed requests or testbed
        failures; a failed allocation is backed out before the error is
        re-raised.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        if self.allocation_level < self.dynamic_projects:
            raise service_error(service_error.access,
                    "[dynamic_project] dynamic project allocation not " + \
                            "permitted: check allocation level")

        # Validate the request before creating any files so that a bad
        # request leaves nothing behind in /tmp.
        if 'AllocateProjectRequestBody' not in req:
            raise service_error(service_error.req,
                    "Badly formed allocation request")
        body = req['AllocateProjectRequestBody']
        proj = body.get('project', None)
        if not proj:
            raise service_error(service_error.req,
                    "Badly formed allocation request")
        resources = body.get('resources', { })

        # Take the first user and ssh key
        name = proj.get('name', None) or self.random_string("proj", 4)
        uname, ssh = self._collect_users(proj.get('user', []))

        if not ('experimentCreation' in uname and 'serviceAccess' in uname):
            raise service_error(service_error.req,
                    "Must specify both user roles")

        creator = uname['experimentCreation']
        servicer = uname['serviceAccess']
        create_user_fields = self._user_fields(creator,
                ssh['experimentCreation'])
        service_user_fields = self._user_fields(servicer,
                ssh['serviceAccess'])

        added_projects = [ ]
        added_users = [ ]
        added_types = [ ]
        tmpfiles = [ ]

        # The with statement guarantees the lock is released on every path,
        # including success.
        with self.state_lock:
            try:
                # Write out the parameter files
                create_userfile = self._write_param_file("usr", "user",
                        create_user_fields)
                tmpfiles.append(create_userfile)
                service_userfile = self._write_param_file("usr", "user",
                        service_user_fields)
                tmpfiles.append(service_userfile)
                proj_fields = [
                    ("name", name),
                    ("short description", "dynamic federated project"),
                    ("URL", "http://www.isi.edu/~faber"),
                    ("funders", "USC/USU"),
                    ("long description", "Federation access control"),
                    ("public", "1"),
                    ("num_pcs", "100"),
                    ("linkedtous", "1"),
                    ("newuser_xml", create_userfile),
                ]
                projfile = self._write_param_file("proj", "project",
                        proj_fields)
                tmpfiles.append(projfile)

                try:
                    if not self.add_project(name, projfile):
                        raise service_error(service_error.internal,
                                "Unable to create project/user %s/%s" % \
                                        (name, creator))
                    # add_project adds a user as well in this case
                    added_projects.append(name)
                    added_users.append(creator)
                    self.state['projects'].add(name)
                    self.state['users'].add(creator)

                    if not self.add_user(servicer, service_userfile, name):
                        raise service_error(service_error.internal,
                                "Unable to create user %s" % servicer)
                    added_users.append(servicer)
                    self.state['users'].add(servicer)

                    # Grant access to restricted resources.  This is simpler
                    # than the corresponding loop from static_project because
                    # this is a clean slate.
                    nodes = resources.get('node', [])
                    for nt in self._hardware_types(nodes):
                        if not self.add_access(name, nt):
                            raise service_error(service_error.internal,
                                    "Failed to add access for %s to %s" \
                                            % (name, nt))
                        self.state['types'].add((name, nt))
                        added_types.append((name, nt))
                except service_error:
                    # Something failed.  Back out the partial allocation as
                    # completely as possible and re-raise the error.
                    self._rollback(types=added_types, users=added_users,
                            projects=added_projects)
                    raise
                # All is well, save state so a later release can find what
                # this service created.
                self.write_state()
            finally:
                # Clean up tempfiles
                for path in tmpfiles:
                    try:
                        os.unlink(path)
                    except OSError as e:
                        self.log.warning(
                                "[dynamic_project] can't remove %s: %s" % \
                                        (path, e))

        rv = {
            'project': {
                'name': { 'localname': name },
                'user': [
                    {
                        'userID': { 'localname': creator },
                        'access': [
                            { 'sshPubkey': ssh['experimentCreation'] } ],
                        'role': 'experimentCreation',
                    },
                    {
                        'userID': { 'localname': servicer },
                        'access': [ { 'sshPubkey': ssh['serviceAccess'] } ],
                        'role': 'serviceAccess',
                    },
                ]
            }
        }
        return rv

    def _confirm_or_add(self, confirm, add, state_key, item, added,
            confirm_err, add_err):
        """
        Confirm item (a tuple of arguments) with confirm(); if that fails and
        the allocation level permits, create it with add() and record it in
        the state and in added.  Raises service_error (internal) otherwise.
        """
        if confirm(*item):
            return
        if self.allocation_level < self.dynamic_keys:
            raise service_error(service_error.internal, confirm_err)
        if not add(*item):
            raise service_error(service_error.internal, add_err)
        self.state[state_key].add(item)
        added.append(item)

    def static_project(self, req, fedid=None):
        """
        Be certain that the local project in the request has access to the
        proper resources and users have correct keys.  Add them if necessary.

        Returns the request body.  Raises service_error on access denial,
        malformed requests, or when keys/access can be neither confirmed nor
        added; changes made before a failure are backed out.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        # While we should be more careful about this, for the short term, add
        # the keys to the specified users.
        try:
            body = req['StaticProjectRequestBody']
            users = body['project']['user']
            pname = body['project']['name']['localname']
            resources = body.get('resources', { })
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        # Keep track of changes made to the system
        added_keys = [ ]
        added_types = [ ]

        with self.state_lock:
            try:
                for u in users:
                    try:
                        name = u['userID']['localname']
                    except KeyError:
                        raise service_error(service_error.req,
                                "Badly formed user")
                    for sk in [ k['sshPubkey'] for k in u.get('access', []) \
                            if 'sshPubkey' in k ]:
                        if self.allocation_level >= self.confirm_keys:
                            self._confirm_or_add(self.confirm_key,
                                    self.add_key, 'keys', (name, sk),
                                    added_keys,
                                    "Failed to confirm key for %s" % name,
                                    "Failed to add key for %s" % name)
                        else:
                            self.log.warning(
                                    "[static_project] no checking of " + \
                                            "static keys")

                # Grant access to any resources in the request.  The access
                # module knows to only send resources that are restricted and
                # needed by the project.
                nodes = resources.get('node', [])
                for nt in self._hardware_types(nodes):
                    if self.allocation_level >= self.confirm_keys:
                        self._confirm_or_add(self.confirm_access,
                                self.add_access, 'types', (pname, nt),
                                added_types,
                                "Failed to confirm access for %s to %s" \
                                        % (pname, nt),
                                "Failed to add access for %s to %s" \
                                        % (pname, nt))
                    else:
                        self.log.warning(
                                "[static_project] no checking of " + \
                                        "node access")
            except service_error:
                # Do our best to clean up partial allocation and reraise the
                # error.  Do our best to make sure that both allocation state
                # and testbed state is restored.
                self._rollback(keys=added_keys, types=added_types)
                raise
            # All is well, save state (the lock is released on exit)
            self.write_state()

        # return { 'project': req['StaticProjectRequestBody']['project']}
        return req['StaticProjectRequestBody']

    def release_project(self, req, fedid=None):
        """
        Remove user keys from users and delete dynamic projects.

        Only keys this service created are deleted and there are similar
        protections for projects.  State is saved whether or not the release
        completes.  Returns a dictionary holding the request's project.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        pname = None
        users = [ ]
        nodes = [ ]

        try:
            body = req['ReleaseProjectRequestBody']
            project = body['project']
            if 'name' in project:
                pname = project['name']['localname']
            if 'user' in project:
                users = project['user']
            if 'resources' in body:
                nodes = body['resources'].get('node', [])
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        if nodes and not pname:
            raise service_error(service_error.req,
                    "Badly formed request (nodes without project)")

        with self.state_lock:
            try:
                for nt in self._hardware_types(nodes):
                    if (pname, nt) in self.state['types']:
                        self.remove_access(pname, nt)
                        self.state['types'].discard((pname, nt))

                for u in users:
                    try:
                        name = u['userID']['localname']
                    except KeyError:
                        raise service_error(service_error.req,
                                "Badly formed user")
                    if name in self.state['users']:
                        # If we created this user, discard the user, keys
                        # and all
                        self.remove_user(name)
                        self.state['users'].discard(name)
                    else:
                        # If not, just strip any keys we added
                        for sk in [ k['sshPubkey'] \
                                for k in u.get('access', []) \
                                if 'sshPubkey' in k ]:
                            if (name, sk) in self.state['keys']:
                                self.remove_key(name, sk)
                                self.state['keys'].discard((name, sk))
                if pname in self.state['projects']:
                    self.remove_project(pname)
                    self.state['projects'].discard(pname)
            finally:
                # Whatever was released so far must be recorded, even when
                # the release stops part way through.
                self.write_state()

        return { 'project': req['ReleaseProjectRequestBody']['project'] }


class allocate_project_remote:
    """
    Allocate projects on a remote machine using the internal SOAP interface
    """
    class proxy(service_caller):
        """
        This class is a proxy functor (callable) that has the same signature
        as a function called by soap_handler or xmlrpc_handler, but that used
        the service_caller class to call the function remotely.
        """

        def __init__(self, url, cert_file, cert_pwd, trusted_certs, auth,
                method):
            service_caller.__init__(self, method)
            self.url = url
            self.cert_file = cert_file
            self.cert_pwd = cert_pwd
            self.trusted_certs = trusted_certs
            self.request_body_name = "%sRequestBody" % method
            self.resp_name = "%sResponseBody" % method
            self.auth = auth

        def __call__(self, req, fid=None):
            """
            Calling the proxy object directly invokes the proxy_call method,
            not the service_call method.
            """
            return self.proxy_call(req, fid)

        def proxy_call(self, req, fid=None):
            """
            Send req on to a remote project instantiator.

            Req is just the message to be sent.  This function re-wraps it.
            It also rethrows any faults; a connection failure is reported as
            an internal error.
            """
            if self.request_body_name in req:
                req = req[self.request_body_name]
            else:
                raise service_error(service_error.req, "Bad formated request")

            try:
                r = self.call_service(self.url, req, self.cert_file,
                        self.cert_pwd, self.trusted_certs)
            except service_error as e:
                if e.code == service_error.connect:
                    raise service_error(service_error.internal,
                            "Cannot connect to internal service: (%d) %s" % \
                                    (e.code, e.desc))
                else:
                    raise
            if self.resp_name in r:
                return r[self.resp_name]
            else:
                raise service_error(service_error.protocol,
                        "Bad proxy response")

    # back to defining the allocate_project_remote class
    def __init__(self, config, auth=None):
        """
        Initializer.  Reads the service URI and certificate settings from
        config and builds one proxy per remote operation.
        """
        self.debug = config.getboolean("allocate", "debug", False)
        self.url = config.get("allocate", "uri", "")

        # Keep cert file and password coming from the same source
        self.cert_file = config.get("allocate", "cert_file", None)
        if self.cert_file:
            self.cert_pwd = config.get("allocate", "cert_pwd", None)
        else:
            self.cert_file = config.get("globals", "cert_file", None)
            self.cert_pwd = config.get("globals", "cert_pwd", None)

        self.trusted_certs = config.get("allocate", "trusted_certs", None) or \
                config.get("globals", "trusted_certs")

        self.soap_services = { }
        self.xmlrpc_services = { }
        self.log = logging.getLogger("fedd.allocate.remote")
        set_log_level(config, "allocate", self.log)

        if auth:
            self.auth = auth
        else:
            # NOTE(review): authorizer is not imported by name in this file;
            # presumably it arrives through "from util import *" - confirm.
            self.auth = authorizer()
            self.log.warning(
                    "[allocate] No authorizer passed in, using local one")

        # The specializations of the proxy functions
        self.dynamic_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "AllocateProject")
        self.static_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "StaticProject")
        self.release_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "ReleaseProject")