#!/usr/local/bin/python

import os
import sys
import stat # for chmod constants
import re
import time
import string
import copy
import pickle
import logging
# random is needed by initialize_experiment_info.  It is imported before the
# star imports below so that anything they previously supplied under the same
# name still takes precedence.
import random
import subprocess
import traceback

from threading import *
from M2Crypto.SSL import SSLError

from util import *
from access_project import access_project
from fedid import fedid, generate_fedid
from authorizer import authorizer
from service_error import service_error
from remote_service import xmlrpc_handler, soap_handler, service_caller

import httplib
import tempfile
from urlparse import urlparse

from access import access_base

import topdl
import list_log
import proxy_protogeni_segment


# Make log messages disappear if no one configures a fedd logger
class nullHandler(logging.Handler):
    """
    Logging handler that discards every record.
    """
    def emit(self, record): pass

fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())


class access(access_base):
    """
    The implementation of access control based on mapping users to projects.

    Users can be mapped to existing projects or have projects created
    dynamically.  This implements both direct requests and proxies.
    """

    def __init__(self, config=None, auth=None):
        """
        Initializer.  Pulls parameters out of the ConfigParser's access
        section, restores the saved allocation state into the authorizer,
        starts the periodic slice renewal and registers the SOAP and XMLRPC
        service tables.
        """
        access_base.__init__(self, config, auth)

        self.domain = config.get("access", "domain")
        self.userconfdir = config.get("access","userconfdir")
        self.userconfcmd = config.get("access","userconfcmd")
        self.userconfurl = config.get("access","userconfurl")
        self.federation_software = config.get("access", "federation_software")
        self.portal_software = config.get("access", "portal_software")
        self.ssh_port = config.get("access","ssh_port") or "22"
        self.sshd = config.get("access","sshd")
        self.sshd_config = config.get("access", "sshd_config")
        self.access_type = config.get("access", "type")
        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
        self.staging_host = config.get("access", "staging_host") \
                or "ops.emulab.net"
        self.local_seer_software = config.get("access", "local_seer_software")
        self.local_seer_image = config.get("access", "local_seer_image")
        self.local_seer_start = config.get("access", "local_seer_start")

        self.dragon_endpoint = config.get("access", "dragon")
        self.dragon_vlans = config.get("access", "dragon_vlans")
        self.deter_internal = config.get("access", "deter_internal")

        self.tunnel_config = config.getboolean("access", "tunnel_config")
        self.portal_command = config.get("access", "portal_command")
        self.portal_image = config.get("access", "portal_image")
        self.portal_type = config.get("access", "portal_type") or "pc"
        self.portal_startcommand = config.get("access", "portal_startcommand")
        self.node_startcommand = config.get("access", "node_startcommand")

        self.federation_software = self.software_list(self.federation_software)
        self.portal_software = self.software_list(self.portal_software)
        self.local_seer_software = self.software_list(self.local_seer_software)

        # The configured renewal value is multiplied by 60 and then used as a
        # number of seconds (time.time() offsets, Timer delays) below.
        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
        self.renewal_interval = int(self.renewal_interval) * 60

        self.ch_url = config.get("access", "ch_url")
        self.sa_url = config.get("access", "sa_url")
        self.cm_url = config.get("access", "cm_url")

        self.restricted = [ ]

        # read_state in the base_class
        self.state_lock.acquire()
        try:
            for a in ('allocation', 'projects', 'keys', 'types'):
                if a not in self.state:
                    self.state[a] = { }
            self.allocation = self.state['allocation']
            self.projects = self.state['projects']
            self.keys = self.state['keys']
            self.types = self.state['types']
            # Add the ownership attributes to the authorizer.  Note that the
            # indices of the allocation dict are strings, but the attributes
            # are fedids, so there is a conversion.
            for k in list(self.allocation.keys()):
                for o in self.allocation[k].get('owners', []):
                    self.auth.set_attribute(o, fedid(hexstr=k))
                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
        finally:
            # Never leave the state lock held if restoring state fails.
            self.state_lock.release()

        self.log = logging.getLogger("fedd.access")
        set_log_level(config, "access", self.log)

        self.access = { }
        if config.has_option("access", "accessdb"):
            self.read_access(config.get("access", "accessdb"),
                    access_obj=self.make_access_info)

        self.start_segment = proxy_protogeni_segment.start_segment
        self.stop_segment = proxy_protogeni_segment.stop_segment
        self.renew_segment = proxy_protogeni_segment.renew_segment

        self.lookup_access = self.lookup_access_base

        self.call_SetValue = service_caller('SetValue')
        self.call_GetValue = service_caller('GetValue')

        self.exports = {
            'local_seer_control': self.export_local_seer,
            'seer_master': self.export_seer_master,
            'hide_hosts': self.export_hide_hosts,
        }

        if not self.local_seer_image or not self.local_seer_software or \
                not self.local_seer_start:
            if 'local_seer_control' in self.exports:
                del self.exports['local_seer_control']

        # NOTE(review): nothing visible in this file assigns
        # seer_master_start.  getattr keeps whatever the base class may
        # provide and otherwise treats it as unset instead of raising
        # AttributeError - confirm whether a config option should be read.
        seer_master_start = getattr(self, 'seer_master_start', None)
        if not self.local_seer_image or not self.local_seer_software or \
                not seer_master_start:
            if 'seer_master' in self.exports:
                del self.exports['seer_master']

        self.RenewSlices()

        self.soap_services = {
            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
            'StartSegment': soap_handler("StartSegment", self.StartSegment),
            'TerminateSegment': soap_handler("TerminateSegment",
                self.TerminateSegment),
            }
        self.xmlrpc_services = {
            'RequestAccess': xmlrpc_handler('RequestAccess',
                self.RequestAccess),
            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
                self.ReleaseAccess),
            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
            'TerminateSegment': xmlrpc_handler('TerminateSegment',
                self.TerminateSegment),
            }

    @staticmethod
    def make_access_info(s):
        """
        Split a string of the form (id, id, id, id) into its constituent
        elements and return them as a tuple.  Use to import access info from
        the access_db.

        Raises parse_error if the string is not parenthesized or does not
        contain exactly 4 comma-separated elements.
        """
        # This is a static method, so there is no self to hang parse_error
        # from; look it up on the class instead.
        # NOTE(review): assumes parse_error is a class-level attribute
        # inherited from access_base - confirm.
        ss = s.strip()
        if ss.startswith('(') and ss.endswith(')'):
            l = [ part.strip() for part in ss[1:-1].split(",")]
            if len(l) == 4:
                return tuple(l)
            else:
                raise access.parse_error(
                        "Exactly 4 elements in access info required")
        else:
            raise access.parse_error("Expecting parenthezied values")

    def get_handler(self, path, fid):
        """
        Return a (filename, content type) pair for path under userconfdir if
        fid holds the path attribute and userconfdir is configured; otherwise
        return (None, None).
        """
        self.log.info("Get handler %s %s" % (path, fid))
        if self.auth.check_attribute(fid, path) and self.userconfdir:
            return ("%s/%s" % (self.userconfdir, path), "application/binary")
        else:
            return (None, None)

    def build_access_response(self, alloc_id, services):
        """
        Build the dictionary description of the access response.

        The response carries alloc_id, a fedAttr list (domain plus the
        dragon, deter_internal and vlans attributes when configured) and,
        when services is non-empty, a service entry.
        """
        # Because alloc_id is already a fedd_services_types.IDType_Holder,
        # there's no need to repack it
        msg = {
                'allocID': alloc_id,
                'fedAttr': [
                    { 'attribute': 'domain', 'value': self.domain } ,
                ]
            }

        if self.dragon_endpoint:
            msg['fedAttr'].append({'attribute': 'dragon',
                'value': self.dragon_endpoint})
        if self.deter_internal:
            msg['fedAttr'].append({'attribute': 'deter_internal',
                'value': self.deter_internal})
        #XXX: ??
        if self.dragon_vlans:
            msg['fedAttr'].append({'attribute': 'vlans',
                'value': self.dragon_vlans})

        if services:
            msg['service'] = services
        return msg

    def RequestAccess(self, req, fid):
        """
        Handle the access request.

        Looks up the credentials for the requester, records a new allocation
        owned by fid, writes the allocation certificate into certdir and
        returns the access response.  Raises service_error on a malformed
        request or if the certificate cannot be written.
        """
        # The dance to get into the request body
        if 'RequestAccessRequestBody' in req:
            req = req['RequestAccessRequestBody']
        else:
            raise service_error(service_error.req, "No request!?")

        if 'destinationTestbed' in req:
            # The unpacked ID is not used below; the call is kept so that a
            # request that failed here before still fails here.
            dt = unpack_id(req['destinationTestbed'])

        # Request for this fedd
        found, match = self.lookup_access(req, fid)
        # NOTE(review): the exported services are computed but the response
        # below is built with services=None - confirm this is intended.
        services, svc_state = self.export_services(req.get('service',[]),
                None, None)
        # keep track of what's been added
        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
        aid = unicode(allocID)

        self.state_lock.acquire()
        try:
            self.allocation[aid] = { }
            # The protoGENI certificate
            self.allocation[aid]['credentials'] = found
            # The list of owner FIDs
            self.allocation[aid]['owners'] = [ fid ]
            self.write_state()
        finally:
            self.state_lock.release()

        self.auth.set_attribute(fid, allocID)
        self.auth.set_attribute(allocID, allocID)

        try:
            f = open("%s/%s.pem" % (self.certdir, aid), "w")
            try:
                f.write("%s\n" % (alloc_cert,))
            finally:
                # Close the certificate file even if the write fails.
                f.close()
        except EnvironmentError:
            e = sys.exc_info()[1]
            raise service_error(service_error.internal,
                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
        return self.build_access_response({ 'fedid': allocID }, None)

    def ReleaseAccess(self, req, fid):
        """
        Release the allocation named in the request.

        The allocation may be identified by localname or fedid.  The caller
        must hold the corresponding attribute.  On success the allocation is
        dropped from the saved state, its certificate file is removed and the
        allocID is echoed back.  Raises service_error for malformed requests,
        denied access or unknown allocations.
        """
        # The dance to get into the request body
        if 'ReleaseAccessRequestBody' in req:
            req = req['ReleaseAccessRequestBody']
        else:
            raise service_error(service_error.req, "No request!?")

        # Local request
        try:
            if 'localname' in req['allocID']:
                auth_attr = aid = req['allocID']['localname']
            elif 'fedid' in req['allocID']:
                aid = unicode(req['allocID']['fedid'])
                auth_attr = req['allocID']['fedid']
            else:
                raise service_error(service_error.req,
                        "Only localnames and fedids are understood")
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        self.log.debug("[access] deallocation requested for %s", aid)
        if not self.auth.check_attribute(fid, auth_attr):
            self.log.debug("[access] deallocation denied for %s", aid)
            raise service_error(service_error.access, "Access Denied")

        self.state_lock.acquire()
        try:
            found = aid in self.allocation
            if found:
                self.log.debug("Found allocation for %s" %aid)
                del self.allocation[aid]
                self.write_state()
        finally:
            self.state_lock.release()

        if not found:
            raise service_error(service_error.req, "No such allocation")

        # And remove the access cert
        cf = "%s/%s.pem" % (self.certdir, aid)
        self.log.debug("Removing %s" % cf)
        os.remove(cf)
        return { 'allocID': req['allocID'] }

    def generate_rspec(self, topo, softdir, connInfo):
        """
        Convert a clone of topo into an rspec and return it.

        Segments and portal interfaces are dropped and software locations
        are rewritten to live under softdir before the conversion.  connInfo
        is accepted but not used here.
        """
        t = topo.clone()

        # NOTE(review): starts is filled in below but is neither returned nor
        # stored - confirm whether the start commands are meant to be used.
        starts = { }
        # Weed out the things we aren't going to instantiate: Segments, portal
        # substrates, and portal interfaces.  (The copy in the for loop allows
        # us to delete from e.elements in side the for loop).  While we're
        # touching all the elements, we also adjust paths from the original
        # testbed to local testbed paths and put the federation commands and
        # startcommands into a dict so we can start them manually later.
        # ProtoGENI requires setup before the federation commands run, so we
        # run them by hand after we've seeded configurations.
        for e in list(t.elements):
            if isinstance(e, topdl.Segment):
                t.elements.remove(e)
                # Nothing else to do for an element that has been dropped.
                continue
            # Fix software paths
            for s in getattr(e, 'software', []):
                s.location = re.sub("^.*/", softdir, s.location)
            if isinstance(e, topdl.Computer):
                if e.get_attribute('portal') and self.portal_startcommand:
                    # Portals never have a user-specified start command
                    starts[e.name] = self.portal_startcommand
                elif self.node_startcommand:
                    if e.get_attribute('startup'):
                        starts[e.name] = "%s \\$USER '%s'" % \
                                (self.node_startcommand,
                                        e.get_attribute('startup'))
                        e.remove_attribute('startup')
                    else:
                        starts[e.name] = self.node_startcommand

                # Remove portal interfaces
                # NOTE(review): nesting reconstructed from a
                # whitespace-mangled source; placed under the Computer check
                # on the assumption that only Computers carry interfaces.
                e.interface = [i for i in e.interface \
                        if not i.get_attribute('portal')]

        t.substrates = [ s.clone() for s in t.substrates ]
        t.incorporate_elements()

        # Customize the ns2 output for local portal commands and images
        filters = []

        # NB: these are extra commands issued for the node, not the startcmds
        if self.portal_command:
            filters.append(topdl.generate_portal_command_filter(
                self.portal_command))

        # Convert to rspec and return it
        exp_rspec = topdl.topology_to_rspec(t, filters)

        return exp_rspec

    def retrieve_software(self, topo, certfile, softdir):
        """
        Collect the software that nodes in the topology need loaded and stage
        it locally.  This implies retrieving it from the experiment_controller
        and placing it into softdir.  Certfile is used to prove that this node
        has access to that data (it's the allocation/segment fedid).  Finally
        local portal and federation software is also copied to the same
        staging directory for simplicity - all software needed for experiment
        creation is in softdir.

        Raises service_error if any piece of software cannot be retrieved.
        """
        sw = set()
        for e in topo.elements:
            for s in getattr(e, 'software', []):
                sw.add(s.location)
        os.mkdir(softdir)
        for s in sw:
            self.log.debug("Retrieving %s" % s)
            try:
                get_url(s, certfile, softdir)
            except:
                # Any failure to fetch is reported as an internal error.
                t, v, st = sys.exc_info()
                raise service_error(service_error.internal,
                        "Error retrieving %s: %s" % (s, v))

        # Copy local portal node software to the tempdir
        for s in (self.portal_software, self.federation_software):
            for l, f in s:
                base = os.path.basename(f)
                copy_file(f, "%s/%s" % (softdir, base))

        # Ick.  Put this python rpm in a place that it will get moved into
        # the staging area.  It's a hack to install a modern (in a Roman
        # sense of modern) python on ProtoGENI
        python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
        if os.access("./%s" % python_rpm, os.R_OK):
            copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))

    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
        """
        Gather common configuration files, retrieve or create an experiment
        name and project name, and return the ssh_key filenames.  Create an
        allocation log bound to the state log variable as well.

        Returns the tuple (ename, pubkey_base, secretkey_base, cf, user,
        ssh_key, cpw, alloc_log).  Raises service_error if a configuration
        file cannot be retrieved or if there is no allocation for aid.
        """
        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
        ename = None
        pubkey_base = None
        secretkey_base = None
        alloc_log = None

        for a in attrs:
            if a['attribute'] in configs:
                try:
                    self.log.debug("Retrieving %s" % a['value'])
                    get_url(a['value'], certfile, tmpdir)
                except:
                    t, v, st = sys.exc_info()
                    raise service_error(service_error.internal,
                            "Error retrieving %s: %s" % (a.get('value', ""), v))
            if a['attribute'] == 'ssh_pubkey':
                pubkey_base = a['value'].rpartition('/')[2]
            if a['attribute'] == 'ssh_secretkey':
                secretkey_base = a['value'].rpartition('/')[2]
            if a['attribute'] == 'experiment_name':
                ename = a['value']

        if not ename:
            ename = "".join([random.choice(string.ascii_letters)
                for i in range(0,5)])
            self.log.warn("No experiment name: picked one randomly: %s" \
                    % ename)

        self.state_lock.acquire()
        try:
            if aid not in self.allocation:
                # Without an allocation there are no credentials to return.
                # Report it rather than failing on unbound locals.
                self.log.error("No allocation for %s!?" % aid)
                raise service_error(service_error.internal,
                        "No allocation for %s!?" % aid)

            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
            self.allocation[aid]['experiment'] = ename
            self.allocation[aid]['log'] = [ ]
            # Create a logger that logs to the experiment's state object as
            # well as to the main log file.
            alloc_log = logging.getLogger('fedd.access.%s' % ename)
            h = logging.StreamHandler(
                    list_log.list_log(self.allocation[aid]['log']))
            # XXX: there should be a global one of these rather than
            # repeating the code.
            h.setFormatter(logging.Formatter(
                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
            alloc_log.addHandler(h)
            self.write_state()
        finally:
            self.state_lock.release()

        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key,
                cpw, alloc_log)

    def finalize_experiment(self, topo, starter, aid, alloc_id):
        """
        Build the StartSegment response, save it in the allocation under
        'started' and return a deep copy of it.

        The response holds alloc_id, the accumulated allocation log, a clone
        of topo as a dict and an embedding that maps each name in
        starter.node to its assigned name with the domain appended.
        """
        # Copy the assigned names into the return topology
        rvtopo = topo.clone()
        embedding = [ ]
        for n in starter.node:
            embedding.append({
                'toponame': n,
                'physname': ["%s%s" % (starter.node[n], self.domain)],
                })
        # Grab the log (this is some anal locking, but better safe than
        # sorry)
        self.state_lock.acquire()
        try:
            logv = "".join(self.allocation[aid]['log'])
            # It's possible that the StartSegment call gets retried (!).
            # if the 'started' key is in the allocation, we'll return it
            # rather than redo the setup.
            self.allocation[aid]['started'] = {
                    'allocID': alloc_id,
                    'allocationLog': logv,
                    'segmentdescription': {
                        'topdldescription': rvtopo.to_dict() },
                    'embedding': embedding,
                    }
            retval = copy.deepcopy(self.allocation[aid]['started'])
            self.write_state()
        finally:
            self.state_lock.release()

        return retval

    def StartSegment(self, req, fid):
        """
        Create the segment described by the request and return the response
        built by finalize_experiment.

        A repeated request for an allocation that has already started gets
        the saved response back.  Raises service_error for malformed
        requests, denied access, or any failure during segment creation.
        """
        err = None  # Any service_error generated after tmpdir is created
        rv = None   # Return value from segment creation

        try:
            req = req['StartSegmentRequestBody']
            topref = req['segmentdescription']['topdldescription']
            # Inside the guard so a missing allocID is reported as a badly
            # formed request rather than a raw KeyError.
            auth_attr = req['allocID']['fedid']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        connInfo = req.get('connection', [])
        services = req.get('service', [])
        aid = "%s" % auth_attr
        attrs = req.get('fedAttr', [])
        if not self.auth.check_attribute(fid, auth_attr):
            raise service_error(service_error.access, "Access denied")
        else:
            # See if this is a replay of an earlier succeeded StartSegment -
            # sometimes SSL kills 'em.  If so, replay the response rather than
            # redoing the allocation.
            self.state_lock.acquire()
            try:
                retval = self.allocation.get(aid, {}).get('started', None)
            finally:
                self.state_lock.release()
            if retval:
                self.log.warning("Duplicate StartSegment for %s: " % aid + \
                        "replaying response")
                return retval

        if topref:
            topo = topdl.Topology(**topref)
        else:
            raise service_error(service_error.req,
                    "Request missing segmentdescription'")

        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
        try:
            tmpdir = tempfile.mkdtemp(prefix="access-")
            softdir = "%s/software" % tmpdir
        except EnvironmentError:
            raise service_error(service_error.internal, "Cannot create tmp dir")

        # Try block allows us to clean up temporary files.
        try:
            self.retrieve_software(topo, certfile, softdir)
            self.configure_userconf(services, tmpdir)
            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
                cpw, alloc_log = self.initialize_experiment_info(attrs,
                        aid, certfile, tmpdir)
            # XXX: we really need to put the import and connection info
            # generation off longer.
            self.import_store_info(certfile, connInfo)
            rspec = self.generate_rspec(topo, "%s/%s/" \
                    % (self.staging_dir, ename), connInfo)

            starter = self.start_segment(keyfile=ssh_key,
                    debug=self.create_debug, log=alloc_log,
                    ch_url = self.ch_url, sa_url=self.sa_url,
                    cm_url=self.cm_url)
            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
                    ename,
                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
                    certfile, topo, connInfo, services)
        except EnvironmentError:
            # Pull the exception out explicitly so the message can be
            # formatted and the cleanup below still runs.
            e = sys.exc_info()[1]
            err = service_error(service_error.internal, "%s" % e)
        except service_error:
            err = sys.exc_info()[1]
        except:
            t, v, st = sys.exc_info()
            err = service_error(service_error.internal, "%s: %s" % \
                    (v, traceback.extract_tb(st)))

        # Walk up tmpdir, deleting as we go
        if self.cleanup: self.remove_dirs(tmpdir)
        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)

        if rv:
            return self.finalize_experiment(topo, starter, aid, req['allocID'])
        elif err:
            raise service_error(service_error.federant,
                    "Swapin failed: %s" % err)
        else:
            raise service_error(service_error.federant, "Swapin failed")

    def TerminateSegment(self, req, fid):
        """
        Stop the segment for the allocation named in the request and echo
        the allocID back.

        Raises service_error for malformed requests or denied access.  If no
        allocation is recorded the stop is still attempted with empty
        credentials and no staging directory.
        """
        try:
            req = req['TerminateSegmentRequestBody']
            # Inside the guard so a missing allocID is reported as a badly
            # formed request rather than a raw KeyError.
            auth_attr = req['allocID']['fedid']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        aid = "%s" % auth_attr
        if not self.auth.check_attribute(fid, auth_attr):
            raise service_error(service_error.access, "Access denied")

        self.state_lock.acquire()
        try:
            if aid in self.allocation:
                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
                slice_cred = self.allocation[aid].get('slice_credential', None)
                ename = self.allocation[aid].get('experiment', None)
            else:
                cf, user, ssh_key, cpw = (None, None, None, None)
                slice_cred = None
                ename = None
        finally:
            self.state_lock.release()

        if ename:
            staging = "%s/%s" % ( self.staging_dir, ename)
        else:
            self.log.warn("Can't find experiment name for %s" % aid)
            staging = None

        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
        stopper(self, user, staging, slice_cred, cf, cpw)
        return { 'allocID': req['allocID'] }

    def RenewSlices(self):
        """
        Renew every allocation that has a slice name and slice credential,
        then schedule the next scan with a Timer.

        Allocations that have vanished since the scan started, and those
        whose credential file is unreadable, are skipped.
        """
        self.log.info("Scanning for slices to renew")
        self.state_lock.acquire()
        try:
            aids = list(self.allocation.keys())
        finally:
            self.state_lock.release()

        for aid in aids:
            creds = None
            name = None
            scred = None
            self.state_lock.acquire()
            try:
                if aid in self.allocation:
                    name = self.allocation[aid].get('slice_name', None)
                    scred = self.allocation[aid].get('slice_credential', None)
                    creds = self.allocation[aid]['credentials']
            finally:
                self.state_lock.release()

            if creds is None:
                # The allocation went away after the snapshot was taken, so
                # there are no credentials to renew with.
                continue
            cf, user, ssh_key, cpw = creds

            if not os.access(cf, os.R_OK):
                self.log.error(
                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
                continue

            # There's a ProtoGENI slice associated with the segment; renew it.
            if name and scred:
                renewer = self.renew_segment(log=self.log,
                        debug=self.create_debug, keyfile=ssh_key,
                        cm_url = self.cm_url, sa_url = self.sa_url,
                        ch_url = self.ch_url)
                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
                if new_scred:
                    self.log.info("Slice %s renewed until %s GMT" % \
                            (name, time.asctime(time.gmtime(\
                                time.time()+self.renewal_interval))))
                    self.state_lock.acquire()
                    try:
                        if aid in self.allocation:
                            self.allocation[aid]['slice_credential'] = new_scred
                    finally:
                        self.state_lock.release()
                else:
                    self.log.info("Failed to renew slice %s " % name)

        # Let's do this all again soon. (4 tries before the slices time out)
        t = Timer(self.renewal_interval/4, self.RenewSlices)
        t.start()