#!/usr/local/bin/python

import os,sys
import stat # for chmod constants
import re
import time
import string
import copy
import pickle
import logging
import subprocess
import random
import traceback
# Fix: manifest_to_dict() below uses xml.parsers.expat, which was never
# imported and raised a NameError at call time.
import xml.parsers.expat

from threading import Thread, Timer, Lock
from M2Crypto.SSL import SSLError

from util import *
from access_project import access_project
from fedid import fedid, generate_fedid
from authorizer import authorizer
from service_error import service_error
from remote_service import xmlrpc_handler, soap_handler, service_caller

import httplib
import tempfile
from urlparse import urlparse

from access import access_base
from proxy_segment import proxy_segment

import topdl
import list_log

# Make log messages disappear if noone configures a fedd logger
class nullHandler(logging.Handler):
    def emit(self, record): pass

fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())


class protogeni_proxy(proxy_segment):
    """
    Wrapper around the ProtoGENI XML-RPC interfaces (clearinghouse, slice
    authority, component manager).  Knows how to retry busy calls and how to
    fake responses in debug mode.
    """
    class ProtoGENIError(Exception):
        """Carries the failing operation, its return code and its output."""
        def __init__(self, op, code, output):
            Exception.__init__(self, output)
            self.op = op
            self.code = code
            self.output = output

    def __init__(self, log=None, keyfile=None, debug=False,
            ch_url=None, sa_url=None, cm_url=None):
        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)

        # Convenience alias so users of an instance can catch the exception
        self.ProtoGENIError = protogeni_proxy.ProtoGENIError

        self.ch_url = ch_url
        self.sa_url = sa_url
        self.cm_url = cm_url

        self.call_SetValue = service_caller('SetValue')

        # Operations that fail when debugging, and canned debug responses
        self.debug_fail = ['Resolve']
        self.debug_response = {
                'RedeemTicket': ("XML blob1", "XML blob2"),
                'SliceStatus': { 'status': 'ready' },
                }

    def pg_call(self, url, method, params, context):
        """
        Make the named XML-RPC call to a ProtoGENI service at url, retrying
        up to 5 times (30s apart) when the resource is busy (code 14).
        Returns the call's 'value' on success; raises ProtoGENIError on any
        other failure.  In debug mode, returns canned responses instead.
        """
        max_retries = 5
        retries = 0

        s = service_caller(method, request_body_name="", strict=False)
        self.log.debug("Calling %s %s" % (url, method))
        if not self.debug:
            while retries < max_retries:
                r = s.call_xmlrpc_service(url, params, context=context)
                code = r.get('code', -1)
                if code == 0:
                    # Success leaves the loop here
                    return r.get('value', None)
                elif code == 14 and retries + 1 < max_retries:
                    # Busy resource
                    retries += 1
                    self.log.info("Resource busy, retrying in 30 secs")
                    time.sleep(30)
                else:
                    # NB: out of retries falls through to here
                    raise self.ProtoGENIError(op=method,
                            code=r.get('code', 'unknown'),
                            output=r.get('output', 'No output'))
        else:
            if method in self.debug_fail:
                raise self.ProtoGENIError(op=method, code='unknown',
                        output='No output')
            elif self.debug_response.has_key(method):
                return self.debug_response[method]
            else:
                return "%s XML blob" % method


