#!/usr/local/bin/python import os,sys import stat # for chmod constants import re import random import string import copy import pickle import logging import subprocess import traceback from threading import * from M2Crypto.SSL import SSLError from access import access_base from legacy_access import legacy_access from util import * from allocate_project import allocate_project_local, allocate_project_remote from fedid import fedid, generate_fedid from authorizer import authorizer, abac_authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller import httplib import tempfile from urlparse import urlparse import topdl import list_log import proxy_emulab_segment import local_emulab_segment # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access(access_base, legacy_access): """ The implementation of access control based on mapping users to projects. Users can be mapped to existing projects or have projects created dynamically. This implements both direct requests and proxies. """ def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. """ access_base.__init__(self, config, auth) self.allow_proxy = config.getboolean("access", "allow_proxy") self.domain = config.get("access", "domain") self.userconfdir = config.get("access","userconfdir") self.userconfcmd = config.get("access","userconfcmd") self.userconfurl = config.get("access","userconfurl") self.federation_software = config.get("access", "federation_software") self.portal_software = config.get("access", "portal_software") self.local_seer_software = config.get("access", "local_seer_software") self.local_seer_image = config.get("access", "local_seer_image") self.local_seer_start = config.get("access", "local_seer_start") self.seer_master_start = config.get("access", "seer_master_start") self.ssh_privkey_file = config.get("access","ssh_privkey_file") self.ssh_pubkey_file = config.get("access","ssh_pubkey_file") self.ssh_port = config.get("access","ssh_port") or "22" self.boss = config.get("access", "boss") self.ops = config.get("access", "ops") self.xmlrpc_cert = config.get("access", "xmlrpc_cert") self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw") self.dragon_endpoint = config.get("access", "dragon") self.dragon_vlans = config.get("access", "dragon_vlans") self.deter_internal = config.get("access", "deter_internal") self.tunnel_config = config.getboolean("access", "tunnel_config") self.portal_command = config.get("access", "portal_command") self.portal_image = config.get("access", "portal_image") self.portal_type = config.get("access", "portal_type") or "pc" self.portal_startcommand = config.get("access", "portal_startcommand") self.node_startcommand = config.get("access", "node_startcommand") self.federation_software = self.software_list(self.federation_software) self.portal_software = self.software_list(self.portal_software) self.local_seer_software = self.software_list(self.local_seer_software) self.access_type = self.access_type.lower() if self.access_type == 'remote_emulab': self.start_segment = proxy_emulab_segment.start_segment self.stop_segment = proxy_emulab_segment.stop_segment elif self.access_type == 'local_emulab': self.start_segment = local_emulab_segment.start_segment self.stop_segment = local_emulab_segment.stop_segment else: self.start_segment = None self.stop_segment = None self.restricted = [ ] # XXX: this should go? #if config.has_option("access", "accessdb"): # self.read_access(config.get("access", "accessdb")) tb = config.get('access', 'testbed') if tb: self.testbed = [ t.strip() for t in tb.split(',') ] else: self.testbed = [ ] # authorization information self.auth_type = config.get('access', 'auth_type') \ or 'legacy' self.auth_dir = config.get('access', 'auth_dir') accessdb = config.get("access", "accessdb") # initialize the authorization system if self.auth_type == 'legacy': self.access = { } if accessdb: self.legacy_read_access(accessdb, self.legacy_access_tuple) elif self.auth_type == 'abac': self.auth = abac_authorizer(load=self.auth_dir) self.access = [ ] if accessdb: self.read_access(accessdb, self.access_tuple) else: raise service_error(service_error.internal, "Unknown auth_type: %s" % self.auth_type) # read_state in the base_class self.state_lock.acquire() for a in ('allocation', 'projects', 'keys', 'types'): if a not in self.state: self.state[a] = { } self.allocation = self.state['allocation'] self.projects = self.state['projects'] self.keys = self.state['keys'] self.types = self.state['types'] if self.auth_type == "legacy": # Add the ownership attributes to the authorizer. Note that the # indices of the allocation dict are strings, but the attributes are # fedids, so there is a conversion. for k in self.allocation.keys(): for o in self.allocation[k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) if self.allocation[k].has_key('userconfig'): sfid = self.allocation[k]['userconfig'] fid = fedid(hexstr=sfid) self.auth.set_attribute(fid, "/%s" % sfid) self.state_lock.release() self.exports = { 'SMB': self.export_SMB, 'seer': self.export_seer, 'tmcd': self.export_tmcd, 'userconfig': self.export_userconfig, 'project_export': self.export_project_export, 'local_seer_control': self.export_local_seer, 'seer_master': self.export_seer_master, 'hide_hosts': self.export_hide_hosts, } if not self.local_seer_image or not self.local_seer_software or \ not self.local_seer_start: if 'local_seer_control' in self.exports: del self.exports['local_seer_control'] if not self.local_seer_image or not self.local_seer_software or \ not self.seer_master_start: if 'seer_master' in self.exports: del self.exports['seer_master'] self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } self.call_SetValue = service_caller('SetValue') self.call_GetValue = service_caller('GetValue', log=self.log) if not config.has_option("allocate", "uri"): self.allocate_project = \ allocate_project_local(config, auth) else: self.allocate_project = \ allocate_project_remote(config, auth) # If the project allocator exports services, put them in this object's # maps so that classes that instantiate this can call the services. self.soap_services.update(self.allocate_project.soap_services) self.xmlrpc_services.update(self.allocate_project.xmlrpc_services) @staticmethod def legacy_access_tuple(str): """ Convert a string of the form (id[:resources:resouces], id, id) into a tuple of the form (project, user, user) where users may be names or fedids. The resources strings are obsolete and ignored. """ def parse_name(n): if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) else: return n str = str.strip() if str.startswith('(') and str.endswith(')'): str = str[1:-1] names = [ s.strip() for s in str.split(",")] if len(names) > 3: raise self.parse_error("More than three fields in name") first = names[0].split(":") if first == 'fedid:': del first[0] first[0] = fedid(hexstr=first[0]) names[0] = first[0] for i in range(1,2): names[i] = parse_name(names[i]) return tuple(names) else: raise self.parse_error('Bad mapping (unbalanced parens)') @staticmethod def access_tuple(str): """ Convert a string of the form (id, id) into an access_project. This is called by read_access to convert to local attributes. It returns a tuple of the form (project, user, user) where the two users are always the same. """ str = str.strip() if str.startswith('(') and str.endswith(')') and str.count(',') == 1: # The slice takes the parens off the string. proj, user = str[1:-1].split(',') return (proj.strip(), user.strip(), user.strip()) else: raise self.parse_error( 'Bad mapping (unbalanced parens or more than 1 comma)') # RequestAccess support routines def legacy_lookup_access(self, req, fid): """ Look up the local access control information mapped to this fedid and credentials. In this case it is a (project, create_user, access_user) triple, and a triple of three booleans indicating which, if any will need to be dynamically created. Finally a list of owners for that allocation is returned. lookup_access_base pulls the first triple out, and it is parsed by this routine into the boolean map. Owners is always the controlling fedid. """ # Return values rp = None ru = None # This maps a valid user to the Emulab projects and users to use found, match = self.legacy_lookup_access_base(req, fid) tb, project, user = match if found == None: raise service_error(service_error.access, "Access denied - cannot map access") # resolve and in found dyn_proj = False dyn_create_user = False dyn_service_user = False if found[0] == "": if project != None: rp = project else : raise service_error(\ service_error.server_config, "Project matched when no project given") elif found[0] == "": rp = None dyn_proj = True else: rp = found[0] if found[1] == "": if user_match == "": if user != None: rcu = user[0] else: raise service_error(\ service_error.server_config, "Matched on anonymous request") else: rcu = user_match elif found[1] == "": rcu = None dyn_create_user = True else: rcu = found[1] if found[2] == "": if user_match == "": if user != None: rsu = user[0] else: raise service_error(\ service_error.server_config, "Matched on anonymous request") else: rsu = user_match elif found[2] == "": rsu = None dyn_service_user = True else: rsu = found[2] return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\ [ fid ] def do_project_allocation(self, dyn, project, user): """ Call the project allocation routines and return the info. """ if dyn: # Compose the dynamic project request # (only dynamic, dynamic currently allowed) preq = { 'AllocateProjectRequestBody': \ { 'project' : {\ 'user': [ \ { \ 'access': [ { 'sshPubkey': self.ssh_pubkey_file } ], 'role': "serviceAccess",\ }, \ { \ 'access': [ { 'sshPubkey': self.ssh_pubkey_file } ], 'role': "experimentCreation",\ }, \ ], \ }\ }\ } return self.allocate_project.dynamic_project(preq) else: preq = {'StaticProjectRequestBody' : \ { 'project': \ { 'name' : { 'localname' : project },\ 'user' : [ \ {\ 'userID': { 'localname' : user }, \ 'access': [ { 'sshPubkey': self.ssh_pubkey_file } ], 'role': 'experimentCreation'\ },\ {\ 'userID': { 'localname' : user}, \ 'access': [ { 'sshPubkey': self.ssh_pubkey_file } ], 'role': 'serviceAccess'\ },\ ]}\ }\ } return self.allocate_project.static_project(preq) def save_project_state(self, aid, ap, dyn, owners): """ Parse out and save the information relevant to the project created for this experiment. That info is largely in ap and owners. dyn indicates that the project was created dynamically. Return the user and project names. """ self.state_lock.acquire() self.allocation[aid] = { } try: pname = ap['project']['name']['localname'] except KeyError: pname = None if dyn: if not pname: self.state_lock.release() raise service_error(service_error.internal, "Misformed allocation response?") if pname in self.projects: self.projects[pname] += 1 else: self.projects[pname] = 1 self.allocation[aid]['project'] = pname else: # sproject is a static project associated with this allocation. self.allocation[aid]['sproject'] = pname self.allocation[aid]['keys'] = [ ] try: for u in ap['project']['user']: uname = u['userID']['localname'] if u['role'] == 'experimentCreation': self.allocation[aid]['user'] = uname for k in [ k['sshPubkey'] for k in u['access'] \ if k.has_key('sshPubkey') ]: kv = "%s:%s" % (uname, k) if self.keys.has_key(kv): self.keys[kv] += 1 else: self.keys[kv] = 1 self.allocation[aid]['keys'].append((uname, k)) except KeyError: self.state_lock.release() raise service_error(service_error.internal, "Misformed allocation response?") self.allocation[aid]['owners'] = owners self.write_state() self.state_lock.release() return (pname, uname) # End of RequestAccess support routines def RequestAccess(self, req, fid): """ Handle the access request. Proxy if not for us. Parse out the fields and make the allocations or rejections if for us, otherwise, assuming we're willing to proxy, proxy the request out. """ def gateway_hardware(h): if h == 'GWTYPE': return self.portal_type or 'GWTYPE' else: return h def get_export_project(svcs): """ if the service requests includes one to export a project, return that project. """ rv = None for s in svcs: if s.get('name', '') == 'project_export' and \ s.get('visibility', '') == 'export': if not rv: for a in s.get('fedAttr', []): if a.get('attribute', '') == 'project' \ and 'value' in a: rv = a['value'] else: raise service_error(service_error, access, 'Requesting multiple project exports is ' + \ 'not supported'); return rv # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") # if this includes a project export request, construct a filter such # that only the ABAC attributes mapped to that project are checked for # access. if 'service' in req: ep = get_export_project(req['service']) if ep: pf = lambda(a): a.value[0] == ep else: pf = None else: ep = None pf = None if self.auth.import_credentials( data_list=req.get('abac_credential', [])): self.auth.save() if self.auth_type == "legacy": found, dyn, owners = self.legacy_lookup_access(req, fid) elif self.auth_type == 'abac': found, dyn, owners = self.lookup_access(req, fid, filter=pf) else: raise service_error(service_error.internal, 'Unknown auth_type: %s' % self.auth_type) ap = None # This only happens in legacy lookups, but if this user has access to # the testbed but not the project to be exported, raise the error. if ep and ep != found[0]: raise service_error(service_error.access, "Cannot export %s" % ep) if self.ssh_pubkey_file: ap = self.do_project_allocation(dyn[1], found[0], found[1]) else: raise service_error(service_error.internal, "SSH access parameters required") # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) pname, uname = self.save_project_state(aid, ap, dyn[1], owners) services, svc_state = self.export_services(req.get('service',[]), pname, uname) self.state_lock.acquire() # Store services state in global state for k, v in svc_state.items(): self.allocation[aid][k] = v self.write_state() self.state_lock.release() # Give the owners the right to change this allocation for o in owners: self.auth.set_attribute(o, allocID) self.auth.save() try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) resp = self.build_access_response({ 'fedid': allocID } , ap, services) return resp def do_release_project(self, del_project, del_users, del_types): """ If a project and users has to be deleted, make the call. """ msg = { 'project': { }} if del_project: msg['project']['name']= {'localname': del_project} users = [ ] for u in del_users.keys(): users.append({ 'userID': { 'localname': u },\ 'access' : \ [ {'sshPubkey' : s } for s in del_users[u]]\ }) if users: msg['project']['user'] = users if len(del_types) > 0: msg['resources'] = { 'node': \ [ {'hardware': [ h ] } for h in del_types ]\ } if self.allocate_project.release_project: msg = { 'ReleaseProjectRequestBody' : msg} self.allocate_project.release_project(msg) def ReleaseAccess(self, req, fid): # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s by %s" % \ (aid, fid)) if not self.auth.check_attribute(fid, auth_attr): self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") # If we know this allocation, reduce the reference counts and # remove the local allocations. Otherwise report an error. If # there is an allocation to delete, del_users will be a dictonary # of sets where the key is the user that owns the keys in the set. # We use a set to avoid duplicates. del_project is just the name # of any dynamic project to delete. We're somewhat lazy about # deleting authorization attributes. Having access to something # that doesn't exist isn't harmful. del_users = { } del_project = None del_types = set() self.state_lock.acquire() if aid in self.allocation: self.log.debug("Found allocation for %s" %aid) for k in self.allocation[aid]['keys']: kk = "%s:%s" % k self.keys[kk] -= 1 if self.keys[kk] == 0: if not del_users.has_key(k[0]): del_users[k[0]] = set() del_users[k[0]].add(k[1]) del self.keys[kk] if 'project' in self.allocation[aid]: pname = self.allocation[aid]['project'] self.projects[pname] -= 1 if self.projects[pname] == 0: del_project = pname del self.projects[pname] if 'types' in self.allocation[aid]: for t in self.allocation[aid]['types']: self.types[t] -= 1 if self.types[t] == 0: if not del_project: del_project = t[0] del_types.add(t[1]) del self.types[t] del self.allocation[aid] self.write_state() self.state_lock.release() # If we actually have resources to deallocate, prepare the call. if del_project or del_users: self.do_release_project(del_project, del_users, del_types) # And remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) return { 'allocID': req['allocID'] } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") # These are subroutines for StartSegment def generate_ns2(self, topo, expfn, softdir, connInfo): """ Convert topo into an ns2 file, decorated with appropriate commands for the particular testbed setup. Convert all requests for software, etc to point at the staged copies on this testbed and add the federation startcommands. """ class dragon_commands: """ Functor to spit out approrpiate dragon commands for nodes listed in the connectivity description. The constructor makes a dict mapping dragon nodes to their parameters and the __call__ checks each element in turn for membership. """ def __init__(self, map): self.node_info = map def __call__(self, e): s = "" if isinstance(e, topdl.Computer): if self.node_info.has_key(e.name): info = self.node_info[e.name] for ifname, vlan, type in info: for i in e.interface: if i.name == ifname: addr = i.get_attribute('ip4_address') subs = i.substrate[0] break else: raise service_error(service_error.internal, "No interface %s on element %s" % \ (ifname, e.name)) # XXX: do netmask right if type =='link': s = ("tb-allow-external ${%s} " + \ "dragonportal ip %s vlan %s " + \ "netmask 255.255.255.0\n") % \ (topdl.to_tcl_name(e.name), addr, vlan) elif type =='lan': s = ("tb-allow-external ${%s} " + \ "dragonportal " + \ "ip %s vlan %s usurp %s\n") % \ (topdl.to_tcl_name(e.name), addr, vlan, subs) else: raise service_error(service_error_internal, "Unknown DRAGON type %s" % type) return s class not_dragon: """ Return true if a node is in the given map of dragon nodes. """ def __init__(self, map): self.nodes = set(map.keys()) def __call__(self, e): return e.name not in self.nodes # Main line of generate_ns2 t = topo.clone() # Create the map of nodes that need direct connections (dragon # connections) from the connInfo dragon_map = { } for i in [ i for i in connInfo if i['type'] == 'transit']: for a in i.get('fedAttr', []): if a['attribute'] == 'vlan_id': vlan = a['value'] break else: raise service_error(service_error.internal, "No vlan tag") members = i.get('member', []) if len(members) > 1: type = 'lan' else: type = 'link' try: for m in members: if m['element'] in dragon_map: dragon_map[m['element']].append(( m['interface'], vlan, type)) else: dragon_map[m['element']] = [( m['interface'], vlan, type),] except KeyError: raise service_error(service_error.req, "Missing connectivity info") # Weed out the things we aren't going to instantiate: Segments, portal # substrates, and portal interfaces. (The copy in the for loop allows # us to delete from e.elements in side the for loop). While we're # touching all the elements, we also adjust paths from the original # testbed to local testbed paths and put the federation commands into # the start commands for e in [e for e in t.elements]: if isinstance(e, topdl.Segment): t.elements.remove(e) if isinstance(e, topdl.Computer): self.add_kit(e, self.federation_software) if e.get_attribute('portal') and self.portal_startcommand: # Add local portal support software self.add_kit(e, self.portal_software) # Portals never have a user-specified start command e.set_attribute('startup', self.portal_startcommand) elif self.node_startcommand: if e.get_attribute('startup'): e.set_attribute('startup', "%s \\$USER '%s'" % \ (self.node_startcommand, e.get_attribute('startup'))) else: e.set_attribute('startup', self.node_startcommand) dinf = [i[0] for i in dragon_map.get(e.name, []) ] # Remove portal interfaces that do not connect to DRAGON e.interface = [i for i in e.interface \ if not i.get_attribute('portal') or i.name in dinf ] # Fix software paths for s in getattr(e, 'software', []): s.location = re.sub("^.*/", softdir, s.location) t.substrates = [ s.clone() for s in t.substrates ] t.incorporate_elements() # Customize the ns2 output for local portal commands and images filters = [] if self.dragon_endpoint: add_filter = not_dragon(dragon_map) filters.append(dragon_commands(dragon_map)) else: add_filter = None if self.portal_command: filters.append(topdl.generate_portal_command_filter( self.portal_command, add_filter=add_filter)) if self.portal_image: filters.append(topdl.generate_portal_image_filter( self.portal_image)) if self.portal_type: filters.append(topdl.generate_portal_hardware_filter( self.portal_type)) # Convert to ns and write it out expfile = topdl.topology_to_ns2(t, filters) try: f = open(expfn, "w") print >>f, expfile f.close() except EnvironmentError: raise service_error(service_error.internal, "Cannot write experiment file %s: %s" % (expfn,e)) def export_store_info(self, cf, proj, ename, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl and k and k.index('/') != -1: value = "%s.%s.%s%s" % \ (k[k.index('/')+1:], ename, proj, self.domain) req = { 'name': k, 'value': value } self.log.debug("Setting %s to %s on %s" % \ (k, value, surl)) self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': self.ssh_port } self.log.debug("Setting %s to %s on %s" % \ (k, self.ssh_port, surl)) self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def add_seer_node(self, topo, name, startup): """ Add a seer node to the given topology, with the startup command passed in. Used by configure seer_services. """ c_node = topdl.Computer( name=name, os= topdl.OperatingSystem( attribute=[ { 'attribute': 'osid', 'value': self.local_seer_image }, ]), attribute=[ { 'attribute': 'startup', 'value': startup }, ] ) self.add_kit(c_node, self.local_seer_software) topo.elements.append(c_node) def configure_seer_services(self, services, topo, softdir): """ Make changes to the topology required for the seer requests being made. Specifically, add any control or master nodes required and set up the start commands on the nodes to interconnect them. """ local_seer = False # True if we need to add a control node collect_seer = False # True if there is a seer-master node seer_master= False # True if we need to add the seer-master for s in services: s_name = s.get('name', '') s_vis = s.get('visibility','') if s_name == 'local_seer_control' and s_vis == 'export': local_seer = True elif s_name == 'seer_master': if s_vis == 'import': collect_seer = True elif s_vis == 'export': seer_master = True # We've got the whole picture now, so add nodes if needed and configure # them to interconnect properly. if local_seer or seer_master: # Copy local seer control node software to the tempdir for l, f in self.local_seer_software: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) # If we're collecting seers somewhere the controllers need to talk to # the master. In testbeds that export the service, that will be a # local node that we'll add below. Elsewhere it will be the control # portal that will port forward to the exporting master. if local_seer: if collect_seer: startup = "%s -C %s" % (self.local_seer_start, "seer-master") else: startup = self.local_seer_start self.add_seer_node(topo, 'control', startup) # If this is the seer master, add that node, too. if seer_master: self.add_seer_node(topo, 'seer-master', "%s -R -n -R seer-master -R -A -R sink" % \ self.seer_master_start) def retrieve_software(self, topo, certfile, softdir): """ Collect the software that nodes in the topology need loaded and stage it locally. This implies retrieving it from the experiment_controller and placing it into softdir. Certfile is used to prove that this node has access to that data (it's the allocation/segment fedid). Finally local portal and federation software is also copied to the same staging directory for simplicity - all software needed for experiment creation is in softdir. """ sw = set() for e in topo.elements: for s in getattr(e, 'software', []): sw.add(s.location) for s in sw: self.log.debug("Retrieving %s" % s) try: get_url(s, certfile, softdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) # Copy local federation and portal node software to the tempdir for s in (self.federation_software, self.portal_software): for l, f in s: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) def initialize_experiment_info(self, attrs, aid, certfile, tmpdir): """ Gather common configuration files, retrieve or create an experiment name and project name, and return the ssh_key filenames. Create an allocation log bound to the state log variable as well. """ configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey')) ename = None pubkey_base = None secretkey_base = None proj = None user = None alloc_log = None vchars_re = '[^' + string.ascii_letters + string.digits + '-]' for a in attrs: if a['attribute'] in configs: try: self.log.debug("Retrieving %s from %s" % \ (a['attribute'], a['value'])) get_url(a['value'], certfile, tmpdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (a.get('value', ""), v)) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'experiment_name': ename = a['value'] if ename: # Clean up the experiment name so that emulab will accept it. ename = re.sub(vchars_re, '-', ename) else: ename = "" for i in range(0,5): ename += random.choice(string.ascii_letters) self.log.warn("No experiment name: picked one randomly: %s" \ % ename) if not pubkey_base: raise service_error(service_error.req, "No public key attribute") if not secretkey_base: raise service_error(service_error.req, "No secret key attribute") self.state_lock.acquire() if aid in self.allocation: proj = self.allocation[aid].get('project', None) if not proj: proj = self.allocation[aid].get('sproject', None) user = self.allocation[aid].get('user', None) self.allocation[aid]['experiment'] = ename self.allocation[aid]['log'] = [ ] # Create a logger that logs to the experiment's state object as # well as to the main log file. alloc_log = logging.getLogger('fedd.access.%s' % ename) h = logging.StreamHandler( list_log.list_log(self.allocation[aid]['log'])) # XXX: there should be a global one of these rather than # repeating the code. h.setFormatter(logging.Formatter( "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) self.write_state() self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" %aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" %aid) return (ename, proj, user, pubkey_base, secretkey_base, alloc_log) def finalize_experiment(self, starter, topo, aid, alloc_id): """ Store key bits of experiment state in the global repository, including the response that may need to be replayed, and return the response. """ # Copy the assigned names into the return topology embedding = [ ] for n in starter.node: embedding.append({ 'toponame': n, 'physname': ["%s%s" % (starter.node[n], self.domain)], }) # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() logv = "".join(self.allocation[aid]['log']) # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. self.allocation[aid]['started'] = { 'allocID': alloc_id, 'allocationLog': logv, 'segmentdescription': { 'topdldescription': topo.clone().to_dict() }, 'embedding': embedding } retval = copy.copy(self.allocation[aid]['started']) self.write_state() self.state_lock.release() return retval # End of StartSegment support routines def StartSegment(self, req, fid): err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = req['StartSegmentRequestBody'] auth_attr = req['allocID']['fedid'] topref = req['segmentdescription']['topdldescription'] except KeyError: raise service_error(server_error.req, "Badly formed request") connInfo = req.get('connection', []) services = req.get('service', []) aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. self.state_lock.acquire() retval = self.allocation[aid].get('started', None) self.state_lock.release() if retval: self.log.warning("Duplicate StartSegment for %s: " % aid + \ "replaying response") return retval # A new request. Do it. if topref: topo = topdl.Topology(**topref) else: raise service_error(service_error.req, "Request missing segmentdescription'") certfile = "%s/%s.pem" % (self.certdir, auth_attr) try: tmpdir = tempfile.mkdtemp(prefix="access-") softdir = "%s/software" % tmpdir os.mkdir(softdir) except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") # Try block alllows us to clean up temporary files. try: self.retrieve_software(topo, certfile, softdir) ename, proj, user, pubkey_base, secretkey_base, alloc_log = \ self.initialize_experiment_info(attrs, aid, certfile, tmpdir) # Set up userconf and seer if needed self.configure_userconf(services, tmpdir) self.configure_seer_services(services, topo, softdir) # Get and send synch store variables self.export_store_info(certfile, proj, ename, connInfo) self.import_store_info(certfile, connInfo) expfile = "%s/experiment.tcl" % tmpdir self.generate_portal_configs(topo, pubkey_base, secretkey_base, tmpdir, proj, ename, connInfo, services) self.generate_ns2(topo, expfile, "/proj/%s/software/%s/" % (proj, ename), connInfo) starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, log=alloc_log, boss=self.boss, cert=self.xmlrpc_cert) rv = starter(self, ename, proj, user, expfile, tmpdir) except service_error, e: err = e except: t, v, st = sys.exc_info() err = service_error(service_error.internal, "%s: %s" % \ (v, traceback.extract_tb(st))) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: self.log.debug("[StartSegment]: not removing %s" % tmpdir) if rv: return self.finalize_experiment(starter, topo, aid, req['allocID']) elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if aid in self.allocation: proj = self.allocation[aid].get('project', None) if not proj: proj = self.allocation[aid].get('sproject', None) user = self.allocation[aid].get('user', None) ename = self.allocation[aid].get('experiment', None) else: proj = None user = None ename = None self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" % aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" % aid) if not ename: raise service_error(service_error.internal, "Can't find experiment name for %s" % aid) stopper = self.stop_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert) stopper(self, user, proj, ename) return { 'allocID': req['allocID'] }