#!/usr/local/bin/python

import sys, os
import re
import tempfile
import subprocess
import logging
import time
import random
import string
import signal

import xml.parsers.expat
from threading import Thread

from proxy_segment import proxy_segment
from service_error import service_error
from remote_service import service_caller
from util import fedd_ssl_context

import topdl


class segment_base(proxy_segment):
    """
    Shared plumbing for the ProtoGENI segment objects (start_segment and
    stop_segment): holds the clearinghouse / slice authority / component
    manager URLs and wraps ProtoGENI XML-RPC calls, including canned
    responses for debug mode.
    """

    class ProtoGENIError(Exception): 
        """Raised when a ProtoGENI XML-RPC call returns a non-zero code."""
        def __init__(self, op, code, output):
            Exception.__init__(self, output)
            self.op = op            # the ProtoGENI method that failed
            self.code = code        # result code (or 'unknown')
            self.output = output    # server-supplied error text

    def __init__(self, log=None, keyfile=None, debug=False, 
            ch_url=None, sa_url=None, cm_url=None):
        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)

        # Fix: refer to the exception on this (base) class rather than on
        # the start_segment subclass.  Both names resolve to the same class
        # object at runtime, but the subclass reference was misleading and
        # fragile.
        self.ProtoGENIError = segment_base.ProtoGENIError

        self.ch_url = ch_url        # clearinghouse
        self.sa_url = sa_url        # slice authority
        self.cm_url = cm_url        # component manager

        self.call_SetValue = service_caller('SetValue')

        # In debug mode, methods listed in debug_fail raise ProtoGENIError
        # and methods in debug_response return the canned value; everything
        # else returns a placeholder blob.  No network traffic is generated.
        self.debug_fail = ['Resolve']
        self.debug_response = {
            'RedeemTicket': ("XML blob1", "XML blob2"),
            'SliceStatus': { 'status': 'ready' },
        }


    def pg_call(self, url, method, params, context):
        """
        Invoke the ProtoGENI XML-RPC method at url with params using the
        given SSL context.  Returns the response's 'value' member on
        success; raises ProtoGENIError when the server returns a non-zero
        code.  In debug mode, return (or raise) the canned results instead.
        """
        s = service_caller(method, request_body_name="", strict=False)
        self.log.debug("Calling %s %s" % (url, method))
        if not self.debug:
            r = s.call_xmlrpc_service(url, params, context=context)
            if r.get('code', -1) != 0:
                raise self.ProtoGENIError(op=method, 
                        code=r.get('code', 'unknown'), 
                        output=r.get('output', 'No output'))
            else:
                return r.get('value', None)
        else:
            if method in self.debug_fail:
                raise self.ProtoGENIError(op=method, code='unknown',
                        output='No output')
            elif method in self.debug_response:
                # was debug_response.has_key(method); 'in' is the idiom
                return self.debug_response[method]
            else:
                return "%s XML blob" % method


class start_segment(segment_base):
    def __init__(self, log=None, keyfile=None, debug=False, 
            ch_url=None, sa_url=None, cm_url=None):
        # Fix: ch_url was being passed cm_url (copy/paste error), silently
        # replacing the clearinghouse URL with the component manager's.
        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
                ch_url=ch_url, sa_url=sa_url, cm_url=cm_url)


    # Turn the manifest into a dict where each virtual nodename (i.e. the
    # topdl name) has an entry with the allocated machine in hostname and
    # the interfaces in 'interfaces'.  I love having XML parser code lying
    # around.
