Changeset e19b75c for fedd/federation


Ignore:
Timestamp:
Sep 6, 2009 3:23:17 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
e794984
Parents:
fd556d1
Message:

remove old code

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    rfd556d1 re19b75c  
    345345        # Dispatch tables
    346346        self.soap_services = {\
    347                 'Create': soap_handler('Create', self.new_create_experiment),
     347                'Create': soap_handler('Create', self.create_experiment),
    348348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
    349349                'Vis': soap_handler('Vis', self.get_vis),
     
    351351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
    352352                'Terminate': soap_handler('Terminate',
    353                     self.new_terminate_experiment),
     353                    self.terminate_experiment),
    354354        }
    355355
    356356        self.xmlrpc_services = {\
    357                 'Create': xmlrpc_handler('Create', self.new_create_experiment),
     357                'Create': xmlrpc_handler('Create', self.create_experiment),
    358358                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
    359359                'Vis': xmlrpc_handler('Vis', self.get_vis),
     
    361361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
    362362                'Terminate': xmlrpc_handler('Terminate',
    363                     self.new_terminate_experiment),
     363                    self.terminate_experiment),
    364364        }
    365365
     
    551551                    "open %s: %s" % (file, e))
    552552        f.close()
    553 
    554     class emulab_segment:
    555         def __init__(self, log=None, keyfile=None, debug=False):
    556             self.log = log or logging.getLogger(\
    557                     'fedd.experiment_control.emulab_segment')
    558             self.ssh_privkey_file = keyfile
    559             self.debug = debug
    560             self.ssh_exec="/usr/bin/ssh"
    561             self.scp_exec = "/usr/bin/scp"
    562             self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
    563 
    564         def scp_file(self, file, user, host, dest=""):
    565             """
    566             scp a file to the remote host.  If debug is set the action is only
    567             logged.
    568             """
    569 
    570             scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
    571                     '-o', 'StrictHostKeyChecking yes', '-i',
    572                     self.ssh_privkey_file, file,
    573                     "%s@%s:%s" % (user, host, dest)]
    574             rv = 0
    575 
    576             try:
    577                 dnull = open("/dev/null", "w")
    578             except IOError:
    579                 self.log.debug("[ssh_file]: failed to open " + \
    580                         "/dev/null for redirect")
    581                 dnull = Null
    582 
    583             self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
    584             if not self.debug:
    585                 rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
    586                         close_fds=True)
    587 
    588             return rv == 0
    589 
    590         def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
    591             """
    592             Run a remote command on host as user.  If debug is set, the action
    593             is only logged.  Commands are run without stdin, to avoid stray
    594             SIGTTINs.
    595             """
    596             sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
    597                     "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
    598                     (self.ssh_exec, self.ssh_privkey_file,
    599                             user, host, cmd)
    600 
    601             try:
    602                 dnull = open("/dev/null", "w")
    603             except IOError:
    604                 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
    605                         "for redirect")
    606                 dnull = Null
    607 
    608             self.log.debug("[ssh_cmd]: %s" % sh_str)
    609             if not self.debug:
    610                 if dnull:
    611                     sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
    612                             close_fds=True)
    613                 else:
    614                     sub = Popen(sh_str, shell=True,
    615                             close_fds=True)
    616                 if timeout:
    617                     i = 0
    618                     rv = sub.poll()
    619                     while i < timeout:
    620                         if rv is not None: break
    621                         else:
    622                             time.sleep(1)
    623                             rv = sub.poll()
    624                             i += 1
    625                     else:
    626                         self.log.debug("Process exceeded runtime: %s" % sh_str)
    627                         os.kill(sub.pid, signal.SIGKILL)
    628                         raise self.ssh_cmd_timeout();
    629                     return rv == 0
    630                 else:
    631                     return sub.wait() == 0
    632             else:
    633                 if timeout == 0:
    634                     self.log.debug("debug timeout raised on %s " % sh_str)
    635                     raise self.ssh_cmd_timeout()
    636                 else:
    637                     return True
    638 
    639     class start_segment(emulab_segment):
    640         def __init__(self, log=None, keyfile=None, debug=False):
    641             experiment_control_local.emulab_segment.__init__(self,
    642                     log=log, keyfile=keyfile, debug=debug)
    643 
    644         def create_config_tree(self, src_dir, dest_dir, script):
    645             """
    646             Append commands to script that will create the directory hierarchy
    647             on the remote federant.
    648             """
    649 
    650             if os.path.isdir(src_dir):
    651                 print >>script, "mkdir -p %s" % dest_dir
    652                 print >>script, "chmod 770 %s" % dest_dir
    653 
    654                 for f in os.listdir(src_dir):
    655                     if os.path.isdir(f):
    656                         self.create_config_tree("%s/%s" % (src_dir, f),
    657                                 "%s/%s" % (dest_dir, f), script)
    658             else:
    659                 self.log.debug("[create_config_tree]: Not a directory: %s" \
    660                         % src_dir)
    661 
    662         def ship_configs(self, host, user, src_dir, dest_dir):
    663             """
    664             Copy federant-specific configuration files to the federant.
    665             """
    666             for f in os.listdir(src_dir):
    667                 if os.path.isdir(f):
    668                     if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
    669                             "%s/%s" % (dest_dir, f)):
    670                         return False
    671                 else:
    672                     if not self.scp_file("%s/%s" % (src_dir, f),
    673                             user, host, dest_dir):
    674                         return False
    675             return True
    676 
    677         def get_state(self, user, host, tb, pid, eid):
    678             # command to test experiment state
    679             expinfo_exec = "/usr/testbed/bin/expinfo" 
    680             # Regular expressions to parse the expinfo response
    681             state_re = re.compile("State:\s+(\w+)")
    682             no_exp_re = re.compile("^No\s+such\s+experiment")
    683             swapping_re = re.compile("^No\s+information\s+available.")
    684             state = None    # Experiment state parsed from expinfo
    685             # The expinfo ssh command.  Note the identity restriction to use
    686             # only the identity provided in the pubkey given.
    687             cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
    688                     'StrictHostKeyChecking yes', '-i',
    689                     self.ssh_privkey_file, "%s@%s" % (user, host),
    690                     expinfo_exec, pid, eid]
    691 
    692             dev_null = None
    693             try:
    694                 dev_null = open("/dev/null", "a")
    695             except IOError, e:
    696                 self.log.error("[get_state]: can't open /dev/null: %s" %e)
    697 
    698             if self.debug:
    699                 state = 'swapped'
    700                 rv = 0
    701             else:
    702                 status = Popen(cmd, stdout=PIPE, stderr=dev_null,
    703                         close_fds=True)
    704                 for line in status.stdout:
    705                     m = state_re.match(line)
    706                     if m: state = m.group(1)
    707                     else:
    708                         for reg, st in ((no_exp_re, "none"),
    709                                 (swapping_re, "swapping")):
    710                             m = reg.match(line)
    711                             if m: state = st
    712                 rv = status.wait()
    713 
    714             # If the experiment is not present the subcommand returns a
    715             # non-zero return value.  If we successfully parsed a "none"
    716             # outcome, ignore the return code.
    717             if rv != 0 and state != 'none':
    718                 raise service_error(service_error.internal,
    719                         "Cannot get status of segment %s:%s/%s" % \
    720                                 (tb, pid, eid))
    721             elif state not in ('active', 'swapped', 'swapping', 'none'):
    722                 raise service_error(service_error.internal,
    723                         "Cannot get status of segment %s:%s/%s" % \
    724                                 (tb, pid, eid))
    725             else: return state
    726 
    727 
    728         def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
    729             """
    730             Start a sub-experiment on a federant.
    731 
    732             Get the current state, modify or create as appropriate, ship data
    733             and configs and start the experiment.  There are small ordering
    734             differences based on the initial state of the sub-experiment.
    735             """
    736             # ops node in the federant
    737             host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    738             user = tbparams[tb]['user']     # federant user
    739             pid = tbparams[tb]['project']   # federant project
    740             # XXX
    741             base_confs = ( "hosts",)
    742             tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
    743             # Configuration directories on the remote machine
    744             proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    745             tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
    746             rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
    747 
    748             state = self.get_state(user, host, tb, pid, eid)
    749 
    750             self.log.debug("[start_segment]: %s: %s" % (tb, state))
    751             self.log.info("[start_segment]:transferring experiment to %s" % tb)
    752 
    753             if not self.scp_file("%s/%s/%s" % \
    754                     (tmpdir, tb, tclfile), user, host):
    755                 return False
    756            
    757             if state == 'none':
    758                 # Create a null copy of the experiment so that we capture any
    759                 # logs there if the modify fails.  Emulab software discards the
    760                 # logs from a failed startexp
    761                 if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
    762                     return False
    763                 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
    764                 timedout = False
    765                 try:
    766                     if not self.ssh_cmd(user, host,
    767                             ("/usr/testbed/bin/startexp -i -f -w -p %s " +
    768                             "-e %s null.tcl") % (pid, eid), "startexp",
    769                             timeout=60 * 10):
    770                         return False
    771                 except self.ssh_cmd_timeout:
    772                     timedout = True
    773 
    774                 if timedout:
    775                     state = self.get_state(user, host, tb, pid, eid)
    776                     if state != "swapped":
    777                         return False
    778 
    779            
    780             # Open up a temporary file to contain a script for setting up the
    781             # filespace for the new experiment.
    782             self.log.info("[start_segment]: creating script file")
    783             try:
    784                 sf, scriptname = tempfile.mkstemp()
    785                 scriptfile = os.fdopen(sf, 'w')
    786             except IOError:
    787                 return False
    788 
    789             scriptbase = os.path.basename(scriptname)
    790 
    791             # Script the filesystem changes
    792             print >>scriptfile, "/bin/rm -rf %s" % proj_dir
    793             # Clear and create the tarfiles and rpm directories
    794             for d in (tarfiles_dir, rpms_dir):
    795                 print >>scriptfile, "/bin/rm -rf %s/*" % d
    796                 print >>scriptfile, "mkdir -p %s" % d
    797             print >>scriptfile, 'mkdir -p %s' % proj_dir
    798             self.create_config_tree("%s/%s" % (tmpdir, tb),
    799                     proj_dir, scriptfile)
    800             if os.path.isdir("%s/tarfiles" % tmpdir):
    801                 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
    802                         scriptfile)
    803             if os.path.isdir("%s/rpms" % tmpdir):
    804                 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
    805                         scriptfile)
    806             print >>scriptfile, "rm -f %s" % scriptbase
    807             scriptfile.close()
    808 
    809             # Move the script to the remote machine
    810             # XXX: could collide tempfile names on the remote host
    811             if self.scp_file(scriptname, user, host, scriptbase):
    812                 os.remove(scriptname)
    813             else:
    814                 return False
    815 
    816             # Execute the script (and the script's last line deletes it)
    817             if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
    818                 return False
    819 
    820             for f in base_confs:
    821                 if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
    822                         "%s/%s" % (proj_dir, f)):
    823                     return False
    824             if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
    825                     proj_dir):
    826                 return False
    827             if os.path.isdir("%s/tarfiles" % tmpdir):
    828                 if not self.ship_configs(host, user,
    829                         "%s/tarfiles" % tmpdir, tarfiles_dir):
    830                     return False
    831             if os.path.isdir("%s/rpms" % tmpdir):
    832                 if not self.ship_configs(host, user,
    833                         "%s/rpms" % tmpdir, tarfiles_dir):
    834                     return False
    835             # Stage the new configuration (active experiments will stay swapped
    836             # in now)
    837             self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
    838             try:
    839                 if not self.ssh_cmd(user, host,
    840                         "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
    841                                 (pid, eid, tclfile),
    842                         "modexp", timeout= 60 * 10):
    843                     return False
    844             except self.ssh_cmd_timeout:
    845                 self.log.error("Modify command failed to complete in time")
    846                 # There's really no way to see if this succeeded or failed, so
    847                 # if it hangs, assume the worst.
    848                 return False
    849             # Active experiments are still swapped, this swaps the others in.
    850             if state != 'active':
    851                 self.log.info("[start_segment]: Swapping %s in on %s" % \
    852                         (eid, tb))
    853                 timedout = False
    854                 try:
    855                     if not self.ssh_cmd(user, host,
    856                             "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    857                             "swapexp", timeout=10*60):
    858                         return False
    859                 except self.ssh_cmd_timeout:
    860                     timedout = True
    861                
    862                 # If the command was terminated, but completed successfully,
    863                 # report success.
    864                 if timedout:
    865                     self.log.debug("[start_segment]: swapin timed out " +\
    866                             "checking state")
    867                     state = self.get_state(user, host, tb, pid, eid)
    868                     self.log.debug("[start_segment]: state is %s" % state)
    869                     return state == 'active'
    870             # Everything has gone OK.
    871             return True
    872 
    873     class stop_segment(emulab_segment):
    874         def __init__(self, log=None, keyfile=None, debug=False):
    875             experiment_control_local.emulab_segment.__init__(self,
    876                     log=log, keyfile=keyfile, debug=debug)
    877 
    878         def __call__(self, tb, eid, tbparams):
    879             """
    880             Stop a sub experiment by calling swapexp on the federant
    881             """
    882             user = tbparams[tb]['user']
    883             host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    884             pid = tbparams[tb]['project']
    885 
    886             self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
    887             rv = False
    888             try:
    889                 # Clean out tar files: we've gone over quota in the past
    890                 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
    891                 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
    892                         (pid, eid))
    893                 rv = self.ssh_cmd(user, host,
    894                         "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
    895             except self.ssh_cmd_timeout:
    896                 rv = False
    897             return rv
    898 
    899553       
    900554    def generate_ssh_keys(self, dest, type="rsa" ):
     
    1267921        else:
    1268922            raise service_error(service_error.protocol, "Bad splitter response")
    1269        
    1270     class current_testbed:
    1271         """
    1272         Object for collecting the current testbed description.  The testbed
    1273         description is saved to a file with the local testbed variables
    1274         subsittuted line by line.
    1275         """
    1276         def __init__(self, eid, tmpdir, fedkit, gatewaykit):
    1277             def tar_list_to_string(tl):
    1278                 if tl is None: return None
    1279 
    1280                 rv = ""
    1281                 for t in tl:
    1282                     rv += " %s PROJDIR/tarfiles/EID/%s" % \
    1283                             (t[0], os.path.basename(t[1]))
    1284                 return rv
    1285 
    1286 
    1287             self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
    1288             self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
    1289             self.current_testbed = None
    1290             self.testbed_file = None
    1291 
    1292             self.def_expstart = \
    1293                     "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
    1294             self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
    1295             self.def_gwstart = \
    1296                     "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
    1297             self.def_mgwstart = \
    1298                     "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
    1299             self.def_gwimage = "FBSD61-TUNNEL2";
    1300             self.def_gwtype = "pc";
    1301             self.def_mgwcmd = '# '
    1302             self.def_mgwcmdparams = ''
    1303             self.def_gwcmd = '# '
    1304             self.def_gwcmdparams = ''
    1305 
    1306             self.eid = eid
    1307             self.tmpdir = tmpdir
    1308             # Convert fedkit and gateway kit (which are lists of tuples) into a
    1309             # substituition string.
    1310             self.fedkit = tar_list_to_string(fedkit)
    1311             self.gatewaykit = tar_list_to_string(gatewaykit)
    1312 
    1313         def __call__(self, line, master, allocated, tbparams):
    1314             # Capture testbed topology descriptions
    1315             if self.current_testbed == None:
    1316                 m = self.begin_testbed.match(line)
    1317                 if m != None:
    1318                     self.current_testbed = m.group(1)
    1319                     if self.current_testbed == None:
    1320                         raise service_error(service_error.req,
    1321                                 "Bad request format (unnamed testbed)")
    1322                     allocated[self.current_testbed] = \
    1323                             allocated.get(self.current_testbed,0) + 1
    1324                     tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
    1325                     if not os.path.exists(tb_dir):
    1326                         try:
    1327                             os.mkdir(tb_dir)
    1328                         except IOError:
    1329                             raise service_error(service_error.internal,
    1330                                     "Cannot create %s" % tb_dir)
    1331                     try:
    1332                         self.testbed_file = open("%s/%s.%s.tcl" %
    1333                                 (tb_dir, self.eid, self.current_testbed), 'w')
    1334                     except IOError:
    1335                         self.testbed_file = None
    1336                     return True
    1337                 else: return False
    1338             else:
    1339                 m = self.end_testbed.match(line)
    1340                 if m != None:
    1341                     if m.group(1) != self.current_testbed:
    1342                         raise service_error(service_error.internal,
    1343                                 "Mismatched testbed markers!?")
    1344                     if self.testbed_file != None:
    1345                         self.testbed_file.close()
    1346                         self.testbed_file = None
    1347                     self.current_testbed = None
    1348                 elif self.testbed_file:
    1349                     # Substitute variables and put the line into the local
    1350                     # testbed file.
    1351                     gwtype = tbparams[self.current_testbed].get(\
    1352                             'connectortype', self.def_gwtype)
    1353                     gwimage = tbparams[self.current_testbed].get(\
    1354                             'connectorimage', self.def_gwimage)
    1355                     mgwstart = tbparams[self.current_testbed].get(\
    1356                             'masterconnectorstartcmd', self.def_mgwstart)
    1357                     mexpstart = tbparams[self.current_testbed].get(\
    1358                             'masternodestartcmd', self.def_mexpstart)
    1359                     gwstart = tbparams[self.current_testbed].get(\
    1360                             'slaveconnectorstartcmd', self.def_gwstart)
    1361                     expstart = tbparams[self.current_testbed].get(\
    1362                             'slavenodestartcmd', self.def_expstart)
    1363                     project = tbparams[self.current_testbed].get('project')
    1364                     gwcmd = tbparams[self.current_testbed].get(\
    1365                             'slaveconnectorcmd', self.def_gwcmd)
    1366                     gwcmdparams = tbparams[self.current_testbed].get(\
    1367                             'slaveconnectorcmdparams', self.def_gwcmdparams)
    1368                     mgwcmd = tbparams[self.current_testbed].get(\
    1369                             'masterconnectorcmd', self.def_gwcmd)
    1370                     mgwcmdparams = tbparams[self.current_testbed].get(\
    1371                             'masterconnectorcmdparams', self.def_gwcmdparams)
    1372                     line = re.sub("GWTYPE", gwtype, line)
    1373                     line = re.sub("GWIMAGE", gwimage, line)
    1374                     if self.current_testbed == master:
    1375                         line = re.sub("GWSTART", mgwstart, line)
    1376                         line = re.sub("EXPSTART", mexpstart, line)
    1377                         # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
    1378                         line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
    1379                         line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
    1380                     else:
    1381                         line = re.sub("GWSTART", gwstart, line)
    1382                         line = re.sub("EXPSTART", expstart, line)
    1383                         # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
    1384                         line = re.sub("GWCMDPARAMS", gwcmdparams, line)
    1385                         line = re.sub("(#\s*)?GWCMD", gwcmd, line)
    1386                     #These expansions contain EID and PROJDIR.  NB these are
    1387                     # local fedkit and gatewaykit, which are strings.
    1388                     if self.fedkit:
    1389                         line = re.sub("FEDKIT", self.fedkit, line)
    1390                     if self.gatewaykit:
    1391                         line = re.sub("GATEWAYKIT", self.gatewaykit, line)
    1392                     line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
    1393                     line = re.sub("PROJDIR", "/proj/%s/" % project, line)
    1394                     line = re.sub("EID", self.eid, line)
    1395                     line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
    1396                             (project, self.eid), line)
    1397                     print >>self.testbed_file, line
    1398                 return True
    1399 
    1400     class allbeds:
    1401         """
    1402         Process the Allbeds section.  Get access to each federant and save the
    1403         parameters in tbparams
    1404         """
    1405         def __init__(self, get_access):
    1406             self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
    1407             self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
    1408             self.in_allbeds = False
    1409             self.get_access = get_access
    1410 
    1411         def __call__(self, line, user, tbparams, master, export_project,
    1412                 access_user):
    1413             # Testbed access parameters
    1414             if not self.in_allbeds:
    1415                 if self.begin_allbeds.match(line):
    1416                     self.in_allbeds = True
    1417                     return True
    1418                 else:
    1419                     return False
    1420             else:
    1421                 if self.end_allbeds.match(line):
    1422                     self.in_allbeds = False
    1423                 else:
    1424                     nodes = line.split('|')
    1425                     tb = nodes.pop(0)
    1426                     self.get_access(tb, nodes, user, tbparams, master,
    1427                             export_project, access_user)
    1428                 return True
    1429 
    1430     class gateways:
    1431         def __init__(self, eid, master, tmpdir, gw_pubkey,
    1432                 gw_secretkey, copy_file, fedkit):
    1433             self.begin_gateways = \
    1434                     re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
    1435             self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
    1436             self.current_gateways = None
    1437             self.control_gateway = None
    1438             self.active_end = { }
    1439 
    1440             self.eid = eid
    1441             self.master = master
    1442             self.tmpdir = tmpdir
    1443             self.gw_pubkey_base = gw_pubkey
    1444             self.gw_secretkey_base = gw_secretkey
    1445 
    1446             self.copy_file = copy_file
    1447             self.fedkit = fedkit
    1448 
    1449 
    1450         def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
    1451                 active_end, tbparams, dtb, myname, desthost, type):
    1452             """
    1453             Produce a gateway configuration file from a gateways line.
    1454             """
    1455 
    1456             sproject = tbparams[gw].get('project', 'project')
    1457             dproject = tbparams[dtb].get('project', 'project')
    1458             sdomain = ".%s.%s%s" % (eid, sproject,
    1459                     tbparams[gw].get('domain', ".example.com"))
    1460             ddomain = ".%s.%s%s" % (eid, dproject,
    1461                     tbparams[dtb].get('domain', ".example.com"))
    1462             boss = tbparams[master].get('boss', "boss")
    1463             fs = tbparams[master].get('fs', "fs")
    1464             event_server = "%s%s" % \
    1465                     (tbparams[gw].get('eventserver', "event_server"),
    1466                             tbparams[gw].get('domain', "example.com"))
    1467             remote_event_server = "%s%s" % \
    1468                     (tbparams[dtb].get('eventserver', "event_server"),
    1469                             tbparams[dtb].get('domain', "example.com"))
    1470             seer_control = "%s%s" % \
    1471                     (tbparams[gw].get('control', "control"), sdomain)
    1472             tunnel_iface = tbparams[gw].get("tunnelinterface", None)
    1473 
    1474             if self.fedkit:
    1475                 remote_script_dir = "/usr/local/federation/bin"
    1476                 local_script_dir = "/usr/local/federation/bin"
    1477             else:
    1478                 remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
    1479                 local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
    1480 
    1481             local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
    1482             remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
    1483             tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
    1484 
    1485             conf_file = "%s%s.gw.conf" % (myname, sdomain)
    1486             remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
    1487 
    1488             # translate to lower case so the `hostname` hack for specifying
    1489             # configuration files works.
    1490             conf_file = conf_file.lower();
    1491             remote_conf_file = remote_conf_file.lower();
    1492 
    1493             if dtb == master:
    1494                 active = "false"
    1495             elif gw == master:
    1496                 active = "true"
    1497             elif active_end.has_key('%s-%s' % (dtb, gw)):
    1498                 active = "false"
    1499             else:
    1500                 active_end['%s-%s' % (gw, dtb)] = 1
    1501                 active = "true"
    1502 
    1503             gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
    1504             print >>gwconfig, "Active: %s" % active
    1505             print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
    1506             if tunnel_iface:
    1507                 print >>gwconfig, "Interface: %s" % tunnel_iface
    1508             print >>gwconfig, "BossName: %s" % boss
    1509             print >>gwconfig, "FsName: %s" % fs
    1510             print >>gwconfig, "EventServerName: %s" % event_server
    1511             print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
    1512             print >>gwconfig, "SeerControl: %s" % seer_control
    1513             print >>gwconfig, "Type: %s" % type
    1514             print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
    1515             print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
    1516                     local_script_dir
    1517             print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
    1518             print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
    1519             print >>gwconfig, "RemoteConfigFile: %s/%s" % \
    1520                     (remote_conf_dir, remote_conf_file)
    1521             print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
    1522             print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
    1523             print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
    1524             gwconfig.close()
    1525 
    1526             return active == "true"
    1527 
    1528         def __call__(self, line, allocated, tbparams):
    1529             # Process gateways
    1530             if not self.current_gateways:
    1531                 m = self.begin_gateways.match(line)
    1532                 if m:
    1533                     self.current_gateways = m.group(1)
    1534                     if allocated.has_key(self.current_gateways):
    1535                         # This test should always succeed
    1536                         tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
    1537                         if not os.path.exists(tb_dir):
    1538                             try:
    1539                                 os.mkdir(tb_dir)
    1540                             except IOError:
    1541                                 raise service_error(service_error.internal,
    1542                                         "Cannot create %s" % tb_dir)
    1543                     else:
    1544                         # XXX
    1545                         self.log.error("[gateways]: Ignoring gateways for " + \
    1546                                 "unknown testbed %s" % self.current_gateways)
    1547                         self.current_gateways = None
    1548                     return True
    1549                 else:
    1550                     return False
    1551             else:
    1552                 m = self.end_gateways.match(line)
    1553                 if m :
    1554                     if m.group(1) != self.current_gateways:
    1555                         raise service_error(service_error.internal,
    1556                                 "Mismatched gateway markers!?")
    1557                     if self.control_gateway:
    1558                         try:
    1559                             cc = open("%s/%s/client.conf" %
    1560                                     (self.tmpdir, self.current_gateways), 'w')
    1561                             print >>cc, "ControlGateway: %s" % \
    1562                                     self.control_gateway
    1563                             if tbparams[self.master].has_key('smbshare'):
    1564                                 print >>cc, "SMBSHare: %s" % \
    1565                                         tbparams[self.master]['smbshare']
    1566                             print >>cc, "ProjectUser: %s" % \
    1567                                     tbparams[self.master]['user']
    1568                             print >>cc, "ProjectName: %s" % \
    1569                                     tbparams[self.master]['project']
    1570                             print >>cc, "ExperimentID: %s/%s" % \
    1571                                     ( tbparams[self.master]['project'], \
    1572                                     self.eid )
    1573                             cc.close()
    1574                         except IOError:
    1575                             raise service_error(service_error.internal,
    1576                                     "Error creating client config")
    1577                         # XXX: This seer specific file should disappear
    1578                         try:
    1579                             cc = open("%s/%s/seer.conf" %
    1580                                     (self.tmpdir, self.current_gateways),
    1581                                     'w')
    1582                             if self.current_gateways != self.master:
    1583                                 print >>cc, "ControlNode: %s" % \
    1584                                         self.control_gateway
    1585                             print >>cc, "ExperimentID: %s/%s" % \
    1586                                     ( tbparams[self.master]['project'], \
    1587                                     self.eid )
    1588                             cc.close()
    1589                         except IOError:
    1590                             raise service_error(service_error.internal,
    1591                                     "Error creating seer config")
    1592                     else:
    1593                         debug.error("[gateways]: No control gateway for %s" %\
    1594                                     self.current_gateways)
    1595                     self.current_gateways = None
    1596                 else:
    1597                     dtb, myname, desthost, type = line.split(" ")
    1598 
    1599                     if type == "control" or type == "both":
    1600                         self.control_gateway = "%s.%s.%s%s" % (myname,
    1601                                 self.eid,
    1602                                 tbparams[self.current_gateways]['project'],
    1603                                 tbparams[self.current_gateways]['domain'])
    1604                     try:
    1605                         active = self.gateway_conf_file(self.current_gateways,
    1606                                 self.master, self.eid, self.gw_pubkey_base,
    1607                                 self.gw_secretkey_base,
    1608                                 self.active_end, tbparams, dtb, myname,
    1609                                 desthost, type)
    1610                     except IOError, e:
    1611                         raise service_error(service_error.internal,
    1612                                 "Failed to write config file for %s" % \
    1613                                         self.current_gateway)
    1614            
    1615                     gw_pubkey = "%s/keys/%s" % \
    1616                             (self.tmpdir, self.gw_pubkey_base)
    1617                     gw_secretkey = "%s/keys/%s" % \
    1618                             (self.tmpdir, self.gw_secretkey_base)
    1619 
    1620                     pkfile = "%s/%s/%s" % \
    1621                             ( self.tmpdir, self.current_gateways,
    1622                                     self.gw_pubkey_base)
    1623                     skfile = "%s/%s/%s" % \
    1624                             ( self.tmpdir, self.current_gateways,
    1625                                     self.gw_secretkey_base)
    1626 
    1627                     if not os.path.exists(pkfile):
    1628                         try:
    1629                             self.copy_file(gw_pubkey, pkfile)
    1630                         except IOError:
    1631                             service_error(service_error.internal,
    1632                                     "Failed to copy pubkey file")
    1633 
    1634                     if active and not os.path.exists(skfile):
    1635                         try:
    1636                             self.copy_file(gw_secretkey, skfile)
    1637                         except IOError:
    1638                             service_error(service_error.internal,
    1639                                     "Failed to copy secretkey file")
    1640                 return True
    1641 
    1642     class shunt_to_file:
    1643         """
    1644         Simple class to write data between two regexps to a file.
    1645         """
    1646         def __init__(self, begin, end, filename):
    1647             """
    1648             Begin shunting on a match of begin, stop on end, send data to
    1649             filename.
    1650             """
    1651             self.begin = re.compile(begin)
    1652             self.end = re.compile(end)
    1653             self.in_shunt = False
    1654             self.file = None
    1655             self.filename = filename
    1656 
    1657         def __call__(self, line):
    1658             """
    1659             Call this on each line in the input that may be shunted.
    1660             """
    1661             if not self.in_shunt:
    1662                 if self.begin.match(line):
    1663                     self.in_shunt = True
    1664                     try:
    1665                         self.file = open(self.filename, "w")
    1666                     except:
    1667                         self.file = None
    1668                         raise
    1669                     return True
    1670                 else:
    1671                     return False
    1672             else:
    1673                 if self.end.match(line):
    1674                     if self.file:
    1675                         self.file.close()
    1676                         self.file = None
    1677                     self.in_shunt = False
    1678                 else:
    1679                     if self.file:
    1680                         print >>self.file, line
    1681                 return True
    1682 
    1683     class shunt_to_list:
    1684         """
    1685         Same interface as shunt_to_file.  Data collected in self.list, one list
    1686         element per line.
    1687         """
    1688         def __init__(self, begin, end):
    1689             self.begin = re.compile(begin)
    1690             self.end = re.compile(end)
    1691             self.in_shunt = False
    1692             self.list = [ ]
    1693        
    1694         def __call__(self, line):
    1695             if not self.in_shunt:
    1696                 if self.begin.match(line):
    1697                     self.in_shunt = True
    1698                     return True
    1699                 else:
    1700                     return False
    1701             else:
    1702                 if self.end.match(line):
    1703                     self.in_shunt = False
    1704                 else:
    1705                     self.list.append(line)
    1706                 return True
    1707 
    1708     class shunt_to_string:
    1709         """
    1710         Same interface as shunt_to_file.  Data collected in self.str, all in
    1711         one string.
    1712         """
    1713         def __init__(self, begin, end):
    1714             self.begin = re.compile(begin)
    1715             self.end = re.compile(end)
    1716             self.in_shunt = False
    1717             self.str = ""
    1718        
    1719         def __call__(self, line):
    1720             if not self.in_shunt:
    1721                 if self.begin.match(line):
    1722                     self.in_shunt = True
    1723                     return True
    1724                 else:
    1725                     return False
    1726             else:
    1727                 if self.end.match(line):
    1728                     self.in_shunt = False
    1729                 else:
    1730                     self.str += line
    1731                 return True
    1732 
    1733     def allocate_resources(self, allocated, master, eid, expid, expcert,
    1734             tbparams, tmpdir, alloc_log=None):
    1735         started = { }           # Testbeds where a sub-experiment started
    1736                                 # successfully
    1737 
    1738         # XXX
    1739         fail_soft = False
    1740 
    1741         log = alloc_log or self.log
    1742 
    1743         thread_pool = self.thread_pool(self.nthreads)
    1744         threads = [ ]
    1745 
    1746         for tb in [ k for k in allocated.keys() if k != master]:
    1747             # Create and start a thread to start the segment, and save it to
    1748             # get the return value later
    1749             thread_pool.wait_for_slot()
    1750             t  = self.pooled_thread(\
    1751                     target=self.start_segment(log=log,
    1752                         keyfile=self.ssh_privkey_file, debug=self.debug),
    1753                     args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    1754                     pdata=thread_pool, trace_file=self.trace_file)
    1755             threads.append(t)
    1756             t.start()
    1757 
    1758         # Wait until all finish
    1759         thread_pool.wait_for_all_done()
    1760 
    1761         # If none failed, start the master
    1762         failed = [ t.getName() for t in threads if not t.rv ]
    1763 
    1764         if len(failed) == 0:
    1765             starter = self.start_segment(log=log,
    1766                     keyfile=self.ssh_privkey_file, debug=self.debug)
    1767             if not starter(master, eid, tbparams, tmpdir):
    1768                 failed.append(master)
    1769 
    1770         succeeded = [tb for tb in allocated.keys() if tb not in failed]
    1771         # If one failed clean up, unless fail_soft is set
    1772         if failed:
    1773             if not fail_soft:
    1774                 thread_pool.clear()
    1775                 for tb in succeeded:
    1776                     # Create and start a thread to stop the segment
    1777                     thread_pool.wait_for_slot()
    1778                     t  = self.pooled_thread(\
    1779                             target=self.stop_segment(log=log,
    1780                                 keyfile=self.ssh_privkey_file,
    1781                                 debug=self.debug),
    1782                             args=(tb, eid, tbparams), name=tb,
    1783                             pdata=thread_pool, trace_file=self.trace_file)
    1784                     t.start()
    1785                 # Wait until all finish
    1786                 thread_pool.wait_for_all_done()
    1787 
    1788                 # release the allocations
    1789                 for tb in tbparams.keys():
    1790                     self.release_access(tb, tbparams[tb]['allocID'])
    1791                 # Remove the placeholder
    1792                 self.state_lock.acquire()
    1793                 self.state[eid]['experimentStatus'] = 'failed'
    1794                 if self.state_filename: self.write_state()
    1795                 self.state_lock.release()
    1796 
    1797                 #raise service_error(service_error.federant,
    1798                 #    "Swap in failed on %s" % ",".join(failed))
    1799                 log.error("Swap in failed on %s" % ",".join(failed))
    1800                 return
    1801         else:
    1802             log.info("[start_segment]: Experiment %s active" % eid)
    1803 
    1804         log.debug("[start_experiment]: removing %s" % tmpdir)
    1805 
    1806         # Walk up tmpdir, deleting as we go
    1807         for path, dirs, files in os.walk(tmpdir, topdown=False):
    1808             for f in files:
    1809                 os.remove(os.path.join(path, f))
    1810             for d in dirs:
    1811                 os.rmdir(os.path.join(path, d))
    1812         os.rmdir(tmpdir)
    1813 
    1814         # Insert the experiment into our state and update the disk copy
    1815         self.state_lock.acquire()
    1816         self.state[expid]['experimentStatus'] = 'active'
    1817         self.state[eid] = self.state[expid]
    1818         if self.state_filename: self.write_state()
    1819         self.state_lock.release()
    1820         return
    1821 
    1822     def create_experiment(self, req, fid):
    1823         """
    1824         The external interface to experiment creation called from the
    1825         dispatcher.
    1826 
    1827         Creates a working directory, splits the incoming description using the
    1828         splitter script and parses out the avrious subsections using the
    1829         lcasses above.  Once each sub-experiment is created, use pooled threads
    1830         to instantiate them and start it all up.
    1831         """
    1832 
    1833         if not self.auth.check_attribute(fid, 'create'):
    1834             raise service_error(service_error.access, "Create access denied")
    1835 
    1836         try:
    1837             tmpdir = tempfile.mkdtemp(prefix="split-")
    1838         except IOError:
    1839             raise service_error(service_error.internal, "Cannot create tmp dir")
    1840 
    1841         gw_pubkey_base = "fed.%s.pub" % self.ssh_type
    1842         gw_secretkey_base = "fed.%s" % self.ssh_type
    1843         gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
    1844         gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
    1845         tclfile = tmpdir + "/experiment.tcl"
    1846         tbparams = { }
    1847         try:
    1848             access_user = self.accessdb[fid]
    1849         except KeyError:
    1850             raise service_error(service_error.internal,
    1851                     "Access map and authorizer out of sync in " + \
    1852                             "create_experiment for fedid %s"  % fid)
    1853 
    1854         pid = "dummy"
    1855         gid = "dummy"
    1856         try:
    1857             os.mkdir(tmpdir+"/keys")
    1858         except OSError:
    1859             raise service_error(service_error.internal,
    1860                     "Can't make temporary dir")
    1861 
    1862         req = req.get('CreateRequestBody', None)
    1863         if not req:
    1864             raise service_error(service_error.req,
    1865                     "Bad request format (no CreateRequestBody)")
    1866         # The tcl parser needs to read a file so put the content into that file
    1867         descr=req.get('experimentdescription', None)
    1868         if descr:
    1869             file_content=descr.get('ns2description', None)
    1870             if file_content:
    1871                 try:
    1872                     f = open(tclfile, 'w')
    1873                     f.write(file_content)
    1874                     f.close()
    1875                 except IOError:
    1876                     raise service_error(service_error.internal,
    1877                             "Cannot write temp experiment description")
    1878             else:
    1879                 raise service_error(service_error.req,
    1880                         "Only ns2descriptions supported")
    1881         else:
    1882             raise service_error(service_error.req, "No experiment description")
    1883 
    1884         # Generate an ID for the experiment (slice) and a certificate that the
    1885         # allocator can use to prove they own it.  We'll ship it back through
    1886         # the encrypted connection.
    1887         (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
    1888 
    1889         if req.has_key('experimentID') and \
    1890                 req['experimentID'].has_key('localname'):
    1891             overwrite = False
    1892             eid = req['experimentID']['localname']
    1893             # If there's an old failed experiment here with the same local name
    1894             # and accessible by this user, we'll overwrite it, otherwise we'll
    1895             # fall through and do the collision avoidance.
    1896             old_expid = self.get_experiment_fedid(eid)
    1897             if old_expid and self.check_experiment_access(fid, old_expid):
    1898                 self.state_lock.acquire()
    1899                 status = self.state[eid].get('experimentStatus', None)
    1900                 if status and status == 'failed':
    1901                     # remove the old access attribute
    1902                     self.auth.unset_attribute(fid, old_expid)
    1903                     overwrite = True
    1904                     del self.state[eid]
    1905                     del self.state[old_expid]
    1906                 self.state_lock.release()
    1907             self.state_lock.acquire()
    1908             while (self.state.has_key(eid) and not overwrite):
    1909                 eid += random.choice(string.ascii_letters)
    1910             # Initial state
    1911             self.state[eid] = {
    1912                     'experimentID' : \
    1913                             [ { 'localname' : eid }, {'fedid': expid } ],
    1914                     'experimentStatus': 'starting',
    1915                     'experimentAccess': { 'X509' : expcert },
    1916                     'owner': fid,
    1917                     'log' : [],
    1918                 }
    1919             self.state[expid] = self.state[eid]
    1920             if self.state_filename: self.write_state()
    1921             self.state_lock.release()
    1922         else:
    1923             eid = self.exp_stem
    1924             for i in range(0,5):
    1925                 eid += random.choice(string.ascii_letters)
    1926             self.state_lock.acquire()
    1927             while (self.state.has_key(eid)):
    1928                 eid = self.exp_stem
    1929                 for i in range(0,5):
    1930                     eid += random.choice(string.ascii_letters)
    1931             # Initial state
    1932             self.state[eid] = {
    1933                     'experimentID' : \
    1934                             [ { 'localname' : eid }, {'fedid': expid } ],
    1935                     'experimentStatus': 'starting',
    1936                     'experimentAccess': { 'X509' : expcert },
    1937                     'owner': fid,
    1938                     'log' : [],
    1939                 }
    1940             self.state[expid] = self.state[eid]
    1941             if self.state_filename: self.write_state()
    1942             self.state_lock.release()
    1943 
    1944         try:
    1945             # This catches exceptions to clear the placeholder if necessary
    1946             try:
    1947                 self.generate_ssh_keys(gw_secretkey, self.ssh_type)
    1948             except ValueError:
    1949                 raise service_error(service_error.server_config,
    1950                         "Bad key type (%s)" % self.ssh_type)
    1951 
    1952             user = req.get('user', None)
    1953             if user == None:
    1954                 raise service_error(service_error.req, "No user")
    1955 
    1956             master = req.get('master', None)
    1957             if not master:
    1958                 raise service_error(service_error.req,
    1959                         "No master testbed label")
    1960             export_project = req.get('exportProject', None)
    1961             if not export_project:
    1962                 raise service_error(service_error.req, "No export project")
    1963            
    1964             if self.splitter_url:
    1965                 self.log.debug("Calling remote splitter at %s" % \
    1966                         self.splitter_url)
    1967                 split_data = self.remote_splitter(self.splitter_url,
    1968                         file_content, master)
    1969             else:
    1970                 tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x',
    1971                     str(self.muxmax), '-m', master]
    1972 
    1973                 if self.fedkit:
    1974                     tclcmd.append('-k')
    1975 
    1976                 if self.gatewaykit:
    1977                     tclcmd.append('-K')
    1978 
    1979                 tclcmd.extend([pid, gid, eid, tclfile])
    1980 
    1981                 self.log.debug("running local splitter %s", " ".join(tclcmd))
    1982                 # This is just fantastic.  As a side effect the parser copies
    1983                 # tb_compat.tcl into the current directory, so that directory
    1984                 # must be writable by the fedd user.  Doing this in the
    1985                 # temporary subdir ensures this is the case.
    1986                 tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True,
    1987                         cwd=tmpdir)
    1988                 split_data = tclparser.stdout
    1989 
    1990             allocated = { }         # Testbeds we can access
    1991             # Objects to parse the splitter output (defined above)
    1992             parse_current_testbed = self.current_testbed(eid, tmpdir,
    1993                     self.fedkit, self.gatewaykit)
    1994             parse_allbeds = self.allbeds(self.get_access)
    1995             parse_gateways = self.gateways(eid, master, tmpdir,
    1996                     gw_pubkey_base, gw_secretkey_base, self.copy_file,
    1997                     self.fedkit)
    1998             parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
    1999                         "^#\s+End\s+Vtopo")
    2000             parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
    2001                         "^#\s+End\s+hostnames", tmpdir + "/hosts")
    2002             parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
    2003                     "^#\s+End\s+tarfiles")
    2004             parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
    2005                     "^#\s+End\s+rpms")
    2006 
    2007             # Working on the split data
    2008             for line in split_data:
    2009                 line = line.rstrip()
    2010                 if parse_current_testbed(line, master, allocated, tbparams):
    2011                     continue
    2012                 elif parse_allbeds(line, user, tbparams, master, export_project,
    2013                         access_user):
    2014                     continue
    2015                 elif parse_gateways(line, allocated, tbparams):
    2016                     continue
    2017                 elif parse_vtopo(line):
    2018                     continue
    2019                 elif parse_hostnames(line):
    2020                     continue
    2021                 elif parse_tarfiles(line):
    2022                     continue
    2023                 elif parse_rpms(line):
    2024                     continue
    2025                 else:
    2026                     raise service_error(service_error.internal,
    2027                             "Bad tcl parse? %s" % line)
    2028             # Virtual topology and visualization
    2029             vtopo = self.gentopo(parse_vtopo.str)
    2030             if not vtopo:
    2031                 raise service_error(service_error.internal,
    2032                         "Failed to generate virtual topology")
    2033 
    2034             vis = self.genviz(vtopo)
    2035             if not vis:
    2036                 raise service_error(service_error.internal,
    2037                         "Failed to generate visualization")
    2038 
    2039            
    2040             # save federant information
    2041             for k in allocated.keys():
    2042                 tbparams[k]['federant'] = {\
    2043                         'name': [ { 'localname' : eid} ],\
    2044                         'emulab': tbparams[k]['emulab'],\
    2045                         'allocID' : tbparams[k]['allocID'],\
    2046                         'master' : k == master,\
    2047                     }
    2048 
    2049             self.state_lock.acquire()
    2050             self.state[eid]['vtopo'] = vtopo
    2051             self.state[eid]['vis'] = vis
    2052             self.state[expid]['federant'] = \
    2053                     [ tbparams[tb]['federant'] for tb in tbparams.keys() \
    2054                         if tbparams[tb].has_key('federant') ]
    2055             if self.state_filename: self.write_state()
    2056             self.state_lock.release()
    2057 
    2058             # Copy tarfiles and rpms needed at remote sites into a staging area
    2059             try:
    2060                 if self.fedkit:
    2061                     for t in self.fedkit:
    2062                         parse_tarfiles.list.append(t[1])
    2063                 if self.gatewaykit:
    2064                     for t in self.gatewaykit:
    2065                         parse_tarfiles.list.append(t[1])
    2066                 for t in parse_tarfiles.list:
    2067                     if not os.path.exists("%s/tarfiles" % tmpdir):
    2068                         os.mkdir("%s/tarfiles" % tmpdir)
    2069                     self.copy_file(t, "%s/tarfiles/%s" % \
    2070                             (tmpdir, os.path.basename(t)))
    2071                 for r in parse_rpms.list:
    2072                     if not os.path.exists("%s/rpms" % tmpdir):
    2073                         os.mkdir("%s/rpms" % tmpdir)
    2074                     self.copy_file(r, "%s/rpms/%s" % \
    2075                             (tmpdir, os.path.basename(r)))
    2076                 # A null experiment file in case we need to create a remote
    2077                 # experiment from scratch
    2078                 f = open("%s/null.tcl" % tmpdir, "w")
    2079                 print >>f, """
    2080 set ns [new Simulator]
    2081 source tb_compat.tcl
    2082 
    2083 set a [$ns node]
    2084 
    2085 $ns rtproto Session
    2086 $ns run
    2087 """
    2088                 f.close()
    2089 
    2090             except IOError, e:
    2091                 raise service_error(service_error.internal,
    2092                         "Cannot stage tarfile/rpm: %s" % e.strerror)
    2093 
    2094         except service_error, e:
    2095             # If something goes wrong in the parse (usually an access error)
    2096             # clear the placeholder state.  From here on out the code delays
    2097             # exceptions.  Failing at this point returns a fault to the remote
    2098             # caller.
    2099             self.state_lock.acquire()
    2100             del self.state[eid]
    2101             del self.state[expid]
    2102             if self.state_filename: self.write_state()
    2103             self.state_lock.release()
    2104             raise e
    2105 
    2106 
    2107         # Start the background swapper and return the starting state.  From
    2108         # here on out, the state will stick around a while.
    2109 
    2110         # Let users touch the state
    2111         self.auth.set_attribute(fid, expid)
    2112         self.auth.set_attribute(expid, expid)
    2113         # Override fedids can manipulate state as well
    2114         for o in self.overrides:
    2115             self.auth.set_attribute(o, expid)
    2116 
    2117         # Create a logger that logs to the experiment's state object as well as
    2118         # to the main log file.
    2119         alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
    2120         h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
    2121         # XXX: there should be a global one of these rather than repeating the
    2122         # code.
    2123         h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
    2124                     '%d %b %y %H:%M:%S'))
    2125         alloc_log.addHandler(h)
    2126        
    2127         # Start a thread to do the resource allocation
    2128         t  = Thread(target=self.allocate_resources,
    2129                 args=(allocated, master, eid, expid, expcert, tbparams,
    2130                     tmpdir, alloc_log),
    2131                 name=eid)
    2132         t.start()
    2133 
    2134         rv = {
    2135                 'experimentID': [
    2136                     {'localname' : eid }, { 'fedid': copy.copy(expid) }
    2137                 ],
    2138                 'experimentStatus': 'starting',
    2139                 'experimentAccess': { 'X509' : expcert }
    2140             }
    2141 
    2142         return rv
    2143 
    2144     class new_start_segment:
     923
     924    class start_segment:
    2145925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
    2146926                cert_pwd=None, trusted_certs=None, caller=None):
     
    2175955
    2176956
    2177     class new_terminate_segment:
     957    class terminate_segment:
    2178958        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
    2179959                cert_pwd=None, trusted_certs=None, caller=None):
     
    2199979                        (self.testbed, e))
    2200980                return False
    2201 
    2202 
    2203981   
    2204982
    2205     def new_allocate_resources(self, allocated, master, eid, expid, expcert,
     983    def allocate_resources(self, allocated, master, eid, expid, expcert,
    2206984            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
    2207985        started = { }           # Testbeds where a sub-experiment started
     
    22331011
    22341012            t  = self.pooled_thread(\
    2235                     target=self.new_start_segment(log=log, debug=self.debug,
     1013                    target=self.start_segment(log=log, debug=self.debug,
    22361014                        testbed=tb, cert_file=self.cert_file,
    22371015                        cert_pwd=self.cert_pwd,
     
    22611039                raise service_error(service_error.internal,
    22621040                    "No alloc id for testbed %s !?" % master)
    2263             starter = self.new_start_segment(log=log, debug=self.debug,
     1041            starter = self.start_segment(log=log, debug=self.debug,
    22641042                    testbed=master, cert_file=self.cert_file,
    22651043                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
     
    23201098
    23211099
    2322     def new_create_experiment(self, req, fid):
     1100    def create_experiment(self, req, fid):
    23231101        """
    23241102        The external interface to experiment creation called from the
     
    28821660
    28831661        # Start a thread to do the resource allocation
    2884         t  = Thread(target=self.new_allocate_resources,
     1662        t  = Thread(target=self.allocate_resources,
    28851663                args=(allocated, master, eid, expid, expcert, tbparams,
    28861664                    topo, tmpdir, alloc_log, attrs),
     
    31191897        return rv
    31201898
    3121 
    31221899    def terminate_experiment(self, req, fid):
    31231900        """
     
    31941971                if id.has_key('localname'): ids.append(id['localname'])
    31951972
    3196             # Construct enough of the tbparams to make the stop_segment calls
    3197             # work
    3198             for fed in fed_exp.get('federant', []):
    3199                 try:
    3200                     for e in fed['name']:
    3201                         eid = e.get('localname', None)
    3202                         if eid: break
    3203                     else:
    3204                         continue
    3205 
    3206                     p = fed['emulab']['project']
    3207 
    3208                     project = p['name']['localname']
    3209                     tb = p['testbed']['localname']
    3210                     user = p['user'][0]['userID']['localname']
    3211 
    3212                     domain = fed['emulab']['domain']
    3213                     host  = fed['emulab']['ops']
    3214                     aid = fed['allocID']
    3215                 except KeyError, e:
    3216                     continue
    3217                 tbparams[tb] = {\
    3218                         'user': user,\
    3219                         'domain': domain,\
    3220                         'project': project,\
    3221                         'host': host,\
    3222                         'eid': eid,\
    3223                         'aid': aid,\
    3224                     }
    3225             fed_exp['experimentStatus'] = 'terminating'
    3226             if self.state_filename: self.write_state()
    3227             self.state_lock.release()
    3228 
    3229             # Stop everyone.  NB, wait_for_all waits until a thread starts and
    3230             # then completes, so we can't wait if nothing starts.  So, no
    3231             # tbparams, no start.
    3232             if len(tbparams) > 0:
    3233                 thread_pool = self.thread_pool(self.nthreads)
    3234                 for tb in tbparams.keys():
    3235                     # Create and start a thread to stop the segment
    3236                     thread_pool.wait_for_slot()
    3237                     t  = self.pooled_thread(\
    3238                             target=self.stop_segment(log=dealloc_log,
    3239                                 keyfile=self.ssh_privkey_file, debug=self.debug),
    3240                             args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    3241                             pdata=thread_pool, trace_file=self.trace_file)
    3242                     t.start()
    3243                 # Wait for completions
    3244                 thread_pool.wait_for_all_done()
    3245 
    3246             # release the allocations (failed experiments have done this
    3247             # already, and starting experiments may be in odd states, so we
    3248             # ignore errors releasing those allocations
    3249             try:
    3250                 for tb in tbparams.keys():
    3251                     self.release_access(tb, tbparams[tb]['aid'])
    3252             except service_error, e:
    3253                 if status != 'failed' and not force:
    3254                     raise e
    3255 
    3256             # Remove the terminated experiment
    3257             self.state_lock.acquire()
    3258             for id in ids:
    3259                 if self.state.has_key(id): del self.state[id]
    3260 
    3261             if self.state_filename: self.write_state()
    3262             self.state_lock.release()
    3263 
    3264             return {
    3265                     'experiment': exp ,
    3266                     'deallocationLog': "".join(dealloc_list),
    3267                     }
    3268         else:
    3269             # Don't forget to release the lock
    3270             self.state_lock.release()
    3271             raise service_error(service_error.req, "No saved state")
    3272 
    3273     def new_terminate_experiment(self, req, fid):
    3274         """
    3275         Swap this experiment out on the federants and delete the shared
    3276         information
    3277         """
    3278         tbparams = { }
    3279         req = req.get('TerminateRequestBody', None)
    3280         if not req:
    3281             raise service_error(service_error.req,
    3282                     "Bad request format (no TerminateRequestBody)")
    3283         force = req.get('force', False)
    3284         exp = req.get('experiment', None)
    3285         if exp:
    3286             if exp.has_key('fedid'):
    3287                 key = exp['fedid']
    3288                 keytype = "fedid"
    3289             elif exp.has_key('localname'):
    3290                 key = exp['localname']
    3291                 keytype = "localname"
    3292             else:
    3293                 raise service_error(service_error.req, "Unknown lookup type")
    3294         else:
    3295             raise service_error(service_error.req, "No request?")
    3296 
    3297         self.check_experiment_access(fid, key)
    3298 
    3299         dealloc_list = [ ]
    3300 
    3301 
    3302         # Create a logger that logs to the dealloc_list as well as to the main
    3303         # log file.
    3304         dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
    3305         h = logging.StreamHandler(self.list_log(dealloc_list))
    3306         # XXX: there should be a global one of these rather than repeating the
    3307         # code.
    3308         h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
    3309                     '%d %b %y %H:%M:%S'))
    3310         dealloc_log.addHandler(h)
    3311 
    3312         self.state_lock.acquire()
    3313         fed_exp = self.state.get(key, None)
    3314 
    3315         if fed_exp:
    3316             # This branch of the conditional holds the lock to generate a
    3317             # consistent temporary tbparams variable to deallocate experiments.
    3318             # It releases the lock to do the deallocations and reacquires it to
    3319             # remove the experiment state when the termination is complete.
    3320 
    3321             # First make sure that the experiment creation is complete.
    3322             status = fed_exp.get('experimentStatus', None)
    3323 
    3324             if status:
    3325                 if status in ('starting', 'terminating'):
    3326                     if not force:
    3327                         self.state_lock.release()
    3328                         raise service_error(service_error.partial,
    3329                                 'Experiment still being created or destroyed')
    3330                     else:
    3331                         self.log.warning('Experiment in %s state ' % status + \
    3332                                 'being terminated by force.')
    3333             else:
    3334                 # No status??? trouble
    3335                 self.state_lock.release()
    3336                 raise service_error(service_error.internal,
    3337                         "Experiment has no status!?")
    3338 
    3339             ids = []
    3340             #  experimentID is a list of dicts that are self-describing
    3341             #  identifiers.  This finds all the fedids and localnames - the
    3342             #  keys of self.state - and puts them into ids.
    3343             for id in fed_exp.get('experimentID', []):
    3344                 if id.has_key('fedid'): ids.append(id['fedid'])
    3345                 if id.has_key('localname'): ids.append(id['localname'])
    3346 
    33471973            # Collect the allocation/segment ids
    33481974            for fed in fed_exp.get('federant', []):
     
    33691995                    uri = self.tbmap.get(tb, None)
    33701996                    t  = self.pooled_thread(\
    3371                             target=self.new_terminate_segment(log=dealloc_log,
     1997                            target=self.terminate_segment(log=dealloc_log,
    33721998                                testbed=tb,
    33731999                                cert_file=self.cert_file,
Note: See TracChangeset for help on using the changeset viewer.