#!/usr/local/bin/python import sys, os import re import tempfile import subprocess import logging import time import random import string import signal import xml.parsers.expat from threading import Thread from proxy_segment import proxy_segment from service_error import service_error from remote_service import service_caller from util import fedd_ssl_context import topdl class segment_base(proxy_segment): class ProtoGENIError(Exception): def __init__(self, op, code, output): Exception.__init__(self, output) self.op = op self.code = code self.output = output def __init__(self, log=None, keyfile=None, debug=False, ch_url=None, sa_url=None, cm_url=None): proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) self.ProtoGENIError = start_segment.ProtoGENIError self.ch_url = ch_url self.sa_url = sa_url self.cm_url = cm_url self.call_SetValue = service_caller('SetValue') self.debug_fail = ['Resolve'] self.debug_response = { 'RedeemTicket': ("XML blob1", "XML blob2"), 'SliceStatus': { 'status': 'ready' }, } def pg_call(self, url, method, params, context): max_retries = 5 retries = 0 s = service_caller(method, request_body_name="", strict=False) self.log.debug("Calling %s %s" % (url, method)) if not self.debug: while retries < max_retries: r = s.call_xmlrpc_service(url, params, context=context) code = r.get('code', -1) if code == 0: # Success leaves the loop here return r.get('value', None) elif code == 14 and retries +1 < max_retries: # Busy resource retries+= 1 self.log.info("Resource busy, retrying in 30 secs") time.sleep(30) else: # NB: out of retries falls through to here raise self.ProtoGENIError(op=method, code=r.get('code', 'unknown'), output=r.get('output', 'No output')) else: if method in self.debug_fail: raise self.ProtoGENIError(op=method, code='unknown', output='No output') elif self.debug_response.has_key(method): return self.debug_response[method] else: return "%s XML blob" % method class start_segment(segment_base): def __init__(self, log=None, keyfile=None, debug=False, ch_url=None, sa_url=None, cm_url=None): segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug, ch_url=cm_url, sa_url=sa_url, cm_url=cm_url) # Turn the manifest into a dict were each virtual nodename (i.e. the topdl # name) has an entry with the allocated machine in hostname and the # interfaces in 'interfaces'. I love having XML parser code lying around. def manifest_to_dict(self, manifest, ignore_debug=False): if self.debug and not ignore_debug: self.log.debug("Returning null manifest dict") return { } # The class allows us to keep a little state - the dict under # consteruction and the current entry in that dict for the interface # element code. class manifest_parser: def __init__(self): self.d = { } self.current_key=None # If the element is a node, create a dict entry for it. If it's an # interface inside a node, add an entry in the interfaces list with # the virtual name and component id. def start_element(self, name, attrs): if name == 'node': self.current_key = attrs.get('virtual_id',"") if self.current_key: self.d[self.current_key] = { 'hostname': attrs.get('hostname', None), 'interfaces': { } } elif name == 'interface' and self.current_key: self.d[self.current_key]['interfaces']\ [attrs.get('virtual_id','')] = \ attrs.get('component_id', None) # When a node is finished, clear current_key def end_element(self, name): if name == 'node': self.current_key = None mp = manifest_parser() p = xml.parsers.expat.ParserCreate() # These are bound to the class we just created p.StartElementHandler = mp.start_element p.EndElementHandler = mp.end_element p.Parse(manifest) return mp.d def generate_portal_configs(self, parent, topo, pubkey_base, secretkey_base, tmpdir, master, leid, connInfo, services, nodes): def conninfo_to_dict(key, info): """ Make a cpoy of the connection information about key, and flatten it into a single dict by parsing out any feddAttrs. """ rv = None for i in info: if key == i.get('portal', "") or \ key in [e.get('element', "") \ for e in i.get('member', [])]: rv = i.copy() break else: return rv if 'fedAttr' in rv: for a in rv['fedAttr']: attr = a.get('attribute', "") val = a.get('value', "") if attr and attr not in rv: rv[attr] = val del rv['fedAttr'] return rv # XXX: un hardcode this def client_null(f, s): print >>f, "Service: %s" % s['name'] def client_smb(f, s): print >>f, "Service: %s" % s['name'] smbshare = None smbuser = None smbproj = None for a in s.get('fedAttr', []): if a.get('attribute', '') == 'SMBSHARE': smbshare = a.get('value', None) elif a.get('attribute', '') == 'SMBUSER': smbuser = a.get('value', None) elif a.get('attribute', '') == 'SMBPROJ': smbproj = a.get('value', None) if all((smbshare, smbuser, smbproj)): print >>f, "SMBshare: %s" % smbshare print >>f, "ProjectUser: %s" % smbuser print >>f, "ProjectName: %s" % smbproj client_service_out = { 'SMB': client_smb, 'tmcd': client_null, 'seer': client_null, 'userconfig': client_null, } # XXX: end un hardcode this seer_out = False client_out = False for e in [ e for e in topo.elements \ if isinstance(e, topdl.Computer) and e.get_attribute('portal')]: myname = e.name[0] type = e.get_attribute('portal_type') info = conninfo_to_dict(myname, connInfo) if not info: raise service_error(service_error.req, "No connectivity info for %s" % myname) # Translate to physical name (ProtoGENI doesn't have DNS) physname = nodes.get(myname, {'hostname': myname})['hostname'] peer = info.get('peer', "") ldomain = parent.domain; mexp = info.get('masterexperiment',"") mproj, meid = mexp.split("/", 1) mdomain = info.get('masterdomain',"") muser = info.get('masteruser','root') smbshare = info.get('smbshare', 'USERS') ssh_port = info.get('ssh_port', '22') active = info.get('active', 'False') cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower()) tunnelconfig = parent.attrs.has_key('TunnelCfg') try: f = open(cfn, "w") if active == 'True': print >>f, "active: True" print >>f, "ssh_port: %s" % ssh_port if type in ('control', 'both'): for s in [s for s in services \ if s.get('name', "") in parent.imports]: p = urlparse(s.get('server', 'http://localhost')) print >>f, 'port: remote:%s:%s:%s' % \ (p.port, p.hostname, p.port) if tunnelconfig: print >>f, "tunnelip: %s" % tunnelconfig # XXX: send this an fedattr #print >>f, "seercontrol: control.%s.%s%s" % \ #(meid.lower(), mproj.lower(), mdomain) print >>f, "peer: %s" % peer.lower() print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \ pubkey_base print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \ secretkey_base f.close() except IOError, e: raise service_error(service_error.internal, "Can't write protal config %s: %s" % (cfn, e)) # XXX: This little seer config file needs to go away. if not seer_out: try: seerfn = "%s/seer.conf" % tmpdir f = open(seerfn, "w") if not master: print >>f, "ControlNode: control.%s.%s%s" % \ (meid.lower(), mproj.lower(), mdomain) print >>f, "ExperimentID: %s" % mexp f.close() except IOError, e: raise service_error(service_error.internal, "Can't write seer.conf: %s" %e) seer_out = True if not client_out and type in ('control', 'both'): try: f = open("%s/client.conf" % tmpdir, "w") print >>f, "ControlGateway: %s" % physname.lower() for s in services: if s.get('name',"") in parent.imports and \ s.get('visibility','') == 'import': client_service_out[s['name']](f, s) # Does seer need this? # print >>f, "ExperimentID: %s/%s" % (mproj, meid) f.close() except IOError, e: raise service_error(service_error.internal, "Cannot write client.conf: %s" %s) client_out = True def export_store_info(self, cf, nodes, ssh_port, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl and k and k.index('/') != -1: if self.debug: req = { 'name': k, 'value': 'debug' } self.call_SetValue(surl, req, cf) else: value = nodes.get(k[k.index('/')+1:], {}).get('hostname',"") if value: req = { 'name': k, 'value': value } self.call_SetValue(surl, req, cf) else: self.log.error("No hostname for %s" % \ k[k.index('/'):]) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': ssh_port } self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def configure_nodes(self, topo, nodes, user, host, sshd, sshd_config, gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir): fed_dir = "/usr/local/federation" ssh = "/usr/bin/ssh -n -i .ssh/id_rsa -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' " scp = "/usr/bin/scp -i .ssh/id_rsa -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' " ifconfig = "/sbin/ifconfig" tar = "/bin/tar" for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]: vname = e.name[0] node = nodes.get(vname, {}) pname = node.get('hostname', None) if pname: script = open("%s/%s.startup" %(tmpdir, pname), "w") # Reset the interfaces to the ones in the topo file for i in [ i for i in e.interface \ if not i.get_attribute('portal')]: pinf = node['interfaces'].get(i.name, None) addr = i.get_attribute('ip4_address') netmask = i.get_attribute('ip4_netmask') or '255.255.255.0' if pinf and addr: print >>script, \ "%s %s %s netmask %s" % \ (ifconfig, pinf, addr, netmask) else: self.log.error("Missing interface or address for %s" \ % i.name) for s in e.software: # XXX: Just tarfiles for now if not (s.location and s.install): continue s_base = s.location.rpartition('/')[2] print >>script, "%s %s@%s:%s/%s ." % \ (scp, user, host, stagingdir, s_base) print >>script, \ "%s -C %s -xzf %s" % (tar, s.install, s_base) for f in ('hosts', pubkey, secretkey, 'client.conf', 'userconf', 'seer.conf'): print >>script, "%s %s@%s:%s/%s %s/etc" % \ (scp, user, host, stagingdir, f, fed_dir) if sshd: print >>script, "%s %s@%s:%s %s/bin" % \ (scp, user, host, sshd, fed_dir) if sshd_config: print >>script, "%s %s@%s:%s %s/etc" % \ (scp, user, host, sshd_config, fed_dir) # Look in tmpdir to get the names. They've all been copied # into the (remote) staging dir if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK): print >>script, "%s %s@%s:%s/%s.gw.conf %s/etc" % \ (scp, user, host, stagingdir, vname, fed_dir) # Start commands if e.get_attribute('portal') and gate_cmd: # Portals never have a user-specified start command print >>script, gate_cmd elif node_cmd: # XXX: debug print >>script, "sudo perl -I%s/lib %s/bin/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_dir, fed_dir, user) # XXX: debug # start routing on nodes print >>script, "sudo perl %s/bin/protogeni_routing.pl" % \ fed_dir if e.get_attribute('startup'): print >>script, "%s \\$USER '%s'" % \ (node_cmd, e.get_attribute('startup')) else: print >>script, node_cmd script.close() if not self.scp_file("%s/%s.startup" % (tmpdir, pname), user, pname): self.log.error("Could not copy script to %s" % pname) else: self.log.error("Unmapped node: %s" % vname) def start_node(self, user, host, node): # Place an identity on the node so that the copying can succeed self.ssh_cmd(user, host, "scp .ssh/id_rsa %s:.ssh" % node) self.ssh_cmd(user, node, "sudo /bin/sh ./%s.startup &" % node) def start_nodes(self, user, host, nodes): threads = [ ] for n in nodes: t = Thread(target=self.start_node, args=(user, host, n)) t.start() threads.append(t) done = [not t.isAlive() for t in threads] while not all(done): self.log.info("Waiting for threads %s" % done) time.sleep(10) done = [not t.isAlive() for t in threads] def __call__(self, parent, aid, user, rspec, pubkey, secretkey, master, ename, stagingdir, tmpdir, certfile, certpw, export_certfile, topo, connInfo, services, timeout=0): """ Start a sub-experiment on a federant. Get the current state, modify or create as appropriate, ship data and configs and start the experiment. There are small ordering differences based on the initial state of the sub-experiment. """ def random_slicename(user): slicename = user for i in range(0,5): slicename += random.choice(string.ascii_letters) return slicename host = parent.staging_host ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) # Local software dir lsoftdir = "%s/software" % tmpdir # Open up a temporary file to contain a script for setting up the # filespace for the new experiment. self.log.info("[start_segment]: creating script file") try: sf, scriptname = tempfile.mkstemp() scriptfile = os.fdopen(sf, 'w') except IOError: return False scriptbase = os.path.basename(scriptname) # Script the filesystem changes print >>scriptfile, "/bin/rm -rf %s" % stagingdir print >>scriptfile, 'mkdir -p %s' % stagingdir print >>scriptfile, "rm -f %s" % scriptbase scriptfile.close() # Move the script to the remote machine # XXX: could collide tempfile names on the remote host if self.scp_file(scriptname, user, host, scriptbase): os.remove(scriptname) else: return False # Execute the script (and the script's last line deletes it) if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): return False try: gcred = self.pg_call(self.sa_url, 'GetCredential', {}, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) # Find a slicename not in use slicename = "fabereGpgL" while True: slicename = random_slicename(user) try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } self.pg_call(self.sa_url, 'Resolve', param, ctxt) except self.ProtoGENIError, e: print e break self.log.info("Creating %s" % slicename) f = open("./rspec", "w") print >>f, "%s" % rspec f.close() # Create the slice and allocate resources. If any of this stuff fails, # the allocations will time out on PG in short order, so we just raise # the service_error. try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } slice_cred = self.pg_call(self.sa_url, 'Register', param, ctxt) f = open("./slice_cred", "w") print >>f, slice_cred f.close() # Populate the ssh keys (let PG format them) param = { 'credential': gcred, } keys = self.pg_call(self.sa_url, 'GetKeys', param, ctxt) # Grab and redeem a ticket param = { 'credential': slice_cred, 'rspec': rspec, } ticket = self.pg_call(self.cm_url, 'GetTicket', param, ctxt) f = open("./ticket", "w") print >>f, ticket f.close() param = { 'credential': slice_cred, 'keys': keys, 'ticket': ticket, } sliver_cred, manifest = self.pg_call(self.cm_url, 'RedeemTicket', param, ctxt) f = open("./sliver_cred", "w") print >>f, sliver_cred f.close() f = open("./manifest", "w") print >>f, manifest f.close() # start 'em up param = { 'credential': sliver_cred, } self.pg_call(self.cm_url, 'StartSliver', param, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s %s" % (e.code, e)) # With manifest in hand, we can export the portal node names. nodes = self.manifest_to_dict(manifest, ignore_debug=True) self.export_store_info(export_certfile, nodes, parent.ssh_port, connInfo) self.generate_portal_configs(parent, topo, pubkey, secretkey, tmpdir, master, ename, connInfo, services, nodes) # Copy software to the staging machine (done after generation to copy # those, too) for d in (tmpdir, lsoftdir): if os.path.isdir(d): for f in os.listdir(d): if not os.path.isdir("%s/%s" % (d, f)): if not self.scp_file("%s/%s" % (d, f), user, host, "%s/%s" % (stagingdir, f)): self.log.error("Scp failed") return False # Now we wait for the nodes to start on PG status = 'notready' try: while status == 'notready': param = { 'credential': slice_cred } r = self.pg_call(self.cm_url, 'SliceStatus', param, ctxt) print r status = r.get('status', 'notready') if status == 'notready': time.sleep(30) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s %s" % (e.code, e)) if status == 'failed': self.log.error('Sliver failed to start on ProtoGENI') try: param = { 'credential': slice_cred } self.pg_call(self.cm_url, 'DeleteSliver', param, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return False else: parent.state_lock.acquire() parent.allocation[aid]['slice_name'] = slicename parent.allocation[aid]['slice_credential'] = slice_cred parent.allocation[aid]['sliver_credential'] = sliver_cred parent.allocation[aid]['manifest'] = manifest parent.allocation[aid]['certfile'] = certfile parent.allocation[aid]['certpw'] = certpw parent.write_state() parent.state_lock.release() # The startcmds for portals and standard nodes (the Master Slave # distinction is going away) gate_cmd = parent.attrs.get('SlaveConnectorStartCmd', '/bin/true') node_cmd = parent.attrs.get('SlaveNodeStartCmd', 'bin/true') # Now we have configuration to do for ProtoGENI self.configure_nodes(topo, nodes, user, parent.staging_host, parent.sshd, parent.sshd_config, gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir) self.start_nodes(user, parent.staging_host, [ n['hostname'] for n in nodes.values()]) # Everything has gone OK. return True class stop_segment(segment_base): def __init__(self, log=None, keyfile=None, debug=False, ch_url=None, sa_url=None, cm_url=None): segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug, ch_url=cm_url, sa_url=sa_url, cm_url=cm_url) def __call__(self, parent, user, stagingdir, slice_cred, certfile, certpw): """ Stop a sub experiment by calling swapexp on the federant """ host = parent.staging_host rv = False try: # Clean out tar files: we've gone over quota in the past if stagingdir: self.ssh_cmd(user, host, "rm -rf %s" % stagingdir) if slice_cred: self.log.error('Removing Sliver on ProtoGENI') ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) try: param = { 'credential': slice_cred } self.pg_call(self.cm_url, 'DeleteSlice', param, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return True except self.ssh_cmd_timeout: rv = False return rv class renew_segment(segment_base): def __init__(self, log=None, debug=False, keyfile=None, ch_url=None, sa_url=None, cm_url=None): segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug, ch_url=cm_url, sa_url=sa_url, cm_url=cm_url) def __call__(self, name, scred, interval, certfile, certpw): ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) try: expiration = time.strftime("%Y%m%dT%H:%M:%S", time.gmtime(time.time() + interval)) cred = self.pg_call(self.sa_url, 'GetCredential', {}, ctxt) param = { 'credential': scred, 'expiration': expiration } r = self.pg_call(self.sa_url, 'RenewSlice', param, ctxt) param = { 'credential': cred, 'hrn': name, 'type': 'Slice', } slice = self.pg_call(self.sa_url, 'Resolve', param, ctxt) uuid = slice.get('uuid', None) if uuid == None: sys.exit('No uuid for %s' % slicename) print 'Calling GetCredential (uuid)' param = { 'credential': cred, 'uuid': uuid, 'type': 'Slice', } new_scred = self.pg_call(self.sa_url, 'GetCredential', param, ctxt) f = open('./new_slice_cred', 'w') print >>f, new_scred f.close() except self.ProtoGENIError, e: self.log.error("Failed to extend slice %s: %s" % (name, e)) return None try: print 'Calling RenewSlice (CM)' param = { 'credential': new_scred, } r = self.pg_call(self.cm_url, 'RenewSlice', param, ctxt) except self.ProtoGENIError, e: self.log.warn("Failed to renew sliver for %s: %s" % (name, e)) return new_scred