#!/usr/local/bin/python import os,sys import stat # for chmod constants import re import time import string import copy import pickle import logging import subprocess import random import traceback import xml.parsers.expat from threading import Thread, Timer, Lock from util import * from fedid import fedid, generate_fedid from authorizer import authorizer, abac_authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller import httplib import tempfile from urlparse import urlparse from access import access_base from legacy_access import legacy_access from protogeni_proxy import protogeni_proxy from geniapi_proxy import geniapi_proxy import topdl import list_log # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access(access_base, legacy_access): """ The implementation of access control based on mapping users to projects. Users can be mapped to existing projects or have projects created dynamically. This implements both direct requests and proxies. """ def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. """ access_base.__init__(self, config, auth) self.domain = config.get("access", "domain") self.userconfdir = config.get("access","userconfdir") self.userconfcmd = config.get("access","userconfcmd") self.userconfurl = config.get("access","userconfurl") self.federation_software = config.get("access", "federation_software") self.portal_software = config.get("access", "portal_software") self.ssh_port = config.get("access","ssh_port") or "22" self.sshd = config.get("access","sshd") self.sshd_config = config.get("access", "sshd_config") self.access_type = config.get("access", "type") self.staging_dir = config.get("access", "staging_dir") or "/tmp" self.staging_host = config.get("access", "staging_host") \ or "ops.emulab.net" self.local_seer_software = config.get("access", "local_seer_software") self.local_seer_image = config.get("access", "local_seer_image") self.local_seer_start = config.get("access", "local_seer_start") self.dragon_endpoint = config.get("access", "dragon") self.dragon_vlans = config.get("access", "dragon_vlans") self.deter_internal = config.get("access", "deter_internal") self.tunnel_config = config.getboolean("access", "tunnel_config") self.portal_command = config.get("access", "portal_command") self.portal_image = config.get("access", "portal_image") self.portal_type = config.get("access", "portal_type") or "pc" self.portal_startcommand = config.get("access", "portal_startcommand") self.node_startcommand = config.get("access", "node_startcommand") self.federation_software = self.software_list(self.federation_software) self.portal_software = self.software_list(self.portal_software) self.local_seer_software = self.software_list(self.local_seer_software) self.renewal_interval = config.get("access", "renewal") or (3 * 60 ) self.renewal_interval = int(self.renewal_interval) * 60 self.ch_url = config.get("access", "ch_url") self.sa_url = config.get("access", "sa_url") self.cm_url = config.get("access", "cm_url") self.restricted = [ ] # read_state in the base_class self.state_lock.acquire() for a in ('allocation', 'projects', 'keys', 'types'): if a not in self.state: self.state[a] = { } self.allocation = self.state['allocation'] self.projects = self.state['projects'] self.keys = self.state['keys'] self.types = self.state['types'] self.state_lock.release() self.log = logging.getLogger("fedd.access") set_log_level(config, "access", self.log) # authorization information self.auth_type = config.get('access', 'auth_type') \ or 'legacy' self.auth_dir = config.get('access', 'auth_dir') accessdb = config.get("access", "accessdb") # initialize the authorization system if self.auth_type == 'legacy': self.access = { } if accessdb: self.legacy_read_access(accessdb, self.make_access_info) # Add the ownership attributes to the authorizer. Note that the # indices of the allocation dict are strings, but the attributes are # fedids, so there is a conversion. self.state_lock.acquire() for k in self.state.get('allocation', {}).keys(): for o in self.state['allocation'][k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k)) self.state_lock.release() self.lookup_access = self.legacy_lookup_access_base elif self.auth_type == 'abac': self.auth = abac_authorizer(load=self.auth_dir) self.access = [ ] if accessdb: self.read_access(accessdb, self.make_access_info) else: raise service_error(service_error.internal, "Unknown auth_type: %s" % self.auth_type) api = config.get("access", "api") or "protogeni" if api == "protogeni": self.api_proxy = protogeni_proxy elif api == "geniapi": self.api_proxy = geniapi_proxy else: self.log.debug("Unknown interface, using protogeni") self.api_proxy = protogeni_proxy self.call_SetValue = service_caller('SetValue') self.call_GetValue = service_caller('GetValue') self.exports = { 'local_seer_control': self.export_local_seer, 'seer_master': self.export_seer_master, 'hide_hosts': self.export_hide_hosts, } if not self.local_seer_image or not self.local_seer_software or \ not self.local_seer_start: if 'local_seer_control' in self.exports: del self.exports['local_seer_control'] if not self.local_seer_image or not self.local_seer_software or \ not self.seer_master_start: if 'seer_master' in self.exports: del self.exports['seer_master'] self.RenewSlices() self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } @staticmethod def make_access_info(s): """ Split a string of the form (id, id, id, id) ito its constituent tuples and return them as a tuple. Use to import access info from the access_db. """ ss = s.strip() if ss.startswith('(') and ss.endswith(')'): l = [ s.strip() for s in ss[1:-1].split(",")] if len(l) == 4: return tuple(l) else: raise self.parse_error( "Exactly 4 elements in access info required") else: raise self.parse_error("Expecting parenthezied values") def get_handler(self, path, fid): self.log.info("Get handler %s %s" % (path, fid)) if self.auth.check_attribute(fid, path) and self.userconfdir: return ("%s/%s" % (self.userconfdir, path), "application/binary") else: return (None, None) def build_access_response(self, alloc_id, services, proof): """ Create the SOAP response. Build the dictionary description of the response and use fedd_utils.pack_soap to create the soap message. ap is the allocate project message returned from a remote project allocation (even if that allocation was done locally). """ # Because alloc_id is already a fedd_services_types.IDType_Holder, # there's no need to repack it msg = { 'allocID': alloc_id, 'fedAttr': [ { 'attribute': 'domain', 'value': self.domain } , ], 'proof': proof.to_dict() } if self.dragon_endpoint: msg['fedAttr'].append({'attribute': 'dragon', 'value': self.dragon_endpoint}) if self.deter_internal: msg['fedAttr'].append({'attribute': 'deter_internal', 'value': self.deter_internal}) #XXX: ?? if self.dragon_vlans: msg['fedAttr'].append({'attribute': 'vlans', 'value': self.dragon_vlans}) if services: msg['service'] = services return msg def RequestAccess(self, req, fid): """ Handle the access request. """ # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) # Request for this fedd found, match, owners, proof = self.lookup_access(req, fid) services, svc_state = self.export_services(req.get('service',[]), None, None) # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) self.state_lock.acquire() self.allocation[aid] = { } # The protoGENI certificate self.allocation[aid]['credentials'] = found # The list of owner FIDs self.allocation[aid]['owners'] = owners self.allocation[aid]['auth'] = set() self.append_allocation_authorization(aid, ((fid, allocID), (allocID, allocID)), state_attr='allocation') self.write_state() self.state_lock.release() try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) return self.build_access_response({ 'fedid': allocID }, None, proof) def ReleaseAccess(self, req, fid): # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") # Local request try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s", aid) access_ok , proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") self.state_lock.acquire() if self.allocation.has_key(aid): self.log.debug("Found allocation for %s" %aid) self.clear_allocation_authorization(aid, state_attr='allocation') del self.allocation[aid] self.write_state() self.state_lock.release() # And remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) return { 'allocID': req['allocID'], 'proof': proof.to_dict() } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") def manifest_to_dict(self, manifest, ignore_debug=False): """ Turn the manifest into a dict were each virtual nodename (i.e. the topdl name) has an entry with the allocated machine in hostname and the interfaces in 'interfaces'. I love having XML parser code lying around. """ if self.create_debug and not ignore_debug: self.log.debug("Returning null manifest dict") return { } # The class allows us to keep a little state - the dict under # consteruction and the current entry in that dict for the interface # element code. class manifest_parser: def __init__(self): self.d = { } self.current_key=None # If the element is a node, create a dict entry for it. If it's an # interface inside a node, add an entry in the interfaces list with # the virtual name and component id. def start_element(self, name, attrs): if name == 'node': self.current_key = attrs.get('virtual_id',"") if self.current_key: self.d[self.current_key] = { 'hostname': attrs.get('hostname', None), 'interfaces': { }, 'mac': { } } elif name == 'interface' and self.current_key: self.d[self.current_key]['interfaces']\ [attrs.get('virtual_id','')] = \ attrs.get('component_id', None) elif name == 'interface_ref': # Collect mac address information from an interface_ref. # These appear after the node info has been parsed. nid = attrs.get('virtual_node_id', None) ifid = attrs.get('virtual_interface_id', None) mac = attrs.get('MAC', None) self.d[nid]['mac'][ifid] = mac # When a node is finished, clear current_key def end_element(self, name): if name == 'node': self.current_key = None node = { } mp = manifest_parser() p = xml.parsers.expat.ParserCreate() # These are bound to the class we just created p.StartElementHandler = mp.start_element p.EndElementHandler = mp.end_element p.Parse(manifest) # Make the node dict that the callers expect for k in mp.d: node[k] = mp.d.get('hostname', '') return mp.d def fake_manifest(self, topo): """ Fake the output of manifest_to_dict with a bunch of generic node an interface names, for debugging. """ node = { } for i, e in enumerate([ e for e in topo.elements \ if isinstance(e, topdl.Computer)]): node[e.name] = { 'hostname': "node%03d" % i, 'interfaces': { } } for j, inf in enumerate(e.interface): node[e.name]['interfaces'][inf.name] = 'eth%d' % j return node def generate_portal_configs(self, topo, pubkey_base, secretkey_base, tmpdir, leid, connInfo, services, nodes): def conninfo_to_dict(key, info): """ Make a cpoy of the connection information about key, and flatten it into a single dict by parsing out any feddAttrs. """ rv = None for i in info: if key == i.get('portal', "") or \ key in [e.get('element', "") \ for e in i.get('member', [])]: rv = i.copy() break else: return rv if 'fedAttr' in rv: for a in rv['fedAttr']: attr = a.get('attribute', "") val = a.get('value', "") if attr and attr not in rv: rv[attr] = val del rv['fedAttr'] return rv # XXX: un hardcode this def client_null(f, s): print >>f, "Service: %s" % s['name'] def client_seer_master(f, s): print >>f, 'PortalAlias: seer-master' def client_smb(f, s): print >>f, "Service: %s" % s['name'] smbshare = None smbuser = None smbproj = None for a in s.get('fedAttr', []): if a.get('attribute', '') == 'SMBSHARE': smbshare = a.get('value', None) elif a.get('attribute', '') == 'SMBUSER': smbuser = a.get('value', None) elif a.get('attribute', '') == 'SMBPROJ': smbproj = a.get('value', None) if all((smbshare, smbuser, smbproj)): print >>f, "SMBshare: %s" % smbshare print >>f, "ProjectUser: %s" % smbuser print >>f, "ProjectName: %s" % smbproj def client_hide_hosts(f, s): for a in s.get('fedAttr', [ ]): if a.get('attribute', "") == 'hosts': print >>f, 'Hide: %s' % a.get('value', "") client_service_out = { 'SMB': client_smb, 'tmcd': client_null, 'seer': client_null, 'userconfig': client_null, 'project_export': client_null, 'seer_master': client_seer_master, 'hide_hosts': client_hide_hosts, } def client_seer_master_export(f, s): print >>f, "AddedNode: seer-master" def client_seer_local_export(f, s): print >>f, "AddedNode: control" client_export_service_out = { 'seer_master': client_seer_master_export, 'local_seer_control': client_seer_local_export, } def server_port(f, s): p = urlparse(s.get('server', 'http://localhost')) print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) def server_null(f,s): pass def server_seer(f, s): print >>f, 'seer: true' server_service_out = { 'SMB': server_port, 'tmcd': server_port, 'userconfig': server_null, 'project_export': server_null, 'seer': server_seer, 'seer_master': server_port, 'hide_hosts': server_null, } # XXX: end un hardcode this seer_out = False client_out = False control_gw = None for e in [ e for e in topo.elements \ if isinstance(e, topdl.Computer) and e.get_attribute('portal')]: myname = e.name type = e.get_attribute('portal_type') info = conninfo_to_dict(myname, connInfo) if not info: raise service_error(service_error.req, "No connectivity info for %s" % myname) # Translate to physical name (ProtoGENI doesn't have DNS) physname = nodes.get(myname, { }).get('hostname', None) peer = info.get('peer', "") ldomain = self.domain ssh_port = info.get('ssh_port', 22) # Collect this for the client.conf file if 'masterexperiment' in info: mproj, meid = info['masterexperiment'].split("/", 1) active = info.get('active', 'False') if type in ('control', 'both'): testbed = e.get_attribute('testbed') control_gw = myname cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower()) tunnelconfig = self.tunnel_config try: f = open(cfn, "w") if active == 'True': print >>f, "active: True" print >>f, "ssh_port: %s" % ssh_port if type in ('control', 'both'): for s in [s for s in services \ if s.get('name', "") in self.imports]: server_service_out[s['name']](f, s) if tunnelconfig: print >>f, "tunnelip: %s" % tunnelconfig print >>f, "peer: %s" % peer.lower() print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \ pubkey_base print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \ secretkey_base f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Can't write protal config %s: %s" % (cfn, e)) # Done with portals, write the client config file. try: f = open("%s/client.conf" % tmpdir, "w") if control_gw: print >>f, "ControlGateway: %s" % physname.lower() for s in services: if s.get('name',"") in self.imports and \ s.get('visibility','') == 'import': client_service_out[s['name']](f, s) if s.get('name', '') in self.exports and \ s.get('visibility', '') == 'export' and \ s['name'] in client_export_service_out: client_export_service_out[s['name']](f, s) # Seer uses this. if mproj and meid: print >>f, "ExperimentID: %s/%s" % (mproj, meid) f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Cannot write client.conf: %s" %s) def export_store_info(self, cf, nodes, ssh_port, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl and k and k.index('/') != -1: if self.create_debug: req = { 'name': k, 'value': 'debug' } self.call_SetValue(surl, req, cf) else: n = nodes.get(k[k.index('/')+1:], { }) value = n.get('hostname', None) if value: req = { 'name': k, 'value': value } self.call_SetValue(surl, req, cf) else: self.log.error("No hostname for %s" % \ k[k.index('/'):]) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': ssh_port } self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def write_node_config_script(self, elem, node, user, pubkey, secretkey, stagingdir, tmpdir): """ Write out the configuration script that is to run on the node represented by elem in the topology. This is called once per node to configure. """ # These little functions/functors just make things more readable. Each # one encapsulates a small task of copying software files or installing # them. class stage_file_type: """ Write code copying file sfrom the staging host to the host on which this will run. """ def __init__(self, user, host, stagingdir): self.user = user self.host = host self.stagingdir = stagingdir self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \ "'ForwardX11 no' -o 'StrictHostKeyChecking no' " def __call__(self, script, file, dest="."): # If the file is a full pathname, do not use stagingdir if file.find('/') == -1: file = "%s/%s" % (self.stagingdir, file) print >>script, "%s %s@%s:%s %s" % \ (self.scp, self.user, self.host, file, dest) def install_tar(script, loc, base): """ Print code to script to install a tarfile in loc. """ tar = "/bin/tar" mkdir="/bin/mkdir" print >>script, "%s -p %s" % (mkdir, loc) print >>script, "%s -C %s -xzf %s" % (tar, loc, base) def install_rpm(script, base): """ Print code to script to install an rpm """ rpm = "/bin/rpm" print >>script, "%s --install %s" % (rpm, base) ifconfig = "/sbin/ifconfig" findif = '/usr/local/etc/emulab/findif' stage_file = stage_file_type(user, self.staging_host, stagingdir) pname = node.get('hostname', None) fed_dir = "/usr/local/federation" fed_etc_dir = "%s/etc" % fed_dir fed_bin_dir = "%s/bin" % fed_dir fed_lib_dir = "%s/lib" % fed_dir if pname: sfile = "%s/%s.startup" % (tmpdir, pname) script = open(sfile, "w") # Reset the interfaces to the ones in the topo file for i in [ i for i in elem.interface \ if not i.get_attribute('portal')]: if 'interfaces' in node: pinf = node['interfaces'].get(i.name, None) else: pinf = None if 'mac' in node: pmac = node['mac'].get(i.name, None) else: pmac = None addr = i.get_attribute('ip4_address') netmask = i.get_attribute('ip4_netmask') or '255.255.255.0' # The interface names in manifests are not to be trusted, so we # find the interface to configure using the local node's script # to match mac address to interface name. if pinf and addr and pmac: print >>script, '# %s' % pinf print >>script, \ "%s `%s %s` %s netmask %s" % \ (ifconfig, findif, pmac, addr, netmask) else: self.log.error("Missing interface or address for %s" \ % i.name) for l, f in self.federation_software: base = os.path.basename(f) stage_file(script, base) if l: install_tar(script, l, base) else: install_rpm(script, base) for s in elem.software: s_base = s.location.rpartition('/')[2] stage_file(script, s_base) if s.install: install_tar(script, s.install, s_base) else: install_rpm(script, s_base) for f in ('hosts', pubkey, secretkey, 'client.conf', 'userconf'): stage_file(script, f, fed_etc_dir) if self.sshd: stage_file(script, self.sshd, fed_bin_dir) if self.sshd_config: stage_file(script, self.sshd_config, fed_etc_dir) # Look in tmpdir to get the names. They've all been copied # into the (remote) staging dir if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK): stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir) # Done with staging, remove the identity used to stage print >>script, "#/bin/rm .ssh/id_rsa" # Start commands if elem.get_attribute('portal') and self.portal_startcommand: # Install portal software for l, f in self.portal_software: base = os.path.basename(f) stage_file(script, base) if l: install_tar(script, l, base) else: install_rpm(script, base) # Portals never have a user-specified start command print >>script, self.portal_startcommand elif self.node_startcommand: # XXX: debug print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user) # XXX: debug if elem.get_attribute('startup'): print >>script, "%s \\$USER '%s'" % \ (self.node_startcommand, elem.get_attribute('startup')) else: print >>script, self.node_startcommand script.close() return sfile, pname else: return None, None def configure_nodes(self, segment_commands, topo, nodes, user, pubkey, secretkey, stagingdir, tmpdir): """ For each node in the topology, generate a script file that copies software onto it and installs it in the proper places and then runs the startup command (including the federation commands. """ for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]: vname = e.name sfile, pname = self.write_node_config_script(e, nodes.get(vname, { }), user, pubkey, secretkey, stagingdir, tmpdir) if sfile: if not segment_commands.scp_file(sfile, user, pname): self.log.error("Could not copy script to %s" % pname) else: self.log.error("Unmapped node: %s" % vname) def start_node(self, user, host, node, segment_commands): """ Copy an identity to a node for the configuration script to be able to import data and then run the startup script remotely. """ # Place an identity on the node so that the copying can succeed segment_commands.scp_file( segment_commands.ssh_privkey_file, user, node, ".ssh/id_rsa") segment_commands.ssh_cmd(user, node, "sudo /bin/sh ./%s.startup &" % node) def start_nodes(self, user, host, nodes, segment_commands): """ Start a thread to initialize each node and wait for them to complete. Each thread runs start_node. """ threads = [ ] for n in nodes: t = Thread(target=self.start_node, args=(user, host, n, segment_commands)) t.start() threads.append(t) done = [not t.isAlive() for t in threads] while not all(done): self.log.info("Waiting for threads %s" % done) time.sleep(10) done = [not t.isAlive() for t in threads] def set_up_staging_filespace(self, segment_commands, user, host, stagingdir): """ Set up teh staging area on the staging machine. To reduce the number of ssh commands, we compose a script and execute it remotely. """ self.log.info("[start_segment]: creating script file") try: sf, scriptname = tempfile.mkstemp() scriptfile = os.fdopen(sf, 'w') except EnvironmentError: return False scriptbase = os.path.basename(scriptname) # Script the filesystem changes print >>scriptfile, "/bin/rm -rf %s" % stagingdir print >>scriptfile, 'mkdir -p %s' % stagingdir print >>scriptfile, "rm -f %s" % scriptbase scriptfile.close() # Move the script to the remote machine # XXX: could collide tempfile names on the remote host if segment_commands.scp_file(scriptname, user, host, scriptbase): os.remove(scriptname) else: return False # Execute the script (and the script's last line deletes it) if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase): return False def initialize_protogeni_context(self, segment_commands, certfile, certpw): """ Protogeni interactions take a context and a protogeni certificate. This establishes both for later calls and returns them. """ if os.access(certfile, os.R_OK): ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) else: self.log.error("[start_segment]: Cannot read certfile: %s" % \ certfile) return None, None try: gcred = segment_commands.slice_authority_call('GetCredential', {}, ctxt) except segment_commands.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return ctxt, gcred def get_free_slicename(self, segment_commands, user, gcred, ctxt): """ Find a usable slice name by trying random ones until there's no collision. """ def random_slicename(user): """ Return a random slicename by appending 5 letters to the username. """ slicename = user for i in range(0,5): slicename += random.choice(string.ascii_letters) return slicename while True: slicename = random_slicename(user) try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } if not self.create_debug: segment_commands.slice_authority_call('Resolve', param, ctxt) else: raise segment_commands.ProtoGENIError(0,0,'Debug') except segment_commands.ProtoGENIError, e: print e break return slicename def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt): """ Create the slice and allocate resources. If any of this stuff fails, the allocations will time out on PG in short order, so we just raise the service_error. Return the slice and sliver credentials as well as the manifest. """ try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } slice_cred = segment_commands.slice_authority_call('Register', param, ctxt) # Resolve the slice to get the URN that PG has assigned it. param = { 'credential': gcred, 'type': 'Slice', 'hrn': slicename } data = segment_commands.slice_authority_call('Resolve', param, ctxt) if 'urn' in data: slice_urn = data['urn'] else: raise service_error(service_error.federant, "No URN returned for slice %s" % slicename) if 'creator_urn' in data: creator_urn = data['creator_urn'] else: raise service_error(service_error.federant, "No creator URN returned for slice %s" % slicename) # Populate the ssh keys (let PG format them) param = { 'credential': gcred, } keys = segment_commands.slice_authority_call('GetKeys', param, ctxt) # Create a Sliver param = { 'credentials': [ slice_cred ], 'rspec': rspec, 'users': [ { 'urn': creator_urn, 'keys': keys, }, ], 'slice_urn': slice_urn, } rv = segment_commands.component_manager_call( 'CreateSliver', param, ctxt) # the GENIAPI AM just hands back the manifest, bit the ProtoGENI # API hands back a sliver credential. This just finds the format # of choice. if isinstance(rv, tuple): manifest = rv[1] else: manifest = rv except segment_commands.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s %s" % (e.code, e)) return (slice_urn, slice_cred, manifest, rspec) def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, timeout=None): """ Wait for the given slice to finish its startup. Return the final status. """ completed_states = ('failed', 'ready') status = 'changing' if timeout is not None: end = time.time() + timeout try: while status not in completed_states: param = { 'credentials': [ slice_cred ], 'slice_urn': slice_urn, } r = segment_commands.component_manager_call( 'SliverStatus', param, ctxt) # GENIAPI uses geni_status as the key, so check for both status = r.get('status', r.get('geni_status','changing')) if status not in completed_states: if timeout is not None and time.time() > end: return 'timeout' time.sleep(30) except segment_commands.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s %s" % (e.code, e)) return status def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt): """ Delete the slice resources. An error from the service is ignores, because the soft state will go away anyway. """ try: param = { 'credentials': [ slice_cred, ], 'slice_urn': slice_urn, } segment_commands.component_manager_call('DeleteSlice', param, ctxt) except segment_commands.ProtoGENIError, e: self.log.warn("ProtoGENI: %s" % e) def start_segment(self, segment_commands, aid, user, rspec, pubkey, secretkey, ename, stagingdir, tmpdir, certfile, certpw, export_certfile, topo, connInfo, services, timeout=0): """ Start a sub-experiment on a federant. Get the current state, modify or create as appropriate, ship data and configs and start the experiment. There are small ordering differences based on the initial state of the sub-experiment. """ # Local software dir lsoftdir = "%s/software" % tmpdir host = self.staging_host ctxt, gcred = self.initialize_protogeni_context(segment_commands, certfile, certpw) if not ctxt: return False, {} self.set_up_staging_filespace(segment_commands, user, host, stagingdir) slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt) self.log.info("Creating %s" % slicename) slice_urn, slice_cred, manifest, rpsec = self.allocate_slice( segment_commands, slicename, rspec, gcred, ctxt) # With manifest in hand, we can export the portal node names. if self.create_debug: nodes = self.fake_manifest(topo) else: nodes = self.manifest_to_dict(manifest) self.export_store_info(export_certfile, nodes, self.ssh_port, connInfo) self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, ename, connInfo, services, nodes) # Copy software to the staging machine (done after generation to copy # those, too) for d in (tmpdir, lsoftdir): if os.path.isdir(d): for f in os.listdir(d): if not os.path.isdir("%s/%s" % (d, f)): if not segment_commands.scp_file("%s/%s" % (d, f), user, host, "%s/%s" % (stagingdir, f)): self.log.error("Scp failed") return False, {} # Now we wait for the nodes to start on PG status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, ctxt, timeout=300) if status == 'failed': self.log.error('Sliver failed to start on ProtoGENI') self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt) return False, {} elif status == 'timeout': self.log.error('Sliver failed to start on ProtoGENI (timeout)') self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt) return False, {} else: # All good: save ProtoGENI info in shared state self.state_lock.acquire() self.allocation[aid]['slice_urn'] = slice_urn self.allocation[aid]['slice_name'] = slicename self.allocation[aid]['slice_credential'] = slice_cred self.allocation[aid]['manifest'] = manifest self.allocation[aid]['rspec'] = rspec self.allocation[aid]['certfile'] = certfile self.allocation[aid]['certpw'] = certpw self.write_state() self.state_lock.release() # Now we have configuration to do for ProtoGENI self.configure_nodes(segment_commands, topo, nodes, user, pubkey, secretkey, stagingdir, tmpdir) self.start_nodes(user, self.staging_host, [ n.get('hostname', None) for n in nodes.values()], segment_commands) # Everything has gone OK. return True, dict([(k, n.get('hostname', None)) \ for k, n in nodes.items()]) def generate_rspec(self, topo, softdir, connInfo): # Force a useful image. Without picking this the nodes can get # different images and there is great pain. def image_filter(e): if isinstance(e, topdl.Computer): return '' else: return "" # Main line of generate t = topo.clone() starts = { } # Weed out the things we aren't going to instantiate: Segments, portal # substrates, and portal interfaces. (The copy in the for loop allows # us to delete from e.elements in side the for loop). While we're # touching all the elements, we also adjust paths from the original # testbed to local testbed paths and put the federation commands and # startcommands into a dict so we can start them manually later. # ProtoGENI requires setup before the federation commands run, so we # run them by hand after we've seeded configurations. for e in [e for e in t.elements]: if isinstance(e, topdl.Segment): t.elements.remove(e) # Fix software paths for s in getattr(e, 'software', []): s.location = re.sub("^.*/", softdir, s.location) if isinstance(e, topdl.Computer): if e.get_attribute('portal') and self.portal_startcommand: # Portals never have a user-specified start command starts[e.name] = self.portal_startcommand elif self.node_startcommand: if e.get_attribute('startup'): starts[e.name] = "%s \\$USER '%s'" % \ (self.node_startcommand, e.get_attribute('startup')) e.remove_attribute('startup') else: starts[e.name] = self.node_startcommand # Remove portal interfaces e.interface = [i for i in e.interface \ if not i.get_attribute('portal')] t.substrates = [ s.clone() for s in t.substrates ] t.incorporate_elements() # Customize the rspec output to use the image we like filters = [ image_filter ] # Convert to rspec and return it exp_rspec = topdl.topology_to_rspec(t, filters) return exp_rspec def retrieve_software(self, topo, certfile, softdir): """ Collect the software that nodes in the topology need loaded and stage it locally. This implies retrieving it from the experiment_controller and placing it into softdir. Certfile is used to prove that this node has access to that data (it's the allocation/segment fedid). Finally local portal and federation software is also copied to the same staging directory for simplicity - all software needed for experiment creation is in softdir. """ sw = set() for e in topo.elements: for s in getattr(e, 'software', []): sw.add(s.location) os.mkdir(softdir) for s in sw: self.log.debug("Retrieving %s" % s) try: get_url(s, certfile, softdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) # Copy local portal node software to the tempdir for s in (self.portal_software, self.federation_software): for l, f in s: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) def initialize_experiment_info(self, attrs, aid, certfile, tmpdir): """ Gather common configuration files, retrieve or create an experiment name and project name, and return the ssh_key filenames. Create an allocation log bound to the state log variable as well. """ configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey')) ename = None pubkey_base = None secretkey_base = None alloc_log = None for a in attrs: if a['attribute'] in configs: try: self.log.debug("Retrieving %s" % a['value']) get_url(a['value'], certfile, tmpdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (a.get('value', ""), v)) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'experiment_name': ename = a['value'] if not ename: ename = "" for i in range(0,5): ename += random.choice(string.ascii_letters) self.log.warn("No experiment name: picked one randomly: %s" \ % ename) self.state_lock.acquire() if self.allocation.has_key(aid): cf, user, ssh_key, cpw = self.allocation[aid]['credentials'] self.allocation[aid]['experiment'] = ename self.allocation[aid]['log'] = [ ] # Create a logger that logs to the experiment's state object as # well as to the main log file. alloc_log = logging.getLogger('fedd.access.%s' % ename) h = logging.StreamHandler( list_log.list_log(self.allocation[aid]['log'])) # XXX: there should be a global one of these rather than # repeating the code. h.setFormatter(logging.Formatter( "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) self.write_state() else: self.log.error("No allocation for %s!?" % aid) self.state_lock.release() return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, cpw, alloc_log) def finalize_experiment(self, topo, nodes, aid, alloc_id, proof): # Copy the assigned names into the return topology rvtopo = topo.clone() embedding = [ ] for k, n in nodes.items(): embedding.append({ 'toponame': k, 'physname': [n ], }) # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() logv = "".join(self.allocation[aid]['log']) # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. self.allocation[aid]['started'] = { 'allocID': alloc_id, 'allocationLog': logv, 'segmentdescription': { 'topdldescription': rvtopo.to_dict() }, 'embedding': embedding, 'proof': proof.to_dict(), } retval = copy.deepcopy(self.allocation[aid]['started']) self.write_state() self.state_lock.release() return retval def StartSegment(self, req, fid): err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = req['StartSegmentRequestBody'] topref = req['segmentdescription']['topdldescription'] except KeyError: raise service_error(service_error.req, "Badly formed request") connInfo = req.get('connection', []) services = req.get('service', []) auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: raise service_error(service_error.access, "Access denied") else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. self.state_lock.acquire() retval = self.allocation[aid].get('started', None) self.state_lock.release() if retval: self.log.warning("Duplicate StartSegment for %s: " % aid + \ "replaying response") return retval if topref: topo = topdl.Topology(**topref) else: raise service_error(service_error.req, "Request missing segmentdescription'") certfile = "%s/%s.pem" % (self.certdir, auth_attr) try: tmpdir = tempfile.mkdtemp(prefix="access-") softdir = "%s/software" % tmpdir except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") # Try block alllows us to clean up temporary files. try: self.retrieve_software(topo, certfile, softdir) self.configure_userconf(services, tmpdir) ename, pubkey_base, secretkey_base, cf, user, ssh_key, \ cpw, alloc_log = self.initialize_experiment_info(attrs, aid, certfile, tmpdir) self.import_store_info(certfile, connInfo) rspec = self.generate_rspec(topo, "%s/%s/" \ % (self.staging_dir, ename), connInfo) segment_commands = self.api_proxy(keyfile=ssh_key, debug=self.create_debug, log=alloc_log, ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url) rv, nodes = self.start_segment(segment_commands, aid, user, rspec, pubkey_base, secretkey_base, ename, "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw, certfile, topo, connInfo, services) except EnvironmentError, e: err = service_error(service_error.internal, "%s" % e) except service_error, e: err = e except: t, v, st = sys.exc_info() err = service_error(service_error.internal, "%s: %s" % \ (v, traceback.extract_tb(st))) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: self.log.debug("[StartSegment]: not removing %s" % tmpdir) if rv: return self.finalize_experiment(topo, nodes, aid, req['allocID'], proof) elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def stop_segment(self, segment_commands, user, stagingdir, slice_cred, slice_urn, certfile, certpw): """ Stop a sub experiment by calling swapexp on the federant """ host = self.staging_host rv = False try: # Clean out tar files: we've gone over quota in the past if stagingdir: segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir) if slice_cred: self.log.error('Removing Sliver on ProtoGENI') ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt) return True except self.ssh_cmd_timeout: rv = False return rv def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(service_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if self.allocation.has_key(aid): cf, user, ssh_key, cpw = self.allocation[aid]['credentials'] slice_cred = self.allocation[aid].get('slice_credential', None) slice_urn = self.allocation[aid].get('slice_urn', None) ename = self.allocation[aid].get('experiment', None) else: cf, user, ssh_key, cpw = (None, None, None, None) slice_cred = None slice_urn = None ename = None self.state_lock.release() if ename: staging = "%s/%s" % ( self.staging_dir, ename) else: self.log.warn("Can't find experiment name for %s" % aid) staging = None segment_commands = self.api_proxy(keyfile=ssh_key, debug=self.create_debug, ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url) self.stop_segment(segment_commands, user, staging, slice_cred, slice_urn, cf, cpw) return { 'allocID': req['allocID'], 'proof': proof.to_dict() } def renew_segment(self, segment_commands, name, scred, slice_urn, interval, certfile, certpw): """ Linear code through the segment renewal calls. """ ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) try: expiration = time.strftime("%Y%m%dT%H:%M:%S", time.gmtime(time.time() + interval)) cred = segment_commands.slice_authority_call('GetCredential', {}, ctxt) param = { 'credential': scred, 'expiration': expiration } r = segment_commands.slice_authority_call('RenewSlice', param, ctxt) param = { 'credential': cred, 'urn': slice_urn, 'type': 'Slice', } new_scred = segment_commands.slice_authority_call('GetCredential', param, ctxt) except segment_commands.ProtoGENIError, e: self.log.error("Failed to extend slice %s: %s" % (name, e)) return None try: param = { 'credentials': [new_scred,], 'slice_urn': slice_urn, } r = segment_commands.component_manager_call('RenewSlice', param, ctxt) except segment_commands.ProtoGENIError, e: self.log.warn("Failed to renew sliver for %s: %s" % (name, e)) return new_scred def RenewSlices(self): self.log.info("Scanning for slices to renew") self.state_lock.acquire() aids = self.allocation.keys() self.state_lock.release() for aid in aids: self.state_lock.acquire() if self.allocation.has_key(aid): name = self.allocation[aid].get('slice_name', None) scred = self.allocation[aid].get('slice_credential', None) slice_urn = self.allocation[aid].get('slice_urn', None) cf, user, ssh_key, cpw = self.allocation[aid]['credentials'] else: name = None scred = None self.state_lock.release() if not os.access(cf, os.R_OK): self.log.error( "[RenewSlices] cred.file %s unreadable, ignoring" % cf) continue # There's a ProtoGENI slice associated with the segment; renew it. if name and scred and slice_urn: segment_commands = self.api_proxy(log=self.log, debug=self.create_debug, keyfile=ssh_key, cm_url = self.cm_url, sa_url = self.sa_url, ch_url = self.ch_url) new_scred = self.renew_segment(segment_commands, name, scred, slice_urn, self.renewal_interval, cf, cpw) if new_scred: self.log.info("Slice %s renewed until %s GMT" % \ (name, time.asctime(time.gmtime(\ time.time()+self.renewal_interval)))) self.state_lock.acquire() if self.allocation.has_key(aid): self.allocation[aid]['slice_credential'] = new_scred self.write_state() self.state_lock.release() else: self.log.info("Failed to renew slice %s " % name) # Let's do this all again soon. (4 tries before the slices time out) t = Timer(self.renewal_interval/4, self.RenewSlices) t.start()