#!/usr/local/bin/python

import os
import sys

import re
import random
import string
import subprocess
import tempfile
import copy
import pickle
import logging
import signal
import time

import traceback
# For parsing visualization output and splitter output
import xml.parsers.expat

from threading import Lock, Thread, Condition
from subprocess import call, Popen, PIPE

from urlparse import urlparse
from urllib2 import urlopen

from util import *
from fedid import fedid, generate_fedid
from remote_service import xmlrpc_handler, soap_handler, service_caller
from service_error import service_error
from synch_store import synch_store

import topdl
import list_log
from ip_allocator import ip_allocator
from ip_addr import ip_addr


class nullHandler(logging.Handler):
    """Logging handler that silently discards every record."""

    def emit(self, record):
        pass


fl = logging.getLogger("fedd.experiment_control")
fl.addHandler(nullHandler())


def _find_seer_master(masters):
    """
    Return the key of the first entry in masters whose service collection
    contains a service named 'SEER', or None if there is none.

    masters maps a testbed name to an iterable of service objects that have
    a name attribute.
    """
    for tb, services in masters.items():
        for svc in services:
            if svc.name == 'SEER':
                return tb
    return None


def _seer_master_params(masters, tbparams):
    """
    Return (domain, project, user, smbshare) for the testbed that exports
    the SEER service, falling back to fixed defaults for any value that is
    missing or when no testbed exports SEER.

    NOTE: earlier code remembered the *service collection* (a value of
    masters) and then used it to index tbparams, which is keyed by testbed
    name.  The testbed name is what must be looked up.
    """
    seer_tb = _find_seer_master(masters)
    params = tbparams[seer_tb] if seer_tb is not None else {}
    return (
        params.get('domain', '.example.com'),
        params.get('project', 'project'),
        params.get('user', 'root'),
        params.get('smbshare', 'USERS'),
    )


def _active_flag(st, dt, masters):
    """
    Return "True" or "False" (as a string) for the 'active' fedAttr of a
    portal on testbed st that faces testbed dt.

    When exactly one of the two testbeds is a key of masters, the result
    says whether st is that one; otherwise the two names are compared so
    that the two ends get opposite values.
    """
    if (st in masters) != (dt in masters):
        return "%s" % (st in masters)
    return "%s" % (st > dt)


def _portal_basename(tb):
    """
    Return the testbed name used as the prefix of portal host names: the
    parts from split_testbed() joined with '-' when the name has a suffix
    (per testbed_suffix()), the name itself otherwise.
    """
    if testbed_suffix(tb):
        return "-".join(split_testbed(tb))
    return tb