class access(access_base):
    """
    The implementation of access control based on mapping users to projects.

    Users can be mapped to existing projects or have projects created
    dynamically.  This implements both direct requests and proxies.
    """

    def __init__(self, config=None, auth=None):
        """
        Initializer.  Pulls parameters out of the ConfigParser's access
        section.
        """

        access_base.__init__(self, config, auth)

        self.domain = config.get("access", "domain")
        self.userconfdir = config.get("access","userconfdir")
        self.userconfcmd = config.get("access","userconfcmd")
        self.userconfurl = config.get("access","userconfurl")
        self.federation_software = config.get("access", "federation_software")
        self.portal_software = config.get("access", "portal_software")
        self.ssh_port = config.get("access","ssh_port") or "22"
        self.sshd = config.get("access","sshd")
        self.sshd_config = config.get("access", "sshd_config")
        self.access_type = config.get("access", "type")
        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
        self.staging_host = config.get("access", "staging_host") \
                or "ops.emulab.net"
        self.local_seer_software = config.get("access", "local_seer_software")
        self.local_seer_image = config.get("access", "local_seer_image")
        self.local_seer_start = config.get("access", "local_seer_start")
        # Fix: seer_master_start is tested below when trimming self.exports
        # but was never read from the config, which raised an AttributeError.
        self.seer_master_start = config.get("access", "seer_master_start")

        self.dragon_endpoint = config.get("access", "dragon")
        self.dragon_vlans = config.get("access", "dragon_vlans")
        self.deter_internal = config.get("access", "deter_internal")

        self.tunnel_config = config.getboolean("access", "tunnel_config")
        self.portal_command = config.get("access", "portal_command")
        self.portal_image = config.get("access", "portal_image")
        self.portal_type = config.get("access", "portal_type") or "pc"
        self.portal_startcommand = config.get("access", "portal_startcommand")
        self.node_startcommand = config.get("access", "node_startcommand")

        # Software strings become (install_location, file) lists
        self.federation_software = self.software_list(self.federation_software)
        self.portal_software = self.software_list(self.portal_software)
        self.local_seer_software = self.software_list(self.local_seer_software)

        # Slice renewal interval: config value is in minutes (default 3h)
        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
        self.renewal_interval = int(self.renewal_interval) * 60

        self.ch_url = config.get("access", "ch_url")
        self.sa_url = config.get("access", "sa_url")
        self.cm_url = config.get("access", "cm_url")

        self.restricted = [ ]

        # read_state in the base_class
        self.state_lock.acquire()
        for a in ('allocation', 'projects', 'keys', 'types'):
            if a not in self.state:
                self.state[a] = { }
        self.allocation = self.state['allocation']
        self.projects = self.state['projects']
        self.keys = self.state['keys']
        self.types = self.state['types']
        # Add the ownership attributes to the authorizer.  Note that the
        # indices of the allocation dict are strings, but the attributes are
        # fedids, so there is a conversion.