the topdl # name) has an entry with the allocated machine in hostname and the # interfaces in 'interfaces'. I love having XML parser code lying around. def manifest_to_dict(self, manifest): # XXX # if self.debug: return { } # The class allows us to keep a little state - the dict under # consteruction and the current entry in that dict for the interface # element code. class manifest_parser: def __init__(self): self.d = { } self.current_key=None # If the element is a node, create a dict entry for it. If it's an # interface inside a node, add an entry in the interfaces list with # the virtual name and component id. def start_element(self, name, attrs): if name == 'node': self.current_key = attrs.get('virtual_id',"") if self.current_key: self.d[self.current_key] = { 'hostname': attrs.get('hostname', None), 'interfaces': { } } elif name == 'interface' and self.current_key: self.d[self.current_key]['interfaces']\ [attrs.get('virtual_id','')] = \ attrs.get('component_id', None) # When a node is finished, clear current_key def end_element(self, name): if name == 'node': self.current_key = None mp = manifest_parser() p = xml.parsers.expat.ParserCreate() # These are bound to the class we just created p.StartElementHandler = mp.start_element p.EndElementHandler = mp.end_element p.Parse(manifest) return mp.d def export_store_info(self, cf, nodes, ssh_port, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. 
""" for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl and k and k.index('/') != -1: if self.debug: req = { 'name': k, 'value': 'debug' } self.call_SetValue(surl, req, cf) else: value = nodes.get(k[k.index('/')+1:], {}).get('hostname',"") if value: req = { 'name': k, 'value': value } self.call_SetValue(surl, req, cf) else: self.log.error("No hostname for %s" % \ k[k.index('/'):]) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': self.ssh_port } self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def configure_nodes(self, topo, nodes, user, host, sshd, sshd_config, gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir): fed_dir = "/usr/local/federation" ssh = "/usr/bin/ssh -n -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' " scp = "/usr/bin/scp -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' " ifconfig = "/sbin/ifconfig" tar = "/bin/tar" for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]: vname = e.name[0] node = nodes.get(vname, {}) pname = node.get('hostname', None) if pname: script = open("%s/%s.startup" %(tmpdir, pname), "w") # Reset the interfaces to the ones in the topo file for i in [ i for i in e.interface \ if not i.get_attribute('portal')]: pinf = node['interfaces'].get(i.name, None) addr = i.get_attribute('ip4_address') netmask = i.get_attribute('ip4_netmask') or '255.255.255.0' if pinf and addr: print >>script, \ "%s %s %s netmask %s" % \ (ifconfig, pinf, addr, netmask) else: self.log.error("Missing interface or address for %s" \ % i.name) for s in e.software: # XXX: Just tarfiles for now if not (s.location and s.install): continue s_base = 
s.location.rpartition('/')[2] print >>script, "%s %s@%s:%s/%s ." % \ (scp, user, host, stagingdir, s_base) print >>script, \ "%s -C %s -xzf %s" % (tar, s.install, s_base) for f in ('hosts', pubkey, secretkey): print >>script, "%s %s@%s:%s/%s %s/etc" % \ (scp, user, host, stagingdir, f, fed_dir) if sshd: print >>script, "%s %s@%s:%s %s/bin" % \ (scp, user, host, sshd, fed_dir) if sshd_config: print >>script, "%s %s@%s:%s %s/etc" % \ (scp, user, host, sshd_config, fed_dir) if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK): print >>script, "%s %s@%s:%s/%s.gw.conf %s/etc" % \ (scp, user, host, stagingdir, vname, fed_dir) # Start commands if e.get_attribute('portal') and gate_cmd: # Portals never have a user-specified start command print >>script, gate_cmd elif node_cmd: if e.get_attribute('startup'): print >>script, "%s \\$USER '%s'" % \ (node_cmd, e.get_attribute('startup')) else: print >>script, node_cmd script.close() if not self.scp_file("%s/%s.startup" % (tmpdir, pname), user, pname): self.log.error("Could not copy script to %s" % pname) else: self.log.error("Unmapped node: %s" % vname) def start_nodes(self, user, nodes): threads = [ ] for n in nodes: t = Thread(target=self.ssh_cmd, args=(user, n, "sudo /bin/sh ./%s.startup" % n)) t.start() threads.append(t) done = [not t.isAlive() for t in threads] while not all(done): self.log.info("Waiting for threads %s" % done) time.sleep(10) done = [not t.isAlive() for t in threads] def __call__(self, parent, aid, user, rspec, pubkey, secretkey, stagingdir, tmpdir, certfile, certpw, export_certfile, topo, connInfo, timeout=0): """ Start a sub-experiment on a federant. Get the current state, modify or create as appropriate, ship data and configs and start the experiment. There are small ordering differences based on the initial state of the sub-experiment. 
""" def random_slicename(user): slicename = user for i in range(0,5): slicename += random.choice(string.ascii_letters) return slicename host = parent.staging_host ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) # Local software dir lsoftdir = "%s/software" % tmpdir # Open up a temporary file to contain a script for setting up the # filespace for the new experiment. self.log.info("[start_segment]: creating script file") try: sf, scriptname = tempfile.mkstemp() scriptfile = os.fdopen(sf, 'w') except IOError: return False scriptbase = os.path.basename(scriptname) # Script the filesystem changes print >>scriptfile, "/bin/rm -rf %s" % stagingdir print >>scriptfile, 'mkdir -p %s' % stagingdir print >>scriptfile, "rm -f %s" % scriptbase scriptfile.close() # Move the script to the remote machine # XXX: could collide tempfile names on the remote host if self.scp_file(scriptname, user, host, scriptbase): os.remove(scriptname) else: return False # Execute the script (and the script's last line deletes it) if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): return False # Copy software to the staging machine for d in (tmpdir, lsoftdir): if os.path.isdir(d): for f in os.listdir(d): if not os.path.isdir("%s/%s" % (d, f)): if not self.scp_file("%s/%s" % (d, f), user, host, "%s/%s" % (stagingdir, f)): self.log.error("Scp failed") return False try: gcred = self.pg_call(self.sa_url, 'GetCredential', {}, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) # Find a slicename not in use slicename = "fabereGpgL" while True: slicename = random_slicename(user) try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } self.pg_call(self.sa_url, 'Resolve', param, ctxt) except self.ProtoGENIError, e: print e break self.log.info("Creating %s" % slicename) f = open("./rspec", "w") print >>f, "%s" % rspec f.close() # Create the slice and allocate resources. 
If any of this stuff fails, # the allocations will time out on PG in short order, so we just raise # the service_error. try: param = { 'credential': gcred, 'hrn': slicename, 'type': 'Slice' } slice_cred = self.pg_call(self.sa_url, 'Register', param, ctxt) f = open("./slice_cred", "w") print >>f, slice_cred f.close() # Populate the ssh keys (let PG format them) param = { 'credential': gcred, } keys = self.pg_call(self.sa_url, 'GetKeys', param, ctxt) # Grab and redeem a ticket param = { 'credential': slice_cred, 'rspec': rspec, } ticket = self.pg_call(self.cm_url, 'GetTicket', param, ctxt) f = open("./ticket", "w") print >>f, ticket f.close() param = { 'credential': slice_cred, 'keys': keys, 'ticket': ticket, } sliver_cred, manifest = self.pg_call(self.cm_url, 'RedeemTicket', param, ctxt) f = open("./sliver_cred", "w") print >>f, sliver_cred f.close() f = open("./manifest", "w") print >>f, manifest f.close() # start 'em up param = { 'credential': sliver_cred, } self.pg_call(self.cm_url, 'StartSliver', param, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) # With manifest in hand, we can export the portal node names. 
nodes = self.manifest_to_dict(manifest) self.export_store_info(export_certfile, nodes, parent.ssh_port, connInfo) # Now we wait for the nodes to start on PG status = 'notready' try: while status == 'notready': param = { 'credential': slice_cred } r = self.pg_call(self.cm_url, 'SliceStatus', param, ctxt) print r status = r.get('status', 'notready') if status == 'notready': time.sleep(30) except Self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) if status == 'failed': self.log.error('Sliver failed to start on ProtoGENI') try: param = { 'credential': slice_cred } self.pg_call(self.cm_url, 'DeleteSliver', param, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return False else: parent.state_lock.acquire() parent.allocation[aid]['slice_credential'] = slice_cred parent.allocation[aid]['sliver_credential'] = sliver_cred parent.allocation[aid]['manifest'] = manifest parent.allocation[aid]['certfile'] = certfile parent.allocation[aid]['certpw'] = certpw parent.write_state() parent.state_lock.release() # The startcmds for portals and standard nodes (the Master Slave # distinction is going away) gate_cmd = parent.attrs.get('SlaveConnectorStartCmd', '/bin/true') node_cmd = parent.attrs.get('SlaveNodeStartCmd', 'bin/true') # Now we have configuration to do for ProtoGENI self.configure_nodes(topo, nodes, user, parent.staging_host, parent.sshd, parent.sshd_config, gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir) self.start_nodes(user, [ n['hostname'] for n in nodes.values()]) # Everything has gone OK. 
return True class stop_segment(segment_base): def __init__(self, log=None, keyfile=None, debug=False, ch_url=None, sa_url=None, cm_url=None): segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug, ch_url=cm_url, sa_url=sa_url, cm_url=cm_url) def __call__(self, parent, user, stagingdir, slice_cred, certfile, certpw): """ Stop a sub experiment by calling swapexp on the federant """ host = parent.staging_host rv = False try: # Clean out tar files: we've gone over quota in the past if stagingdir: self.ssh_cmd(user, host, "rm -rf %s" % stagingdir) if slice_cred: self.log.error('Removing Sliver on ProtoGENI') ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) try: param = { 'credential': slice_cred } self.pg_call(self.cm_url, 'DeleteSlice', param, ctxt) except self.ProtoGENIError, e: raise service_error(service_error.federant, "ProtoGENI: %s" % e) return True except self.ssh_cmd_timeout: rv = False return rv