class experiment_partition:
    """
    Adds portal nodes, transit substrates/segments and the matching
    connection information to the per-testbed sub-topologies of an
    experiment, so that substrates with interfaces on more than one testbed
    are connected.
    """

    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2,
            direct_transit=None):
        """
        Initialize the various attributes.

        auth is the object whose set_attribute() is called to grant
        allocations access to store keys, store_url is placed in every
        connection parameter, tbmap maps a testbed base name to the uri
        put in segments, muxmax is the number of non-portal interfaces a
        portal may carry before a new portal is created, and direct_transit
        is the list of direct-transit names to try (empty by default).
        """
        # NOTE: the misspelling in the logger name is kept on purpose;
        # renaming it would change which logging configuration applies.
        self.log = logging.getLogger(
                "fedd.experiment_control.experiment_paritition")
        self.auth = auth
        self.store_url = store_url
        self.tbmap = tbmap
        self.direct_transit = direct_transit or []
        self.muxmax = muxmax

    def _alloc_fedid(self, tbparams, tb, caller):
        """
        Return tbparams[tb]['allocID']['fedid'], or None (after a debug log
        entry tagged with caller) when that value cannot be looked up.
        """
        try:
            return tbparams[tb]['allocID']['fedid']
        except (KeyError, TypeError):
            self.log.debug("[%s] Can't get alloc id for %s?", caller, tb)
            return None

    def _new_segment(self, tb, substrate_name, eid, tbparams):
        """
        Return an 'emulab' topdl.Segment for testbed tb with one interface
        on the substrate called substrate_name.  The segment id is the
        testbed's allocID, its uri comes from self.tbmap, and it carries
        'domain' and 'experiment' ("project/eid") attributes taken from
        tbparams[tb] with defaults.

        Raises KeyError if tb or its 'allocID' is missing from tbparams.
        """
        params = tbparams[tb]
        return topdl.Segment(
            id=params['allocID'],
            type='emulab',
            uri=self.tbmap.get(testbed_base(tb), None),
            interface=[topdl.Interface(substrate=substrate_name)],
            attribute=[
                topdl.Attribute(attribute=n, value=v)
                for n, v in (
                    ('domain', params.get('domain', ".example.com")),
                    ('experiment',
                        "%s/%s" % (params.get('project', 'project'), eid)),
                )
            ],
        )

    def new_portal_node(self, st, dt, tbparams, masters, eid, myname,
            desthost, portal_type, iface_desc=(), conn_type="ssh",
            conn_attrs=None, expid=None):
        """
        Return a new internet portal node and a dict with the
        connectionInfo to be attached.

        The node is a topdl.Computer named myname with one interface per
        (substrate, attributes) pair in iface_desc.  The connection info is
        only built when conn_type is "ssh" (it is None otherwise); in that
        case the allocation of st is also granted access, through
        self.auth, to the store keys of both ends.  conn_attrs is accepted
        for compatibility and is not used.
        """
        ifaces = []
        for sub, attrs in iface_desc:
            ifaces.append(topdl.Interface(
                name="inf%03d" % len(ifaces),
                substrate=sub,
                attribute=[
                    topdl.Attribute(attribute=n, value=v) for n, v in attrs
                ],
            ))

        if conn_type == "ssh":
            mdomain, mproject, muser, smbshare = \
                    _seer_master_params(masters, tbparams)
            active = _active_flag(st, dt, masters)
            aid = self._alloc_fedid(tbparams, st, 'new_portal_node')

            info = {
                "type": conn_type,
                "portal": myname,
                'fedAttr': [
                    {'attribute': 'masterdomain', 'value': mdomain},
                    {'attribute': 'masterexperiment',
                        'value': "%s/%s" % (mproject, eid)},
                    {'attribute': 'active', 'value': active},
                    # Move to SMB service description
                    {'attribute': 'masteruser', 'value': muser},
                    {'attribute': 'smbshare', 'value': smbshare},
                ],
                'parameter': [
                    {
                        'name': 'peer',
                        'key': 'fedid:%s/%s' % (expid, myname),
                        'store': self.store_url,
                        'type': 'output',
                    },
                    {
                        'name': 'ssh_port',
                        'key': 'fedid:%s/%s-port' % (expid, myname),
                        'store': self.store_url,
                        'type': 'output',
                    },
                    {
                        'name': 'peer',
                        'key': 'fedid:%s/%s' % (expid, desthost),
                        'store': self.store_url,
                        'type': 'input',
                    },
                    {
                        'name': 'ssh_port',
                        'key': 'fedid:%s/%s-port' % (expid, desthost),
                        'store': self.store_url,
                        'type': 'input',
                    },
                ],
            }
            # Give this allocation the rights to access the key of the
            # peers
            if aid:
                for h in (myname, desthost):
                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
                    self.auth.set_attribute(aid,
                            'fedid:%s/%s-port' % (expid, h))
            else:
                self.log.error("No aid for %s in new_portal_node" % st)
        else:
            info = None

        portal = topdl.Computer(
            name=myname,
            attribute=[
                topdl.Attribute(attribute=n, value=v)
                for n, v in (
                    ('portal', 'true'),
                    ('portal_type', portal_type),
                    ('destination_testbed', dt),
                )
            ],
            interface=ifaces,
        )
        return (portal, info)

    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
        """
        Return a (substrate, segment) pair for connecting st to dt: a
        substrate named "st-dt" carrying a 'portal' attribute, and a
        segment standing for dt with one interface on that substrate.
        expid is accepted for interface compatibility and is not used.
        """
        tsubstrate = topdl.Substrate(
            name='%s-%s' % (st, dt),
            attribute=[topdl.Attribute(attribute='portal', value='true')],
        )
        segment_element = self._new_segment(dt, tsubstrate.name, eid,
                tbparams)
        return (tsubstrate, segment_element)

    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
            tb_name):
        """
        Create the transit topology for direct net number idx on the direct
        transit tb_name and store it, with its connection info, in topo and
        connInfo under the name join_testbed(tb_name, idx).

        The topology holds one substrate (a copy of sub's capacity with an
        unassigned vlan attribute) and one segment per testbed in tbs.  The
        allocation of every testbed is granted access to the vlan store
        key.

        Raises service_error if sub has no capacity.
        """
        if sub.capacity is None:
            raise service_error(service_error.internal,
                    "Cannot direct split substrate w/o capacity")

        name = join_testbed(tb_name, "%d" % idx)
        substr = topdl.Substrate(
            name=name,
            capacity=sub.capacity.clone(),
            attribute=[
                topdl.Attribute(attribute='vlan',
                    value='unassigned%d' % idx),
            ],
        )
        store_key = 'fedid:%s/vlan%d' % (expid, idx)
        vlan_key = "%s_vlans" % tb_name

        segs = []
        for tb in tbs:
            seg = topdl.Segment(
                id=tbparams[tb]['allocID'],
                type='emulab',
                uri=self.tbmap.get(testbed_base(tb), None),
                interface=[topdl.Interface(substrate=substr.name)],
                attribute=[
                    topdl.Attribute(
                        attribute='%s_endpoint' % tb_name,
                        value=tbparams[tb][tb_name]),
                ],
            )
            if vlan_key in tbparams[tb]:
                seg.set_attribute(vlan_key, tbparams[tb][vlan_key])
            segs.append(seg)

            # Give this allocation the rights to access the key of the
            # vlan_id.  Only the lookup of the allocation id is treated as
            # best effort; a failure inside the authorizer is not hidden
            # behind the "Can't get alloc id" message.
            aid = self._alloc_fedid(tbparams, tb, 'new_direct_topo')
            if aid:
                self.auth.set_attribute(aid, store_key)

        connInfo[name] = [
            {
                'type': 'transit',
                'parameter': [
                    {
                        'name': 'vlan_id',
                        'key': store_key,
                        'store': self.store_url,
                        'type': 'output',
                    },
                ],
            },
        ]

        topo[name] = topdl.Topology(
            substrates=[substr],
            elements=segs,
            attribute=[
                topdl.Attribute(attribute="transit", value='true'),
                topdl.Attribute(attribute="dynamic", value='true'),
                topdl.Attribute(attribute="testbed", value=tb_name),
                topdl.Attribute(attribute="store_keys", value=store_key),
            ],
        )

    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
            connInfo, expid=None, tb_name=None):
        """
        Create connection information that tells which nodes are to be
        connected to direct transits, and create an additional topology
        with just the interconnected segments and a substrate.

        tb_name is the direct transit's name; although it defaults to None
        it is used as a string prefix and must be supplied.

        Raises service_error if the copy of sub is missing from the
        topology of one of the testbeds in tbs.
        """
        def get_substrate_from_topo(name, t):
            # Linear search by name; None when nothing matches.
            for s in t.substrates:
                if s.name == name:
                    return s
            return None

        mdomain, mproject = _seer_master_params(masters, tbparams)[:2]

        # dn is the number of previously created direct nets on this direct
        # testbed.  This routine creates a net numbered by dn
        dn = len([x for x in topo if x.startswith(tb_name)])
        vlan_store_key = 'fedid:%s/vlan%d' % (expid, dn)

        # Set the attributes in the copies that will allow setup of direct
        # connections.
        for tb in tbs:
            s = get_substrate_from_topo(sub.name, topo[tb])
            if not s:
                raise service_error(service_error.internal,
                        "No substrate %s in testbed %s" % (sub.name, tb))

            aid = self._alloc_fedid(tbparams, tb, 'create_direct_substrate')

            # This may need another look, but only a service gateway will
            # look at the active parameter, and these are only inserted to
            # connect to a master.
            active = "%s" % (tb in masters)
            info = {
                'type': 'transit',
                'member': [
                    {'element': i.element.name, 'interface': i.name}
                    for i in s.interfaces
                    if isinstance(i.element, topdl.Computer)
                ],
                'fedAttr': [
                    {'attribute': 'masterdomain', 'value': mdomain},
                    {'attribute': 'masterexperiment',
                        'value': "%s/%s" % (mproject, eid)},
                    {'attribute': 'active', 'value': active},
                ],
                'parameter': [
                    {
                        'name': 'vlan_id',
                        'key': vlan_store_key,
                        'store': self.store_url,
                        'type': 'input',
                    },
                ],
                # tb is always a key of tbs here, as it comes from
                # iterating tbs.
                'peer': tbs[tb],
            }
            connInfo.setdefault(tb, []).append(info)

            # Give this allocation the rights to access the key of the
            # vlan_id
            if aid:
                self.auth.set_attribute(aid, vlan_store_key)

        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
                tb_name)

    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
            segment_substrate, portals, connInfo, expid):
        """
        Insert ssh portals into the sub-topologies of every testbed in tbs
        so that substrate sub, which has interfaces on all of them, is
        connected across testbeds.

        segment_substrate, portals and connInfo are updated in place;
        segment_substrate[st][dt] holds the portal substrate from st to dt
        and portals[st][dt] the list of portal nodes on st that face dt.
        An existing portal is reused until it carries self.muxmax
        non-portal interfaces.
        """
        # More than one testbed is on this substrate.  Insert
        # some portals into the subtopologies.  st == source testbed,
        # dt == destination testbed.
        for st in tbs:
            st_substrates = segment_substrate.setdefault(st, {})
            st_portals = portals.setdefault(st, {})
            st_conn = connInfo.setdefault(st, [])

            for dt in [t for t in tbs if t != st]:
                if dt not in st_substrates:
                    # Put a substrate and a segment for the connected
                    # testbed in there.
                    tsubstrate, segment_element = \
                            self.new_portal_substrate(st, dt, eid, tbparams,
                                    expid)
                    st_substrates[dt] = tsubstrate
                    topo[st].substrates.append(tsubstrate)
                    topo[st].elements.append(segment_element)

                dname = _portal_basename(dt)
                sname = _portal_basename(st)
                new_portal = False

                if dt in st_portals:
                    # There's a portal set up to go to this destination.
                    # See if there's room to multiplex this connection on
                    # it.  If so, add an interface to the portal; if not,
                    # set up to add a portal below.  Only the most recently
                    # created portal between st and dt is considered.
                    portal = st_portals[dt][-1]
                    mux = len([i for i in portal.interface
                        if not i.get_attribute('portal')])
                    if mux == self.muxmax:
                        new_portal = True
                        portal_type = "experiment"
                    else:
                        portal.interface.append(topdl.Interface(
                            substrate=sub.name,
                            attribute=[
                                topdl.Attribute(
                                    attribute='ip4_address',
                                    value=tbs[dt]),
                            ],
                        ))
                else:
                    # First connection to this testbed, make an empty list
                    # and set up to add the new portal below
                    new_portal = True
                    st_portals[dt] = []
                    if dt in masters or st in masters:
                        portal_type = "both"
                    else:
                        portal_type = "experiment"

                if not new_portal:
                    continue

                # Both ends number their tunnels by how many portals
                # already exist between the pair.
                count = len(st_portals[dt])
                myname = "%stunnel%d" % (dname.lower(), count)
                desthost = "%stunnel%d" % (sname.lower(), count)
                infs = (
                    (st_substrates[dt].name, (('portal', 'true'),)),
                    (sub.name, (('ip4_address', tbs[dt]),)),
                )
                portal, info = self.new_portal_node(st, dt, tbparams,
                        masters, eid, myname, desthost, portal_type, infs,
                        conn_type="ssh", conn_attrs=[], expid=expid)
                topo[st].elements.append(portal)
                st_portals[dt].append(portal)
                st_conn.append(info)

    def add_control_portal(self, st, dt, masters, eid, topo, tbparams,
            connInfo, expid):
        """
        Add a "control" ssh portal from st to dt to the topology of st,
        together with its portal substrate and the segment standing for dt,
        and append the portal's connection info to connInfo[st].
        """
        tsubstrate, segment_element = \
                self.new_portal_substrate(st, dt, eid, tbparams, expid)
        myname = "%stunnel" % dt
        desthost = "%stunnel" % st

        portal, info = self.new_portal_node(st, dt, tbparams, masters,
                eid, myname, desthost, "control",
                ((tsubstrate.name, (('portal', 'true'),)),),
                conn_type="ssh", conn_attrs=[], expid=expid)

        st_topo = topo[st]
        st_topo.substrates.append(tsubstrate)
        st_topo.elements.append(segment_element)
        st_topo.elements.append(portal)
        connInfo.setdefault(st, []).append(info)

    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx,
            substrate, tbparams, expid, tb_name):
        """
        Return a "control" portal node on st facing dt for use over a
        direct transit.  The node has one interface on substrate with
        address myip.  No connection info is produced for it because the
        node is created with conn_type "transit".  idx and tb_name are
        accepted but not used here.
        """
        myname = "%stunnel" % _portal_basename(dt)
        desthost = "%s" % ip_addr(dip)

        portal, info = self.new_portal_node(st, dt, tbparams, masters,
                eid, myname, desthost, "control",
                ((substrate.name, (
                    ('portal', 'true'),
                    ('ip4_address', "%s" % ip_addr(myip)),)),),
                conn_type="transit", conn_attrs=[], expid=expid)
        return portal

    def _add_direct_control_portals(self, mtb, tb, tb_name, topo, masters,
            eid, tbparams, allocator, connInfo, expid):
        """
        Connect importer testbed tb and master testbed mtb with a control
        substrate over the direct transit tb_name: add a substrate, a
        portal and a segment for the far end to each of the two
        topologies, then create the direct substrate joining them.
        """
        idx = len([x for x in topo if x.startswith(tb_name)])
        dip, _ = allocator.allocate(4)
        dip += 1
        mip = dip + 1

        csub = topdl.Substrate(
            name="%s-control-%s" % (tb_name, tb),
            capacity=topdl.Capacity(100000.0, 'max'),
            attribute=[topdl.Attribute(attribute='portal', value='true')],
        )

        # Importer side: a portal on tb and a segment standing for mtb.
        seg = self._new_segment(mtb, csub.name, eid, tbparams)
        portal = self.new_direct_portal(tb, mtb, masters, eid, dip, mip,
                idx, csub, tbparams, expid, tb_name)
        topo[tb].substrates.append(csub)
        topo[tb].elements.append(portal)
        topo[tb].elements.append(seg)

        # Master side: the mirror image, on a clone of the substrate.
        mcsub = csub.clone()
        seg = self._new_segment(tb, csub.name, eid, tbparams)
        portal = self.new_direct_portal(mtb, tb, masters, eid, mip, dip,
                idx, mcsub, tbparams, expid, tb_name)
        topo[mtb].substrates.append(mcsub)
        topo[mtb].elements.append(portal)
        topo[mtb].elements.append(seg)

        for t in (mtb, tb):
            topo[t].incorporate_elements()

        self.create_direct_substrate(csub, topo,
                {tb: ip_addr(mip), mtb: ip_addr(dip)},
                tbparams, masters, eid, connInfo, expid, tb_name)

    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator,
            connInfo, expid):
        """
        For each substrate in the main topology, find those that have nodes
        on more than one testbed.  Insert portal nodes into the copies of
        those substrates on the sub topologies.

        Afterwards every testbed that imports a service gets a control
        connection to the exporting (master) testbed unless it already has
        a portal of type 'both' facing it, and substrates left without
        interfaces are dropped from the sub topologies.  topo and connInfo
        are updated in place.
        """
        segment_substrate = {}
        portals = {}
        for s in top.substrates:
            # tbs will contain an ip address on this substrate that is in
            # each testbed.
            tbs = {}
            for i in s.interfaces:
                tb = i.element.get_attribute('testbed')
                if tb and tb not in tbs:
                    tbs[tb] = i.get_attribute('ip4_address')
            if len(tbs) < 2:
                continue

            # DRAGON will not create multi-site vlans yet, so we don't do
            # multi connection direct transits yet.
            if len(tbs) == 2:
                # NB: the else is on the for loop - if none of the direct
                # transits is applicable, use the internet.
                for d in self.direct_transit:
                    if all([d in tbparams[x] for x in tbs]):
                        self.create_direct_substrate(s, topo, tbs, tbparams,
                                masters, eid, connInfo, expid, d)
                        break
                else:
                    self.insert_internet_portals(s, topo, tbs, tbparams,
                            masters, eid, segment_substrate, portals,
                            connInfo, expid)
            else:
                self.insert_internet_portals(s, topo, tbs, tbparams,
                        masters, eid, segment_substrate, portals,
                        connInfo, expid)

        # Make sure that all the service importers have a control portal
        # back to the master for each service.
        for mtb in [t for t in tbparams if t in masters]:
            importers = set()
            for m in masters[mtb]:
                importers |= set(m.importers)
            importers.discard(mtb)

            for tb in importers:
                if tb not in topo:
                    self.log.error("Importer not in experiment: %s" % tb)
                    continue

                has_both_portal = any(
                    isinstance(e, topdl.Computer) and
                    e.get_attribute('destination_testbed') == mtb and
                    e.get_attribute('portal') and
                    e.get_attribute('portal_type') == 'both'
                    for e in topo[tb].elements)
                if has_both_portal:
                    continue

                for tb_name in self.direct_transit:
                    if tb_name in tbparams[mtb] and tb_name in tbparams[tb]:
                        self._add_direct_control_portals(mtb, tb, tb_name,
                                topo, masters, eid, tbparams, ip_allocator,
                                connInfo, expid)
                        break
                else:
                    # No direct transit shared by both testbeds (this else
                    # matches the for over self.direct_transit): fall back
                    # to a pair of ssh control portals.
                    self.add_control_portal(mtb, tb, masters, eid, topo,
                            tbparams, connInfo, expid)
                    self.add_control_portal(tb, mtb, masters, eid, topo,
                            tbparams, connInfo, expid)

        # Connect the portal nodes into the topologies and clear out
        # substrates that are not in the topologies
        for tb in tbparams:
            topo[tb].incorporate_elements()
            topo[tb].substrates = [
                s for s in topo[tb].substrates if len(s.interfaces) > 0
            ]