for k in self.state.get('allocation', {}).keys(): for o in self.state['allocation'][k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k)) self.state_lock.release() self.log = logging.getLogger("fedd.access") set_log_level(config, "access", self.log) self.access = { } if config.has_option("access", "accessdb"): self.read_access(config.get("access", "accessdb"), access_obj=self.make_access_info) self.lookup_access = self.lookup_access_base self.call_SetValue = service_caller('SetValue') self.call_GetValue = service_caller('GetValue') self.exports = { 'local_seer_control': self.export_local_seer, 'seer_master': self.export_seer_master, 'hide_hosts': self.export_hide_hosts, } if not self.local_seer_image or not self.local_seer_software or \ not self.local_seer_start: if 'local_seer_control' in self.exports: del self.exports['local_seer_control'] if not self.local_seer_image or not self.local_seer_software or \ not self.seer_master_start: if 'seer_master' in self.exports: del self.exports['seer_master'] self.RenewSlices() self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } @staticmethod def make_access_info(s): """ Split a string of the form (id, id, id, id) ito its constituent tuples and return them as a tuple. Use to import access info from the access_db. 
""" ss = s.strip() if ss.startswith('(') and ss.endswith(')'): l = [ s.strip() for s in ss[1:-1].split(",")] if len(l) == 4: return tuple(l) else: raise self.parse_error( "Exactly 4 elements in access info required") else: raise self.parse_error("Expecting parenthezied values") def get_handler(self, path, fid): self.log.info("Get handler %s %s" % (path, fid)) if self.auth.check_attribute(fid, path) and self.userconfdir: return ("%s/%s" % (self.userconfdir, path), "application/binary") else: return (None, None) def build_access_response(self, alloc_id, services): """ Create the SOAP response. Build the dictionary description of the response and use fedd_utils.pack_soap to create the soap message. ap is the allocate project message returned from a remote project allocation (even if that allocation was done locally). """ # Because alloc_id is already a fedd_services_types.IDType_Holder, # there's no need to repack it msg = { 'allocID': alloc_id, 'fedAttr': [ { 'attribute': 'domain', 'value': self.domain } , ] } if self.dragon_endpoint: msg['fedAttr'].append({'attribute': 'dragon', 'value': self.dragon_endpoint}) if self.deter_internal: msg['fedAttr'].append({'attribute': 'deter_internal', 'value': self.deter_internal}) #XXX: ?? if self.dragon_vlans: msg['fedAttr'].append({'attribute': 'vlans', 'value': self.dragon_vlans}) if services: msg['service'] = services return msg def RequestAccess(self, req, fid): """ Handle the access request. 
""" # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) # Request for this fedd found, match = self.lookup_access(req, fid) services, svc_state = self.export_services(req.get('service',[]), None, None) # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) self.state_lock.acquire() self.allocation[aid] = { } # The protoGENI certificate self.allocation[aid]['credentials'] = found # The list of owner FIDs self.allocation[aid]['owners'] = [ fid ] self.write_state() self.state_lock.release() self.auth.set_attribute(fid, allocID) self.auth.set_attribute(allocID, allocID) try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) return self.build_access_response({ 'fedid': allocID }, None) def ReleaseAccess(self, req, fid): # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") # Local request try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s", aid) if not self.auth.check_attribute(fid, auth_attr): self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") self.state_lock.acquire() if 
self.allocation.has_key(aid): self.log.debug("Found allocation for %s" %aid) del self.allocation[aid] self.write_state() self.state_lock.release() # And remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) return { 'allocID': req['allocID'] } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") # Turn the manifest into a dict were each virtual nodename (i.e. the topdl # name) has an entry with the allocated machine in hostname and the # interfaces in 'interfaces'. I love having XML parser code lying around. def manifest_to_dict(self, manifest, ignore_debug=False): if self.create_debug and not ignore_debug: self.log.debug("Returning null manifest dict") return { } # The class allows us to keep a little state - the dict under # consteruction and the current entry in that dict for the interface # element code. class manifest_parser: def __init__(self): self.d = { } self.current_key=None # If the element is a node, create a dict entry for it. If it's an # interface inside a node, add an entry in the interfaces list with # the virtual name and component id. 
def start_element(self, name, attrs): if name == 'node': self.current_key = attrs.get('virtual_id',"") if self.current_key: self.d[self.current_key] = { 'hostname': attrs.get('hostname', None), 'interfaces': { } } elif name == 'interface' and self.current_key: self.d[self.current_key]['interfaces']\ [attrs.get('virtual_id','')] = \ attrs.get('component_id', None) # When a node is finished, clear current_key def end_element(self, name): if name == 'node': self.current_key = None node = { } mp = manifest_parser() p = xml.parsers.expat.ParserCreate() # These are bound to the class we just created p.StartElementHandler = mp.start_element p.EndElementHandler = mp.end_element p.Parse(manifest) # Make the node dict that the callers expect for k in mp.d: node[k] = mp.d.get('hostname', '') return mp.d def fake_manifest(self, topo): node = { } for i, e in enumerate([ e for e in topo.elements \ if isinstance(e, topdl.Computer)]): node[e.name] = { 'hostname': "node%03d" % i, 'interfaces': { } } for j, inf in enumerate(e.interface): node[e.name]['interfaces'][inf.name] = 'eth%d' % j return node def generate_portal_configs(self, topo, pubkey_base, secretkey_base, tmpdir, leid, connInfo, services, nodes): def conninfo_to_dict(key, info): """ Make a cpoy of the connection information about key, and flatten it into a single dict by parsing out any feddAttrs. 
""" rv = None for i in info: if key == i.get('portal', "") or \ key in [e.get('element', "") \ for e in i.get('member', [])]: rv = i.copy() break else: return rv if 'fedAttr' in rv: for a in rv['fedAttr']: attr = a.get('attribute', "") val = a.get('value', "") if attr and attr not in rv: rv[attr] = val del rv['fedAttr'] return rv # XXX: un hardcode this def client_null(f, s): print >>f, "Service: %s" % s['name'] def client_seer_master(f, s): print >>f, 'PortalAlias: seer-master' def client_smb(f, s): print >>f, "Service: %s" % s['name'] smbshare = None smbuser = None smbproj = None for a in s.get('fedAttr', []): if a.get('attribute', '') == 'SMBSHARE': smbshare = a.get('value', None) elif a.get('attribute', '') == 'SMBUSER': smbuser = a.get('value', None) elif a.get('attribute', '') == 'SMBPROJ': smbproj = a.get('value', None) if all((smbshare, smbuser, smbproj)): print >>f, "SMBshare: %s" % smbshare print >>f, "ProjectUser: %s" % smbuser print >>f, "ProjectName: %s" % smbproj def client_hide_hosts(f, s): for a in s.get('fedAttr', [ ]): if a.get('attribute', "") == 'hosts': print >>f, 'Hide: %s' % a.get('value', "") client_service_out = { 'SMB': client_smb, 'tmcd': client_null, 'seer': client_null, 'userconfig': client_null, 'project_export': client_null, 'seer_master': client_seer_master, 'hide_hosts': client_hide_hosts, } def client_seer_master_export(f, s): print >>f, "AddedNode: seer-master" def client_seer_local_export(f, s): print >>f, "AddedNode: control" client_export_service_out = { 'seer_master': client_seer_master_export, 'local_seer_control': client_seer_local_export, } def server_port(f, s): p = urlparse(s.get('server', 'http://localhost')) print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) def server_null(f,s): pass def server_seer(f, s): print >>f, 'seer: true' server_service_out = { 'SMB': server_port, 'tmcd': server_port, 'userconfig': server_null, 'project_export': server_null, 'seer': server_seer, 'seer_master': server_port, 
'hide_hosts': server_null, } # XXX: end un hardcode this seer_out = False client_out = False for e in [ e for e in topo.elements \ if isinstance(e, topdl.Computer) and e.get_attribute('portal')]: myname = e.name type = e.get_attribute('portal_type') info = conninfo_to_dict(myname, connInfo) if not info: raise service_error(service_error.req, "No connectivity info for %s" % myname) # Translate to physical name (ProtoGENI doesn't have DNS) physname = nodes.get(myname, { }).get('hostname', None) peer = info.get('peer', "") ldomain = self.domain ssh_port = info.get('ssh_port', 22) # Collect this for the client.conf file if 'masterexperiment' in info: mproj, meid = info['masterexperiment'].split("/", 1) active = info.get('active', 'False') if type in ('control', 'both'): testbed = e.get_attribute('testbed') control_gw = myname cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower()) tunnelconfig = self.tunnel_config try: f = open(cfn, "w") if active == 'True': print >>f, "active: True" print >>f, "ssh_port: %s" % ssh_port if type in ('control', 'both'): for s in [s for s in services \ if s.get('name', "") in self.imports]: server_service_out[s['name']](f, s) if tunnelconfig: print >>f, "tunnelip: %s" % tunnelconfig print >>f, "peer: %s" % peer.lower() print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \ pubkey_base print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \ secretkey_base f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Can't write protal config %s: %s" % (cfn, e)) # Done with portals, write the client config file. 
try: f = open("%s/client.conf" % tmpdir, "w") if control_gw: print >>f, "ControlGateway: %s" % physname.lower() for s in services: if s.get('name',"") in self.imports and \ s.get('visibility','') == 'import': client_service_out[s['name']](f, s) if s.get('name', '') in self.exports and \ s.get('visibility', '') == 'export' and \ s['name'] in client_export_service_out: client_export_service_out[s['name']](f, s) # Seer uses this. if mproj and meid: print >>f, "ExperimentID: %s/%s" % (mproj, meid) f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Cannot write client.conf: %s" %s) def export_store_info(self, cf, nodes, ssh_port, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl and k and k.index('/') != -1: if self.create_debug: req = { 'name': k, 'value': 'debug' } self.call_SetValue(surl, req, cf) else: n = nodes.get(k[k.index('/')+1:], { }) value = n.get('hostname', None) if value: req = { 'name': k, 'value': value } self.call_SetValue(surl, req, cf) else: self.log.error("No hostname for %s" % \ k[k.index('/'):]) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': ssh_port } self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def configure_nodes(self, segment_commands, topo, nodes, user, pubkey, secretkey, stagingdir, tmpdir): # These little functions/functors just make things more readable class stage_file_type: def __init__(self, user, host, stagingdir): self.user = user self.host = host self.stagingdir = stagingdir 
                # scp command prefix used to pull files from the staging host
                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "

            def __call__(self, script, file, dest="."):
                # If the file is a full pathname, do not use stagingdir
                if file.find('/') == -1:
                    file = "%s/%s" % (self.stagingdir, file)

                print >>script, "%s %s@%s:%s %s" % \
                        (self.scp, self.user, self.host, file, dest)

        # Emit commands that unpack a tarball into loc
        def install_tar(script, loc, base):
            tar = "/bin/tar"
            mkdir="/bin/mkdir"

            print >>script, "%s -p %s" % (mkdir, loc)
            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)

        # Emit the command that installs an rpm
        def install_rpm(script, base):
            rpm = "/bin/rpm"
            print >>script, "%s --install %s" % (rpm, base)

        fed_dir = "/usr/local/federation"
        fed_etc_dir = "%s/etc" % fed_dir
        fed_bin_dir = "%s/bin" % fed_dir
        fed_lib_dir = "%s/lib" % fed_dir
        ifconfig = "/sbin/ifconfig"
        stage_file = stage_file_type(user, self.staging_host, stagingdir)

        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
            vname = e.name
            node = nodes.get(vname, { })
            pname = node.get('hostname', None)
            if pname:
                script = open("%s/%s.startup" %(tmpdir, pname), "w")
                # Reset the interfaces to the ones in the topo file
                for i in [ i for i in e.interface \
                        if not i.get_attribute('portal')]:
                    pinf = node['interfaces'].get(i.name, None)
                    addr = i.get_attribute('ip4_address')
                    netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
                    if pinf and addr:
                        print >>script, \
                                "%s %s %s netmask %s" % \
                                (ifconfig, pinf, addr, netmask)
                    else:
                        self.log.error("Missing interface or address for %s" \
                                % i.name)

                # Stage and install the federation software
                for l, f in self.federation_software:
                    base = os.path.basename(f)
                    stage_file(script, base)
                    if l:
                        install_tar(script, l, base)
                    else:
                        install_rpm(script, base)

                # Stage and install experiment-specified software
                for s in e.software:
                    s_base = s.location.rpartition('/')[2]
                    stage_file(script, s_base)
                    if s.install:
                        install_tar(script, s.install, s_base)
                    else:
                        install_rpm(script, s_base)

                # Configuration files every node gets
                for f in ('hosts', pubkey, secretkey, 'client.conf',
                        'userconf'):
                    stage_file(script, f, fed_etc_dir)
                if self.sshd:
                    stage_file(script, self.sshd, fed_bin_dir)
                if self.sshd_config:
                    stage_file(script, self.sshd_config, fed_etc_dir)
                # Look in tmpdir to get the names.  They've all been copied
                # into the (remote) staging dir
                if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK):
                    stage_file(script, "%s.gw.conf" % vname, fed_etc_dir)

                # Hackery dackery dock: the ProtoGENI python is really ancient.
                # A modern version (though packaged for Mandrake (remember
                # Mandrake? good times, good times)) should be in the
                # federation_software list, but we need to move rename is for
                # SEER.
                print >>script, "rm /usr/bin/python"
                print >>script, "ln /usr/bin/python2.4 /usr/bin/python"
                # Back to less hacky stuff

                # Start commands
                if e.get_attribute('portal') and self.portal_startcommand:
                    # Install portal software
                    for l, f in self.portal_software:
                        base = os.path.basename(f)
                        stage_file(script, base)
                        if l:
                            install_tar(script, l, base)
                        else:
                            install_rpm(script, base)

                    # Portals never have a user-specified start command
                    print >>script, self.portal_startcommand
                elif self.node_startcommand:
                    # XXX: debug
                    print >>script, "sudo perl -I%s %simport_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
                    # XXX: debug
                    if e.get_attribute('startup'):
                        print >>script, "%s \\$USER '%s'" % \
                                (self.node_startcommand,
                                        e.get_attribute('startup'))
                    else:
                        print >>script, self.node_startcommand
                script.close()
                # Push the finished script to the node itself
                if not segment_commands.scp_file("%s/%s.startup" % \
                        (tmpdir, pname), user, pname):
                    self.log.error("Could not copy script to %s" % pname)
            else:
                self.log.error("Unmapped node: %s" % vname)

    def start_node(self, user, host, node, segment_commands):
        # Place an identity on the node so that the copying can succeed
        segment_commands.ssh_cmd(user, host,
                "scp .ssh/id_rsa %s:.ssh" % node)
        # Run the startup script generated by configure_nodes
        segment_commands.ssh_cmd(user, node,
                "sudo /bin/sh ./%s.startup &" % node)

    def start_nodes(self, user, host, nodes, segment_commands):
        # Run start_node on each node in parallel, then poll until every
        # thread has finished.
        threads = [ ]
        for n in nodes:
            t = Thread(target=self.start_node,
                    args=(user, host, n, segment_commands))
            t.start()
            threads.append(t)

        done = [not t.isAlive() for t in threads]
        while not all(done):
            self.log.info("Waiting for threads %s" % done)
            time.sleep(10)
            done = [not t.isAlive() for t in threads]

    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
            export_certfile, topo, connInfo, services, timeout=0):
        """
        Start a sub-experiment on a federant.

        Get the current state, modify or create as appropriate, ship data and
        configs and start the experiment.  There are small ordering
        differences based on the initial state of the sub-experiment.
        """
        # Pick a random slice name (user plus 5 random letters)
        def random_slicename(user):
            slicename = user
            for i in range(0,5):
                slicename += random.choice(string.ascii_letters)
            return slicename

        host = self.staging_host
        if not os.access(certfile, os.R_OK):
            self.log.error("[start_segment]: Cannot read certfile: %s" % \
                    certfile)
            return False

        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)

        # Local software dir
        lsoftdir = "%s/software" % tmpdir

        # Open up a temporary file to contain a script for setting up the
        # filespace for the new experiment.
self.log.info("[start_segment]: creating script file") try: sf, scriptname = tempfile.mkstemp() scriptfile = os.fdopen(sf, 'w') except EnvironmentError: return False scriptbase = os.path.basename(scriptname) # Script the filesystem changes print >>scriptfile, "/bin/rm -rf %s" % stagingdir print >>scriptfile, 'mkdir -p %s' % stagingdir print >>scriptfile, "rm -f %s" % scriptbase scriptfile.close() # Move the script to the remote machine # XXX: could collide tempfile names on the remote host if segment_commands.scp_file(scriptname, user, host, scriptbase): os.remove(scriptname) else: return False # Execute the script (and the script's last line deletes it) if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase): return False try: gcred = segment_commands.pg_call(self.sa_url, 'GetCredential', {}, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) # Find a slicename not in use slicename = "fabereGpgL" while True: slicename = random_slicename(user) try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt) except segment_commands.ProtoGENIError, e: print e break self.log.info("Creating %s" % slicename) f = open("./rspec", "w") print >>f, "%s" % rspec f.close() # Create the slice and allocate resources. If any of this stuff fails, # the allocations will time out on PG in short order, so we just raise # the service_error. 
        try:
            # Register the slice with the slice authority
            param = {
                    'credential': gcred,
                    'hrn': slicename,
                    'type': 'Slice'
                    }
            slice_cred = segment_commands.pg_call(self.sa_url, 'Register',
                    param, ctxt)
            f = open("./slice_cred", "w")
            print >>f, slice_cred
            f.close()
            # Populate the ssh keys (let PG format them)
            param = {
                    'credential': gcred,
                    }
            keys = segment_commands.pg_call(self.sa_url, 'GetKeys',
                    param, ctxt)
            # Grab and redeem a ticket
            param = {
                    'credential': slice_cred,
                    'rspec': rspec,
                    }
            ticket = segment_commands.pg_call(self.cm_url, 'GetTicket',
                    param, ctxt)
            f = open("./ticket", "w")
            print >>f, ticket
            f.close()
            # Redeeming yields the sliver credential and the manifest that
            # maps virtual names to allocated hosts.
            param = {
                    'credential': slice_cred,
                    'keys': keys,
                    'ticket': ticket,
                    }
            sliver_cred, manifest = segment_commands.pg_call(self.cm_url,
                    'RedeemTicket', param, ctxt)
            f = open("./sliver_cred", "w")
            print >>f, sliver_cred
            f.close()
            f = open("./manifest", "w")
            print >>f, manifest
            f.close()
            # start 'em up
            param = {
                    'credential': sliver_cred,
                    }
            segment_commands.pg_call(self.cm_url, 'StartSliver', param, ctxt)
        except segment_commands.ProtoGENIError, e:
            raise service_error(service_error.federant,
                    "ProtoGENI: %s %s" % (e.code, e))

        # With manifest in hand, we can export the portal node names.
if self.create_debug: nodes = self.fake_manifest(topo) else: nodes = self.manifest_to_dict(manifest) self.export_store_info(export_certfile, nodes, self.ssh_port, connInfo) self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, ename, connInfo, services, nodes) # Copy software to the staging machine (done after generation to copy # those, too) for d in (tmpdir, lsoftdir): if os.path.isdir(d): for f in os.listdir(d): if not os.path.isdir("%s/%s" % (d, f)): if not segment_commands.scp_file("%s/%s" % (d, f), user, host, "%s/%s" % (stagingdir, f)): self.log.error("Scp failed") return False # Now we wait for the nodes to start on PG status = 'notready' try: while status == 'notready': param = { 'credential': slice_cred } r = segment_commands.pg_call(self.cm_url, 'SliceStatus', param, ctxt) print r status = r.get('status', 'notready') if status == 'notready': time.sleep(30) except segment_commands.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s %s" % (e.code, e)) if status == 'failed': self.log.error('Sliver failed to start on ProtoGENI') try: param = { 'credential': slice_cred } segment_commands.pg_call(self.cm_url, 'DeleteSliver', param, ctxt) except segment_commands.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return False else: self.state_lock.acquire() self.allocation[aid]['slice_name'] = slicename self.allocation[aid]['slice_credential'] = slice_cred self.allocation[aid]['sliver_credential'] = sliver_cred self.allocation[aid]['manifest'] = manifest self.allocation[aid]['certfile'] = certfile self.allocation[aid]['certpw'] = certpw self.write_state() self.state_lock.release() # Now we have configuration to do for ProtoGENI self.configure_nodes(segment_commands, topo, nodes, user, pubkey, secretkey, stagingdir, tmpdir) self.start_nodes(user, self.staging_host, [ n.get('hostname', None) for n in nodes.values()], segment_commands) # Everything has gone OK. 
return True, dict([(k, n.get('hostname', None)) \ for k, n in nodes.items()]) def generate_rspec(self, topo, softdir, connInfo): t = topo.clone() starts = { } # Weed out the things we aren't going to instantiate: Segments, portal # substrates, and portal interfaces. (The copy in the for loop allows # us to delete from e.elements in side the for loop). While we're # touching all the elements, we also adjust paths from the original # testbed to local testbed paths and put the federation commands and # startcommands into a dict so we can start them manually later. # ProtoGENI requires setup before the federation commands run, so we # run them by hand after we've seeded configurations. for e in [e for e in t.elements]: if isinstance(e, topdl.Segment): t.elements.remove(e) # Fix software paths for s in getattr(e, 'software', []): s.location = re.sub("^.*/", softdir, s.location) if isinstance(e, topdl.Computer): if e.get_attribute('portal') and self.portal_startcommand: # Portals never have a user-specified start command starts[e.name] = self.portal_startcommand elif self.node_startcommand: if e.get_attribute('startup'): starts[e.name] = "%s \\$USER '%s'" % \ (self.node_startcommand, e.get_attribute('startup')) e.remove_attribute('startup') else: starts[e.name] = self.node_startcommand # Remove portal interfaces e.interface = [i for i in e.interface \ if not i.get_attribute('portal')] t.substrates = [ s.clone() for s in t.substrates ] t.incorporate_elements() # Customize the ns2 output for local portal commands and images filters = [] # NB: these are extra commands issued for the node, not the startcmds if self.portal_command: filters.append(topdl.generate_portal_command_filter( self.portal_command)) # Convert to rspec and return it exp_rspec = topdl.topology_to_rspec(t, filters) return exp_rspec def retrieve_software(self, topo, certfile, softdir): """ Collect the software that nodes in the topology need loaded and stage it locally. 
This implies retrieving it from the experiment_controller and placing it into softdir. Certfile is used to prove that this node has access to that data (it's the allocation/segment fedid). Finally local portal and federation software is also copied to the same staging directory for simplicity - all software needed for experiment creation is in softdir. """ sw = set() for e in topo.elements: for s in getattr(e, 'software', []): sw.add(s.location) os.mkdir(softdir) for s in sw: self.log.debug("Retrieving %s" % s) try: get_url(s, certfile, softdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) # Copy local portal node software to the tempdir for s in (self.portal_software, self.federation_software): for l, f in s: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) # Ick. Put this python rpm in a place that it will get moved into # the staging area. It's a hack to install a modern (in a Roman # sense of modern) python on ProtoGENI python_rpm ="python2.4-2.4-1pydotorg.i586.rpm" if os.access("./%s" % python_rpm, os.R_OK): copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm)) def initialize_experiment_info(self, attrs, aid, certfile, tmpdir): """ Gather common configuration files, retrieve or create an experiment name and project name, and return the ssh_key filenames. Create an allocation log bound to the state log variable as well. 
""" configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey')) ename = None pubkey_base = None secretkey_base = None alloc_log = None for a in attrs: if a['attribute'] in configs: try: self.log.debug("Retrieving %s" % a['value']) get_url(a['value'], certfile, tmpdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (a.get('value', ""), v)) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'experiment_name': ename = a['value'] if not ename: ename = "" for i in range(0,5): ename += random.choice(string.ascii_letters) self.log.warn("No experiment name: picked one randomly: %s" \ % ename) self.state_lock.acquire() if self.allocation.has_key(aid): cf, user, ssh_key, cpw = self.allocation[aid]['credentials'] self.allocation[aid]['experiment'] = ename self.allocation[aid]['log'] = [ ] # Create a logger that logs to the experiment's state object as # well as to the main log file. alloc_log = logging.getLogger('fedd.access.%s' % ename) h = logging.StreamHandler( list_log.list_log(self.allocation[aid]['log'])) # XXX: there should be a global one of these rather than # repeating the code. h.setFormatter(logging.Formatter( "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) self.write_state() else: self.log.error("No allocation for %s!?" 
% aid) self.state_lock.release() return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, cpw, alloc_log) def finalize_experiment(self, topo, nodes, aid, alloc_id): # Copy the assigned names into the return topology rvtopo = topo.clone() embedding = [ ] for k, n in nodes.items(): embedding.append({ 'toponame': k, 'physname': ["%s%s" % (n, self.domain)], }) # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() logv = "".join(self.allocation[aid]['log']) # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. self.allocation[aid]['started'] = { 'allocID': alloc_id, 'allocationLog': logv, 'segmentdescription': { 'topdldescription': rvtopo.to_dict() }, 'embedding': embedding, } retval = copy.deepcopy(self.allocation[aid]['started']) self.write_state() self.state_lock.release() return retval def StartSegment(self, req, fid): err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = req['StartSegmentRequestBody'] topref = req['segmentdescription']['topdldescription'] except KeyError: raise service_error(service_error.req, "Badly formed request") connInfo = req.get('connection', []) services = req.get('service', []) auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. 
self.state_lock.acquire() retval = self.allocation[aid].get('started', None) self.state_lock.release() if retval: self.log.warning("Duplicate StartSegment for %s: " % aid + \ "replaying response") return retval if topref: topo = topdl.Topology(**topref) else: raise service_error(service_error.req, "Request missing segmentdescription'") certfile = "%s/%s.pem" % (self.certdir, auth_attr) try: tmpdir = tempfile.mkdtemp(prefix="access-") softdir = "%s/software" % tmpdir except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") # Try block alllows us to clean up temporary files. try: self.retrieve_software(topo, certfile, softdir) self.configure_userconf(services, tmpdir) ename, pubkey_base, secretkey_base, cf, user, ssh_key, \ cpw, alloc_log = self.initialize_experiment_info(attrs, aid, certfile, tmpdir) self.import_store_info(certfile, connInfo) rspec = self.generate_rspec(topo, "%s/%s/" \ % (self.staging_dir, ename), connInfo) segment_commands = protogeni_proxy(keyfile=ssh_key, debug=self.create_debug, log=alloc_log, ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url) rv, nodes = self.start_segment(segment_commands, aid, user, rspec, pubkey_base, secretkey_base, ename, "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw, certfile, topo, connInfo, services) except EnvironmentError: err = service_error(service_error.internal, "%s" % e) except service_error, e: err = e except: t, v, st = sys.exc_info() err = service_error(service_error.internal, "%s: %s" % \ (v, traceback.extract_tb(st))) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: self.log.debug("[StartSegment]: not removing %s" % tmpdir) if rv: return self.finalize_experiment(topo, nodes, aid, req['allocID']) elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def stop_segment(self, segment_commands, user, stagingdir, slice_cred, 
certfile, certpw): """ Stop a sub experiment by calling swapexp on the federant """ host = self.staging_host rv = False try: # Clean out tar files: we've gone over quota in the past if stagingdir: segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir) if slice_cred: self.log.error('Removing Sliver on ProtoGENI') ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) try: param = { 'credential': slice_cred } segment_commands.pg_call(self.cm_url, 'DeleteSlice', param, ctxt) except segment_commands.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return True except self.ssh_cmd_timeout: rv = False return rv def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(service_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if self.allocation.has_key(aid): cf, user, ssh_key, cpw = self.allocation[aid]['credentials'] slice_cred = self.allocation[aid].get('slice_credential', None) ename = self.allocation[aid].get('experiment', None) else: cf, user, ssh_key, cpw = (None, None, None, None) slice_cred = None ename = None self.state_lock.release() if ename: staging = "%s/%s" % ( self.staging_dir, ename) else: self.log.warn("Can't find experiment name for %s" % aid) staging = None segment_commands = protogeni_proxy(keyfile=ssh_key, debug=self.create_debug, ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url) self.stop_segment(segment_commands, user, staging, slice_cred, cf, cpw) return { 'allocID': req['allocID'] } def renew_segment(self, segment_commands, name, scred, interval, certfile, certpw): ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) try: expiration = time.strftime("%Y%m%dT%H:%M:%S", time.gmtime(time.time() + interval)) cred = 
segment_commands.pg_call(self.sa_url, 'GetCredential', {}, ctxt) param = { 'credential': scred, 'expiration': expiration } r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt) param = { 'credential': cred, 'hrn': name, 'type': 'Slice', } slice = segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt) uuid = slice.get('uuid', None) if uuid == None: sys.exit('No uuid for %s' % slicename) print 'Calling GetCredential (uuid)' param = { 'credential': cred, 'uuid': uuid, 'type': 'Slice', } new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential', param, ctxt) f = open('./new_slice_cred', 'w') print >>f, new_scred f.close() except segment_commands.ProtoGENIError, e: self.log.error("Failed to extend slice %s: %s" % (name, e)) return None try: print 'Calling RenewSlice (CM)' param = { 'credential': new_scred, } r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt) except segment_commands.ProtoGENIError, e: self.log.warn("Failed to renew sliver for %s: %s" % (name, e)) return new_scred def RenewSlices(self): self.log.info("Scanning for slices to renew") self.state_lock.acquire() aids = self.allocation.keys() self.state_lock.release() for aid in aids: self.state_lock.acquire() if self.allocation.has_key(aid): name = self.allocation[aid].get('slice_name', None) scred = self.allocation[aid].get('slice_credential', None) cf, user, ssh_key, cpw = self.allocation[aid]['credentials'] else: name = None scred = None self.state_lock.release() if not os.access(cf, os.R_OK): self.log.error( "[RenewSlices] cred.file %s unreadable, ignoring" % cf) continue # There's a ProtoGENI slice associated with the segment; renew it. 
if name and scred: segment_commands = protogeni_proxy(log=self.log, debug=self.create_debug, keyfile=ssh_key, cm_url = self.cm_url, sa_url = self.sa_url, ch_url = self.ch_url) new_scred = self.renew_segment(segment_commands, name, scred, self.renewal_interval, cf, cpw) if new_scred: self.log.info("Slice %s renewed until %s GMT" % \ (name, time.asctime(time.gmtime(\ time.time()+self.renewal_interval)))) self.state_lock.acquire() if self.allocation.has_key(aid): self.allocation[aid]['slice_credential'] = new_scred self.state_lock.release() else: self.log.info("Failed to renew slice %s " % name) # Let's do this all again soon. (4 tries before the slices time out) t = Timer(self.renewal_interval/4, self.RenewSlices) t.start()