#!/usr/local/bin/python import os,sys import stat # for chmod constants import re import random import string import copy import pickle import logging import subprocess import traceback import socket from threading import * from M2Crypto.SSL import SSLError from access import access_base from util import * from deter import fedid, generate_fedid from authorizer import authorizer, abac_authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller from proof import proof as access_proof import httplib import tempfile from urlparse import urlparse from deter import topdl import list_log import emulab_segment # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access(access_base): """ The implementation of access control based on mapping users to projects. Users can be mapped to existing projects or have projects created dynamically. This implements both direct requests and proxies. """ max_name_len = 19 def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. """ access_base.__init__(self, config, auth) self.max_name_len = access.max_name_len self.allow_proxy = config.getboolean("access", "allow_proxy") self.domain = config.get("access", "domain") self.userconfdir = config.get("access","userconfdir") self.userconfcmd = config.get("access","userconfcmd") self.userconfurl = config.get("access","userconfurl") self.federation_software = config.get("access", "federation_software") self.portal_software = config.get("access", "portal_software") self.local_seer_software = config.get("access", "local_seer_software") self.local_seer_image = config.get("access", "local_seer_image") self.local_seer_start = config.get("access", "local_seer_start") self.seer_master_start = config.get("access", "seer_master_start") self.ssh_privkey_file = config.get("access","ssh_privkey_file") self.ssh_pubkey_file = config.get("access","ssh_pubkey_file") self.ssh_port = config.get("access","ssh_port") or "22" self.boss = config.get("access", "boss") self.ops = config.get("access", "ops") self.xmlrpc_cert = config.get("access", "xmlrpc_cert") self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw") self.dragon_endpoint = config.get("access", "dragon") self.dragon_vlans = config.get("access", "dragon_vlans") self.deter_internal = config.get("access", "deter_internal") self.tunnel_config = config.getboolean("access", "tunnel_config") self.portal_command = config.get("access", "portal_command") self.portal_image = config.get("access", "portal_image") self.portal_type = config.get("access", "portal_type") or "pc" self.portal_startcommand = config.get("access", "portal_startcommand") self.node_startcommand = config.get("access", "node_startcommand") self.nat_portal = config.get("access", "nat_portal") self.uri = 'https://%s:%d' % (socket.getfqdn(), self.get_port(config.get("globals", "services", "23235"))) self.federation_software = self.software_list(self.federation_software) self.portal_software = self.software_list(self.portal_software) self.local_seer_software = self.software_list(self.local_seer_software) self.access_type = self.access_type.lower() self.start_segment = emulab_segment.start_segment self.stop_segment = emulab_segment.stop_segment self.info_segment = emulab_segment.info_segment self.operation_segment = emulab_segment.operation_segment self.restricted = [ ] tb = config.get('access', 'testbed') if tb: self.testbed = [ t.strip() for t in tb.split(',') ] else: self.testbed = [ ] # authorization information self.auth_type = config.get('access', 'auth_type') \ or 'abac' self.auth_dir = config.get('access', 'auth_dir') accessdb = config.get("access", "accessdb") # initialize the authorization system if self.auth_type == 'abac': self.auth = abac_authorizer(load=self.auth_dir) self.access = [ ] if accessdb: self.read_access(accessdb, self.access_tuple) else: raise service_error(service_error.internal, "Unknown auth_type: %s" % self.auth_type) # read_state in the base_class self.state_lock.acquire() if 'allocation' not in self.state: self.state['allocation']= { } self.allocation = self.state['allocation'] self.state_lock.release() self.exports = { 'SMB': self.export_SMB, 'seer': self.export_seer, 'tmcd': self.export_tmcd, 'userconfig': self.export_userconfig, 'project_export': self.export_project_export, 'local_seer_control': self.export_local_seer, 'seer_master': self.export_seer_master, 'hide_hosts': self.export_hide_hosts, } if not self.local_seer_image or not self.local_seer_software or \ not self.local_seer_start: if 'local_seer_control' in self.exports: del self.exports['local_seer_control'] if not self.local_seer_image or not self.local_seer_software or \ not self.seer_master_start: if 'seer_master' in self.exports: del self.exports['seer_master'] self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), 'InfoSegment': soap_handler("InfoSegment", self.InfoSegment), 'OperationSegment': soap_handler("OperationSegment", self.OperationSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), 'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment), 'OperationSegment': xmlrpc_handler("OperationSegment", self.OperationSegment), } self.call_SetValue = service_caller('SetValue', log=self.log) self.call_GetValue = service_caller('GetValue', log=self.log) @staticmethod def get_port(ps): ''' Take a fedd service string and return the first port. Used in creating the testbed uri identifier. ''' p = ps.split(',') smallport = p[0].split(':') try: rv = int(smallport[0]) except ValueError: rv = 23235 return rv @staticmethod def access_tuple(str): """ Convert a string of the form (id, id, id) into an access_project. This is called by read_access to convert to local attributes. It returns a tuple of the form (project, user, certificate_file). """ str = str.strip() if str.startswith('(') and str.endswith(')') and str.count(',') == 2: # The slice takes the parens off the string. proj, user, cert = str[1:-1].split(',') return (proj.strip(), user.strip(), cert.strip()) else: raise self.parse_error( 'Bad mapping (unbalanced parens or more than 2 commas)') # RequestAccess support routines def save_project_state(self, aid, pname, uname, certf, owners): """ Save the project, user, and owners associated with this allocation. This creates the allocation entry. """ self.state_lock.acquire() self.allocation[aid] = { } self.allocation[aid]['project'] = pname self.allocation[aid]['user'] = uname self.allocation[aid]['cert'] = certf self.allocation[aid]['owners'] = owners self.write_state() self.state_lock.release() return (pname, uname) # End of RequestAccess support routines def RequestAccess(self, req, fid): """ Handle the access request. Proxy if not for us. Parse out the fields and make the allocations or rejections if for us, otherwise, assuming we're willing to proxy, proxy the request out. """ def gateway_hardware(h): if h == 'GWTYPE': return self.portal_type or 'GWTYPE' else: return h def get_export_project(svcs): """ if the service requests includes one to export a project, return that project. """ rv = None for s in svcs: if s.get('name', '') == 'project_export' and \ s.get('visibility', '') == 'export': if not rv: for a in s.get('fedAttr', []): if a.get('attribute', '') == 'project' \ and 'value' in a: rv = a['value'] else: raise service_error(service_error, access, 'Requesting multiple project exports is ' + \ 'not supported'); return rv self.log.info("RequestAccess called by %s" % fid) # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") # if this includes a project export request, construct a filter such # that only the ABAC attributes mapped to that project are checked for # access. if 'service' in req: ep = get_export_project(req['service']) if ep: pf = lambda(a): a.value[0] == ep else: pf = None else: ep = None pf = None if self.auth.import_credentials( data_list=req.get('abac_credential', [])): self.auth.save() else: self.log.debug('failed to import incoming credentials') if self.auth_type == 'abac': found, owners, proof = self.lookup_access(req, fid, filter=pf) else: raise service_error(service_error.internal, 'Unknown auth_type: %s' % self.auth_type) ap = None # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) pname, uname = self.save_project_state(aid, found[0], found[1], found[2], owners) services, svc_state = self.export_services(req.get('service',[]), pname, uname) self.state_lock.acquire() # Store services state in global state for k, v in svc_state.items(): self.allocation[aid][k] = v self.append_allocation_authorization(aid, set([(o, allocID) for o in owners]), state_attr='allocation') self.write_state() self.state_lock.release() try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except EnvironmentError, e: self.log.info("RequestAccess failed for by %s: internal error" \ % fid) raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) self.log.debug('RequestAccess Returning allocation ID: %s' % allocID) resp = self.build_access_response({ 'fedid': allocID } , pname, services, proof) return resp def ReleaseAccess(self, req, fid): self.log.info("ReleaseAccess called by %s" % fid) # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s by %s" % \ (aid, fid)) access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") self.state_lock.acquire() if aid in self.allocation: self.log.debug("Found allocation for %s" %aid) self.clear_allocation_authorization(aid, state_attr='allocation') del self.allocation[aid] self.write_state() self.state_lock.release() # Remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) self.log.info("ReleaseAccess succeeded for %s" % fid) return { 'allocID': req['allocID'], 'proof': proof.to_dict() } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") # These are subroutines for StartSegment def generate_ns2(self, topo, expfn, softdir, connInfo): """ Convert topo into an ns2 file, decorated with appropriate commands for the particular testbed setup. Convert all requests for software, etc to point at the staged copies on this testbed and add the federation startcommands. """ class dragon_commands: """ Functor to spit out approrpiate dragon commands for nodes listed in the connectivity description. The constructor makes a dict mapping dragon nodes to their parameters and the __call__ checks each element in turn for membership. """ def __init__(self, map): self.node_info = map def __call__(self, e): s = "" if isinstance(e, topdl.Computer): if self.node_info.has_key(e.name): info = self.node_info[e.name] for ifname, vlan, type in info: for i in e.interface: if i.name == ifname: addr = i.get_attribute('ip4_address') subs = i.substrate[0] break else: raise service_error(service_error.internal, "No interface %s on element %s" % \ (ifname, e.name)) # XXX: do netmask right if type =='link': s = ("tb-allow-external ${%s} " + \ "dragonportal ip %s vlan %s " + \ "netmask 255.255.255.0\n") % \ (topdl.to_tcl_name(e.name), addr, vlan) elif type =='lan': s = ("tb-allow-external ${%s} " + \ "dragonportal " + \ "ip %s vlan %s usurp %s\n") % \ (topdl.to_tcl_name(e.name), addr, vlan, subs) else: raise service_error(service_error_internal, "Unknown DRAGON type %s" % type) return s class not_dragon: """ Return true if a node is in the given map of dragon nodes. """ def __init__(self, map): self.nodes = set(map.keys()) def __call__(self, e): return e.name not in self.nodes def have_portals(top): ''' Return true if the topology has a portal node ''' # The else is on the for for e in top.elements: if isinstance(e, topdl.Computer) and e.get_attribute('portal'): return True else: return False # Main line of generate_ns2 t = topo.clone() # Create the map of nodes that need direct connections (dragon # connections) from the connInfo dragon_map = { } for i in [ i for i in connInfo if i['type'] == 'transit']: for a in i.get('fedAttr', []): if a['attribute'] == 'vlan_id': vlan = a['value'] break else: raise service_error(service_error.internal, "No vlan tag") members = i.get('member', []) if len(members) > 1: type = 'lan' else: type = 'link' try: for m in members: if m['element'] in dragon_map: dragon_map[m['element']].append(( m['interface'], vlan, type)) else: dragon_map[m['element']] = [( m['interface'], vlan, type),] except KeyError: raise service_error(service_error.req, "Missing connectivity info") def output_fixed_filter(e): if not isinstance(e, topdl.Computer): return "" fn = e.get_attribute('fixed') if fn is None: return "" else: return 'tb-fix-node ${%s} %s' % (topdl.to_tcl_name(e.name), fn) # Weed out the things we aren't going to instantiate: Segments, portal # substrates, and portal interfaces. (The copy in the for loop allows # us to delete from e.elements in side the for loop). While we're # touching all the elements, we also adjust paths from the original # testbed to local testbed paths and put the federation commands into # the start commands local = len(dragon_map) == 0 and not have_portals(t) if local: routing = 'Static' else: routing = 'Manual' if local: self.log.debug("Local experiment.") for e in [e for e in t.elements]: if isinstance(e, topdl.Segment): t.elements.remove(e) if isinstance(e, topdl.Computer): self.add_kit(e, self.federation_software) if e.get_attribute('portal') and self.portal_startcommand: # Add local portal support software self.add_kit(e, self.portal_software) # Portals never have a user-specified start command e.set_attribute('startup', self.portal_startcommand) elif not local and self.node_startcommand: if e.get_attribute('startup'): e.set_attribute('startup', "%s \\$USER '%s'" % \ (self.node_startcommand, e.get_attribute('startup'))) else: e.set_attribute('startup', self.node_startcommand) dinf = [i[0] for i in dragon_map.get(e.name, []) ] # Remove portal interfaces that do not connect to DRAGON e.interface = [i for i in e.interface \ if not i.get_attribute('portal') or i.name in dinf ] # Fix software paths for s in getattr(e, 'software', []): s.location = re.sub("^.*/", softdir, s.location) t.substrates = [ s.clone() for s in t.substrates ] t.incorporate_elements() # Customize the ns2 output for local portal commands and images filters = [output_fixed_filter] if self.dragon_endpoint: add_filter = not_dragon(dragon_map) filters.append(dragon_commands(dragon_map)) else: add_filter = None if self.portal_command: filters.append(topdl.generate_portal_command_filter( self.portal_command, add_filter=add_filter)) if self.portal_image: filters.append(topdl.generate_portal_image_filter( self.portal_image)) if self.portal_type: filters.append(topdl.generate_portal_hardware_filter( self.portal_type)) # Convert to ns and write it out expfile = topdl.topology_to_ns2(t, filters, routing=routing) try: f = open(expfn, "w") print >>f, expfile f.close() except EnvironmentError: raise service_error(service_error.internal, "Cannot write experiment file %s: %s" % (expfn,e)) def export_store_info(self, cf, proj, ename, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl : if self.nat_portal: value = self.nat_portal elif k and k.index('/') != -1: value = "%s.%s.%s%s" % \ (k[k.index('/')+1:], ename, proj, self.domain) else: self.log.error("Bad export request: %s" % p) continue self.log.debug("Setting %s to %s on %s" % \ (k, value, surl)) req = { 'name': k, 'value': value } self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': self.ssh_port } self.log.debug("Setting %s to %s on %s" % \ (k, self.ssh_port, surl)) self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def add_seer_node(self, topo, name, startup): """ Add a seer node to the given topology, with the startup command passed in. Used by configure seer_services. """ c_node = topdl.Computer( name=name, os= topdl.OperatingSystem( attribute=[ { 'attribute': 'osid', 'value': self.local_seer_image }, ]), attribute=[ { 'attribute': 'startup', 'value': startup }, ] ) self.add_kit(c_node, self.local_seer_software) topo.elements.append(c_node) def configure_seer_services(self, services, topo, softdir): """ Make changes to the topology required for the seer requests being made. Specifically, add any control or master nodes required and set up the start commands on the nodes to interconnect them. """ local_seer = False # True if we need to add a control node collect_seer = False # True if there is a seer-master node seer_master= False # True if we need to add the seer-master for s in services: s_name = s.get('name', '') s_vis = s.get('visibility','') if s_name == 'local_seer_control' and s_vis == 'export': local_seer = True elif s_name == 'seer_master': if s_vis == 'import': collect_seer = True elif s_vis == 'export': seer_master = True # We've got the whole picture now, so add nodes if needed and configure # them to interconnect properly. if local_seer or seer_master: # Copy local seer control node software to the tempdir for l, f in self.local_seer_software: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) # If we're collecting seers somewhere the controllers need to talk to # the master. In testbeds that export the service, that will be a # local node that we'll add below. Elsewhere it will be the control # portal that will port forward to the exporting master. if local_seer: if collect_seer: startup = "%s -C %s" % (self.local_seer_start, "seer-master") else: startup = self.local_seer_start self.add_seer_node(topo, 'control', startup) # If this is the seer master, add that node, too. if seer_master: self.add_seer_node(topo, 'seer-master', "%s -R -n -R seer-master -R -A -R sink" % \ self.seer_master_start) def retrieve_software(self, topo, certfile, softdir): """ Collect the software that nodes in the topology need loaded and stage it locally. This implies retrieving it from the experiment_controller and placing it into softdir. Certfile is used to prove that this node has access to that data (it's the allocation/segment fedid). Finally local portal and federation software is also copied to the same staging directory for simplicity - all software needed for experiment creation is in softdir. """ sw = set() for e in topo.elements: for s in getattr(e, 'software', []): sw.add(s.location) for s in sw: self.log.debug("Retrieving %s" % s) try: get_url(s, certfile, softdir, log=self.log) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) # Copy local federation and portal node software to the tempdir for s in (self.federation_software, self.portal_software): for l, f in s: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) def initialize_experiment_info(self, attrs, aid, certfile, tmpdir): """ Gather common configuration files, retrieve or create an experiment name and project name, and return the ssh_key filenames. Create an allocation log bound to the state log variable as well. """ configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 'seer_ca_pem', 'seer_node_pem') ename = None pubkey_base = None secretkey_base = None proj = None user = None alloc_log = None nonce_experiment = False vchars_re = '[^' + string.ascii_letters + string.digits + '-]' self.state_lock.acquire() if aid in self.allocation: proj = self.allocation[aid].get('project', None) self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" %aid) for a in attrs: if a['attribute'] in configs: try: self.log.debug("Retrieving %s from %s" % \ (a['attribute'], a['value'])) get_url(a['value'], certfile, tmpdir, log=self.log) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (a.get('value', ""), v)) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'experiment_name': ename = a['value'] # Names longer than the emulab max are discarded if ename and len(ename) <= self.max_name_len: # Clean up the experiment name so that emulab will accept it. ename = re.sub(vchars_re, '-', ename) else: ename = "" for i in range(0,5): ename += random.choice(string.ascii_letters) nonce_experiment = True self.log.warn("No experiment name or suggestion too long: " + \ "picked one randomly: %s" % ename) if not pubkey_base: raise service_error(service_error.req, "No public key attribute") if not secretkey_base: raise service_error(service_error.req, "No secret key attribute") self.state_lock.acquire() if aid in self.allocation: user = self.allocation[aid].get('user', None) cert = self.allocation[aid].get('cert', None) self.allocation[aid]['experiment'] = ename self.allocation[aid]['nonce'] = nonce_experiment self.allocation[aid]['log'] = [ ] # Create a logger that logs to the experiment's state object as # well as to the main log file. alloc_log = logging.getLogger('fedd.access.%s' % ename) h = logging.StreamHandler( list_log.list_log(self.allocation[aid]['log'])) # XXX: there should be a global one of these rather than # repeating the code. h.setFormatter(logging.Formatter( "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) self.write_state() self.state_lock.release() if not user: raise service_error(service_error.internal, "Can't find creation user for %s" %aid) return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log) def decorate_topology(self, info, t): """ Copy the physical mapping and status onto the topology. Used by StartSegment and InfoSegment """ def add_new(ann, attr): for a in ann: if a not in attr: attr.append(a) def merge_os(os, e): if len(e.os) == 0: # No OS at all: if os.get_attribute('emulab_access:image'): os.set_attribute('emulab_access:initial_image', os.get_attribute('emulab_access:image')) e.os = [ os ] elif len(e.os) == 1: # There's one OS, copy the initial image and replace eos = e.os[0] initial = eos.get_attribute('emulab_access:initial_image') if initial: os.set_attribute('emulab_access:initial_image', initial) e.os = [ os] else: # Multiple OSes, replace or append for eos in e.os: if os.name == eos.name: eos.version = os.version eos.version = os.distribution eos.version = os.distributionversion for a in os.attribute: if eos.get_attribute(a.attribute): eos.remove_attribute(a.attribute) eos.set_attribute(a.attribute, a.value) break else: e.os.append(os) if t is None: return i = 0 # For fake debugging instantiation # Copy the assigned names into the return topology for e in t.elements: if isinstance(e, topdl.Computer): if not self.create_debug: if e.name in info.node: add_new(("%s%s" % (info.node[e.name].pname, self.domain),), e.localname) add_new(("%s%s" % (info.node[e.name].lname, self.domain),), e.localname) e.status = info.node[e.name].status os = info.node[e.name].getOS() if os: merge_os(os, e) else: # Simple debugging assignment add_new(("node%d%s" % (i, self.domain),), e.localname) e.status = 'active' add_new(('testop1', 'testop2'), e.operation) i += 1 for s in t.substrates: if s.name in info.subs: sub = info.subs[s.name] if sub.cap is not None: s.capacity = topdl.Capacity(sub.cap, 'max') if sub.delay is not None: s.delay = topdl.Latency(sub.delay, 'max') # XXX interfaces def finalize_experiment(self, starter, topo, aid, alloc_id, proof): """ Store key bits of experiment state in the global repository, including the response that may need to be replayed, and return the response. """ def get_localnames(t): names = [ ] for e in t.elements: if isinstance(e, topdl.Computer): n = e.get_attribute('testbed') if n is not None and n not in names: names.append(n) return names i = 0 t = topo.clone() self.decorate_topology(starter, t) # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() # Put information about this testbed into the topdl tb = topdl.Testbed(self.uri, "deter", localname=get_localnames(t), attribute=[ { 'attribute': 'project', 'value': self.allocation[aid]['project'] }, { 'attribute': 'experiment', 'value': self.allocation[aid]['experiment'] }]) t.elements.append(tb) logv = "".join(self.allocation[aid]['log']) # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. self.allocation[aid]['started'] = { 'allocID': alloc_id, 'allocationLog': logv, 'segmentdescription': { 'topdldescription': t.to_dict() }, 'proof': proof.to_dict(), } self.allocation[aid]['topo'] = t retval = copy.copy(self.allocation[aid]['started']) self.write_state() self.state_lock.release() return retval # End of StartSegment support routines def StartSegment(self, req, fid): err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation self.log.info("StartSegment called by %s" % fid) try: req = req['StartSegmentRequestBody'] auth_attr = req['allocID']['fedid'] topref = req['segmentdescription']['topdldescription'] except KeyError: raise service_error(server_error.req, "Badly formed request") connInfo = req.get('connection', []) services = req.get('service', []) aid = "%s" % auth_attr attrs = req.get('fedAttr', []) access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: self.log.info("StartSegment for %s failed: access denied" % fid) raise service_error(service_error.access, "Access denied") else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. self.state_lock.acquire() retval = self.allocation[aid].get('started', None) self.state_lock.release() if retval: self.log.warning("Duplicate StartSegment for %s: " % aid + \ "replaying response") return retval # A new request. Do it. if topref: topo = topdl.Topology(**topref) else: raise service_error(service_error.req, "Request missing segmentdescription'") certfile = "%s/%s.pem" % (self.certdir, auth_attr) try: tmpdir = tempfile.mkdtemp(prefix="access-") softdir = "%s/software" % tmpdir os.mkdir(softdir) except EnvironmentError: self.log.info("StartSegment for %s failed: internal error" % fid) raise service_error(service_error.internal, "Cannot create tmp dir") # Try block alllows us to clean up temporary files. try: self.retrieve_software(topo, certfile, softdir) ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \ alloc_log = self.initialize_experiment_info(attrs, aid, certfile, tmpdir) # A misconfigured cert in the ABAC map can be confusing... if not os.access(xmlrpc_cert, os.R_OK): self.log.error("Cannot open user's emulab SSL cert: %s" % \ xmlrpc_cert) raise service_error(service_error.internal, "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert) if '/' in proj: proj, gid = proj.split('/') else: gid = None # Set up userconf and seer if needed self.configure_userconf(services, tmpdir) self.configure_seer_services(services, topo, softdir) # Get and send synch store variables self.export_store_info(certfile, proj, ename, connInfo) self.import_store_info(certfile, connInfo) expfile = "%s/experiment.tcl" % tmpdir self.generate_portal_configs(topo, pubkey_base, secretkey_base, tmpdir, proj, ename, connInfo, services) self.generate_ns2(topo, expfile, "/proj/%s/software/%s/" % (proj, ename), connInfo) starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, log=alloc_log, boss=self.boss, ops=self.ops, cert=xmlrpc_cert) rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid) except service_error, e: self.log.info("StartSegment for %s failed: %s" % (fid, e)) err = e except: t, v, st = sys.exc_info() self.log.info("StartSegment for %s failed:unexpected error: %s" \ % (fid, traceback.extract_tb(st))) err = service_error(service_error.internal, "%s: %s" % \ (v, traceback.extract_tb(st))) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: self.log.debug("[StartSegment]: not removing %s" % tmpdir) if rv: self.log.info("StartSegment for %s succeeded" % fid) return self.finalize_experiment(starter, topo, aid, req['allocID'], proof) elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def TerminateSegment(self, req, fid): self.log.info("TerminateSegment called by %s" % fid) try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if aid in self.allocation: proj = self.allocation[aid].get('project', None) user = self.allocation[aid].get('user', None) xmlrpc_cert = self.allocation[aid].get('cert', None) ename = self.allocation[aid].get('experiment', None) nonce = self.allocation[aid].get('nonce', False) else: proj = None user = None ename = None nonce = False xmlrpc_cert = None self.state_lock.release() if not proj: self.log.info("TerminateSegment failed for %s: cannot find project"\ % fid) raise service_error(service_error.internal, "Can't find project for %s" % aid) else: if '/' in proj: proj, gid = proj.split('/') else: gid = None if not user: self.log.info("TerminateSegment failed for %s: cannot find user"\ % fid) raise service_error(service_error.internal, "Can't find creation user for %s" % aid) if not ename: self.log.info( "TerminateSegment failed for %s: cannot find experiment"\ % fid) raise service_error(service_error.internal, "Can't find experiment name for %s" % aid) stopper = self.stop_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, boss=self.boss, ops=self.ops, cert=xmlrpc_cert) stopper(self, user, proj, ename, gid, nonce) self.log.info("TerminateSegment succeeded for %s %s %s" % \ (fid, proj, ename)) self.state_lock.acquire() # Remove the started flag/info - the segment is no longer started if aid in self.allocation: del self.allocation[aid]['started'] self.write_state() self.state_lock.release() return { 'allocID': req['allocID'], 'proof': proof.to_dict() } def InfoSegment(self, req, fid): self.log.info("InfoSegment called by %s" % fid) try: req = req['InfoSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if aid in self.allocation: topo = self.allocation[aid].get('topo', None) if topo: topo = topo.clone() proj = self.allocation[aid].get('project', None) user = self.allocation[aid].get('user', None) xmlrpc_cert = self.allocation[aid].get('cert', None) ename = self.allocation[aid].get('experiment', None) else: proj = None user = None ename = None topo = None xmlrpc_cert = None self.state_lock.release() if not proj: self.log.info("InfoSegment failed for %s: cannot find project"% fid) raise service_error(service_error.internal, "Can't find project for %s" % aid) else: if '/' in proj: proj, gid = proj.split('/') else: gid = None if not user: self.log.info("InfoSegment failed for %s: cannot find user"% fid) raise service_error(service_error.internal, "Can't find creation user for %s" % aid) if not ename: self.log.info("InfoSegment failed for %s: cannot find exp"% fid) raise service_error(service_error.internal, "Can't find experiment name for %s" % aid) info = self.info_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, boss=self.boss, ops=self.ops, cert=xmlrpc_cert) info(self, user, proj, ename) self.log.info("InfoSegment gathered info for %s %s %s %s" % \ (fid, user, proj, ename)) self.decorate_topology(info, topo) self.state_lock.acquire() if aid in self.allocation: self.allocation[aid]['topo'] = topo self.write_state() self.state_lock.release() self.log.info("InfoSegment updated info for %s %s %s %s" % \ (fid, user, proj, ename)) rv = { 'allocID': req['allocID'], 'proof': proof.to_dict(), } self.log.info("InfoSegment succeeded info for %s %s %s %s" % \ (fid, user, proj, ename)) if topo: rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() } return rv def OperationSegment(self, req, fid): def get_pname(e): """ Get the physical name of a node """ if e.localname: return re.sub('\..*','', e.localname[0]) else: return None self.log.info("OperationSegment called by %s" % fid) try: req = req['OperationSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: self.log.info("OperationSegment failed for %s: access denied" % fid) raise service_error(service_error.access, "Access denied") op = req.get('operation', None) targets = req.get('target', None) params = req.get('parameter', None) if op is None : self.log.info("OperationSegment failed for %s: no operation" % fid) raise service_error(service_error.req, "missing operation") elif targets is None: self.log.info("OperationSegment failed for %s: no targets" % fid) raise service_error(service_error.req, "no targets") self.state_lock.acquire() if aid in self.allocation: topo = self.allocation[aid].get('topo', None) if topo: topo = topo.clone() xmlrpc_cert = self.allocation[aid].get('cert', None) else: topo = None xmlrpc_cert = None self.state_lock.release() targets = copy.copy(targets) ptargets = { } for e in topo.elements: if isinstance(e, topdl.Computer): if e.name in targets: targets.remove(e.name) pn = get_pname(e) if pn: ptargets[e.name] = pn status = [ operation_status(t, operation_status.no_target) \ for t in targets] ops = self.operation_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, boss=self.boss, ops=self.ops, cert=xmlrpc_cert) ops(self, op, ptargets, params, topo) self.log.info("OperationSegment operated for %s" % fid) status.extend(ops.status) self.log.info("OperationSegment succeed for %s" % fid) return { 'allocID': req['allocID'], 'status': [s.to_dict() for s in status], 'proof': proof.to_dict(), }