Changeset e19b75c for fedd/federation
- Timestamp:
- Sep 6, 2009 3:23:17 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- e794984
- Parents:
- fd556d1
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
rfd556d1 re19b75c 345 345 # Dispatch tables 346 346 self.soap_services = {\ 347 'Create': soap_handler('Create', self. new_create_experiment),347 'Create': soap_handler('Create', self.create_experiment), 348 348 'Vtopo': soap_handler('Vtopo', self.get_vtopo), 349 349 'Vis': soap_handler('Vis', self.get_vis), … … 351 351 'MultiInfo': soap_handler('MultiInfo', self.get_multi_info), 352 352 'Terminate': soap_handler('Terminate', 353 self. new_terminate_experiment),353 self.terminate_experiment), 354 354 } 355 355 356 356 self.xmlrpc_services = {\ 357 'Create': xmlrpc_handler('Create', self. new_create_experiment),357 'Create': xmlrpc_handler('Create', self.create_experiment), 358 358 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo), 359 359 'Vis': xmlrpc_handler('Vis', self.get_vis), … … 361 361 'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info), 362 362 'Terminate': xmlrpc_handler('Terminate', 363 self. new_terminate_experiment),363 self.terminate_experiment), 364 364 } 365 365 … … 551 551 "open %s: %s" % (file, e)) 552 552 f.close() 553 554 class emulab_segment:555 def __init__(self, log=None, keyfile=None, debug=False):556 self.log = log or logging.getLogger(\557 'fedd.experiment_control.emulab_segment')558 self.ssh_privkey_file = keyfile559 self.debug = debug560 self.ssh_exec="/usr/bin/ssh"561 self.scp_exec = "/usr/bin/scp"562 self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout563 564 def scp_file(self, file, user, host, dest=""):565 """566 scp a file to the remote host. If debug is set the action is only567 logged.568 """569 570 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',571 '-o', 'StrictHostKeyChecking yes', '-i',572 self.ssh_privkey_file, file,573 "%s@%s:%s" % (user, host, dest)]574 rv = 0575 576 try:577 dnull = open("/dev/null", "w")578 except IOError:579 self.log.debug("[ssh_file]: failed to open " + \580 "/dev/null for redirect")581 dnull = Null582 583 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))584 if not self.debug:585 rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,586 close_fds=True)587 588 return rv == 0589 590 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):591 """592 Run a remote command on host as user. If debug is set, the action593 is only logged. Commands are run without stdin, to avoid stray594 SIGTTINs.595 """596 sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \597 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \598 (self.ssh_exec, self.ssh_privkey_file,599 user, host, cmd)600 601 try:602 dnull = open("/dev/null", "w")603 except IOError:604 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \605 "for redirect")606 dnull = Null607 608 self.log.debug("[ssh_cmd]: %s" % sh_str)609 if not self.debug:610 if dnull:611 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,612 close_fds=True)613 else:614 sub = Popen(sh_str, shell=True,615 close_fds=True)616 if timeout:617 i = 0618 rv = sub.poll()619 while i < timeout:620 if rv is not None: break621 else:622 time.sleep(1)623 rv = sub.poll()624 i += 1625 else:626 self.log.debug("Process exceeded runtime: %s" % sh_str)627 os.kill(sub.pid, signal.SIGKILL)628 raise self.ssh_cmd_timeout();629 return rv == 0630 else:631 return sub.wait() == 0632 else:633 if timeout == 0:634 self.log.debug("debug timeout raised on %s " % sh_str)635 raise self.ssh_cmd_timeout()636 else:637 return True638 639 class start_segment(emulab_segment):640 def __init__(self, log=None, keyfile=None, debug=False):641 experiment_control_local.emulab_segment.__init__(self,642 log=log, keyfile=keyfile, debug=debug)643 644 def create_config_tree(self, src_dir, dest_dir, script):645 """646 Append commands to script that will create the directory hierarchy647 on the remote federant.648 """649 650 if os.path.isdir(src_dir):651 print >>script, "mkdir -p %s" % dest_dir652 print >>script, "chmod 770 %s" % dest_dir653 654 for f in os.listdir(src_dir):655 if os.path.isdir(f):656 self.create_config_tree("%s/%s" % (src_dir, f),657 "%s/%s" % (dest_dir, f), script)658 else:659 self.log.debug("[create_config_tree]: Not a directory: %s" \660 % src_dir)661 662 def ship_configs(self, host, user, src_dir, dest_dir):663 """664 Copy federant-specific configuration files to the federant.665 """666 for f in os.listdir(src_dir):667 if os.path.isdir(f):668 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),669 "%s/%s" % (dest_dir, f)):670 return False671 else:672 if not self.scp_file("%s/%s" % (src_dir, f),673 user, host, dest_dir):674 return False675 return True676 677 def get_state(self, user, host, tb, pid, eid):678 # command to test experiment state679 expinfo_exec = "/usr/testbed/bin/expinfo"680 # Regular expressions to parse the expinfo response681 state_re = re.compile("State:\s+(\w+)")682 no_exp_re = re.compile("^No\s+such\s+experiment")683 swapping_re = re.compile("^No\s+information\s+available.")684 state = None # Experiment state parsed from expinfo685 # The expinfo ssh command. Note the identity restriction to use686 # only the identity provided in the pubkey given.687 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',688 'StrictHostKeyChecking yes', '-i',689 self.ssh_privkey_file, "%s@%s" % (user, host),690 expinfo_exec, pid, eid]691 692 dev_null = None693 try:694 dev_null = open("/dev/null", "a")695 except IOError, e:696 self.log.error("[get_state]: can't open /dev/null: %s" %e)697 698 if self.debug:699 state = 'swapped'700 rv = 0701 else:702 status = Popen(cmd, stdout=PIPE, stderr=dev_null,703 close_fds=True)704 for line in status.stdout:705 m = state_re.match(line)706 if m: state = m.group(1)707 else:708 for reg, st in ((no_exp_re, "none"),709 (swapping_re, "swapping")):710 m = reg.match(line)711 if m: state = st712 rv = status.wait()713 714 # If the experiment is not present the subcommand returns a715 # non-zero return value. If we successfully parsed a "none"716 # outcome, ignore the return code.717 if rv != 0 and state != 'none':718 raise service_error(service_error.internal,719 "Cannot get status of segment %s:%s/%s" % \720 (tb, pid, eid))721 elif state not in ('active', 'swapped', 'swapping', 'none'):722 raise service_error(service_error.internal,723 "Cannot get status of segment %s:%s/%s" % \724 (tb, pid, eid))725 else: return state726 727 728 def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):729 """730 Start a sub-experiment on a federant.731 732 Get the current state, modify or create as appropriate, ship data733 and configs and start the experiment. There are small ordering734 differences based on the initial state of the sub-experiment.735 """736 # ops node in the federant737 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])738 user = tbparams[tb]['user'] # federant user739 pid = tbparams[tb]['project'] # federant project740 # XXX741 base_confs = ( "hosts",)742 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description743 # Configuration directories on the remote machine744 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)745 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)746 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)747 748 state = self.get_state(user, host, tb, pid, eid)749 750 self.log.debug("[start_segment]: %s: %s" % (tb, state))751 self.log.info("[start_segment]:transferring experiment to %s" % tb)752 753 if not self.scp_file("%s/%s/%s" % \754 (tmpdir, tb, tclfile), user, host):755 return False756 757 if state == 'none':758 # Create a null copy of the experiment so that we capture any759 # logs there if the modify fails. Emulab software discards the760 # logs from a failed startexp761 if not self.scp_file("%s/null.tcl" % tmpdir, user, host):762 return False763 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))764 timedout = False765 try:766 if not self.ssh_cmd(user, host,767 ("/usr/testbed/bin/startexp -i -f -w -p %s " +768 "-e %s null.tcl") % (pid, eid), "startexp",769 timeout=60 * 10):770 return False771 except self.ssh_cmd_timeout:772 timedout = True773 774 if timedout:775 state = self.get_state(user, host, tb, pid, eid)776 if state != "swapped":777 return False778 779 780 # Open up a temporary file to contain a script for setting up the781 # filespace for the new experiment.782 self.log.info("[start_segment]: creating script file")783 try:784 sf, scriptname = tempfile.mkstemp()785 scriptfile = os.fdopen(sf, 'w')786 except IOError:787 return False788 789 scriptbase = os.path.basename(scriptname)790 791 # Script the filesystem changes792 print >>scriptfile, "/bin/rm -rf %s" % proj_dir793 # Clear and create the tarfiles and rpm directories794 for d in (tarfiles_dir, rpms_dir):795 print >>scriptfile, "/bin/rm -rf %s/*" % d796 print >>scriptfile, "mkdir -p %s" % d797 print >>scriptfile, 'mkdir -p %s' % proj_dir798 self.create_config_tree("%s/%s" % (tmpdir, tb),799 proj_dir, scriptfile)800 if os.path.isdir("%s/tarfiles" % tmpdir):801 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,802 scriptfile)803 if os.path.isdir("%s/rpms" % tmpdir):804 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,805 scriptfile)806 print >>scriptfile, "rm -f %s" % scriptbase807 scriptfile.close()808 809 # Move the script to the remote machine810 # XXX: could collide tempfile names on the remote host811 if self.scp_file(scriptname, user, host, scriptbase):812 os.remove(scriptname)813 else:814 return False815 816 # Execute the script (and the script's last line deletes it)817 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):818 return False819 820 for f in base_confs:821 if not self.scp_file("%s/%s" % (tmpdir, f), user, host,822 "%s/%s" % (proj_dir, f)):823 return False824 if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),825 proj_dir):826 return False827 if os.path.isdir("%s/tarfiles" % tmpdir):828 if not self.ship_configs(host, user,829 "%s/tarfiles" % tmpdir, tarfiles_dir):830 return False831 if os.path.isdir("%s/rpms" % tmpdir):832 if not self.ship_configs(host, user,833 "%s/rpms" % tmpdir, tarfiles_dir):834 return False835 # Stage the new configuration (active experiments will stay swapped836 # in now)837 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))838 try:839 if not self.ssh_cmd(user, host,840 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \841 (pid, eid, tclfile),842 "modexp", timeout= 60 * 10):843 return False844 except self.ssh_cmd_timeout:845 self.log.error("Modify command failed to complete in time")846 # There's really no way to see if this succeeded or failed, so847 # if it hangs, assume the worst.848 return False849 # Active experiments are still swapped, this swaps the others in.850 if state != 'active':851 self.log.info("[start_segment]: Swapping %s in on %s" % \852 (eid, tb))853 timedout = False854 try:855 if not self.ssh_cmd(user, host,856 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),857 "swapexp", timeout=10*60):858 return False859 except self.ssh_cmd_timeout:860 timedout = True861 862 # If the command was terminated, but completed successfully,863 # report success.864 if timedout:865 self.log.debug("[start_segment]: swapin timed out " +\866 "checking state")867 state = self.get_state(user, host, tb, pid, eid)868 self.log.debug("[start_segment]: state is %s" % state)869 return state == 'active'870 # Everything has gone OK.871 return True872 873 class stop_segment(emulab_segment):874 def __init__(self, log=None, keyfile=None, debug=False):875 experiment_control_local.emulab_segment.__init__(self,876 log=log, keyfile=keyfile, debug=debug)877 878 def __call__(self, tb, eid, tbparams):879 """880 Stop a sub experiment by calling swapexp on the federant881 """882 user = tbparams[tb]['user']883 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])884 pid = tbparams[tb]['project']885 886 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))887 rv = False888 try:889 # Clean out tar files: we've gone over quota in the past890 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))891 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \892 (pid, eid))893 rv = self.ssh_cmd(user, host,894 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))895 except self.ssh_cmd_timeout:896 rv = False897 return rv898 899 553 900 554 def generate_ssh_keys(self, dest, type="rsa" ): … … 1267 921 else: 1268 922 raise service_error(service_error.protocol, "Bad splitter response") 1269 1270 class current_testbed: 1271 """ 1272 Object for collecting the current testbed description. The testbed 1273 description is saved to a file with the local testbed variables 1274 subsittuted line by line. 1275 """ 1276 def __init__(self, eid, tmpdir, fedkit, gatewaykit): 1277 def tar_list_to_string(tl): 1278 if tl is None: return None 1279 1280 rv = "" 1281 for t in tl: 1282 rv += " %s PROJDIR/tarfiles/EID/%s" % \ 1283 (t[0], os.path.basename(t[1])) 1284 return rv 1285 1286 1287 self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)") 1288 self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)") 1289 self.current_testbed = None 1290 self.testbed_file = None 1291 1292 self.def_expstart = \ 1293 "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate"; 1294 self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts"; 1295 self.def_gwstart = \ 1296 "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log"; 1297 self.def_mgwstart = \ 1298 "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log"; 1299 self.def_gwimage = "FBSD61-TUNNEL2"; 1300 self.def_gwtype = "pc"; 1301 self.def_mgwcmd = '# ' 1302 self.def_mgwcmdparams = '' 1303 self.def_gwcmd = '# ' 1304 self.def_gwcmdparams = '' 1305 1306 self.eid = eid 1307 self.tmpdir = tmpdir 1308 # Convert fedkit and gateway kit (which are lists of tuples) into a 1309 # substituition string. 1310 self.fedkit = tar_list_to_string(fedkit) 1311 self.gatewaykit = tar_list_to_string(gatewaykit) 1312 1313 def __call__(self, line, master, allocated, tbparams): 1314 # Capture testbed topology descriptions 1315 if self.current_testbed == None: 1316 m = self.begin_testbed.match(line) 1317 if m != None: 1318 self.current_testbed = m.group(1) 1319 if self.current_testbed == None: 1320 raise service_error(service_error.req, 1321 "Bad request format (unnamed testbed)") 1322 allocated[self.current_testbed] = \ 1323 allocated.get(self.current_testbed,0) + 1 1324 tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed) 1325 if not os.path.exists(tb_dir): 1326 try: 1327 os.mkdir(tb_dir) 1328 except IOError: 1329 raise service_error(service_error.internal, 1330 "Cannot create %s" % tb_dir) 1331 try: 1332 self.testbed_file = open("%s/%s.%s.tcl" % 1333 (tb_dir, self.eid, self.current_testbed), 'w') 1334 except IOError: 1335 self.testbed_file = None 1336 return True 1337 else: return False 1338 else: 1339 m = self.end_testbed.match(line) 1340 if m != None: 1341 if m.group(1) != self.current_testbed: 1342 raise service_error(service_error.internal, 1343 "Mismatched testbed markers!?") 1344 if self.testbed_file != None: 1345 self.testbed_file.close() 1346 self.testbed_file = None 1347 self.current_testbed = None 1348 elif self.testbed_file: 1349 # Substitute variables and put the line into the local 1350 # testbed file. 1351 gwtype = tbparams[self.current_testbed].get(\ 1352 'connectortype', self.def_gwtype) 1353 gwimage = tbparams[self.current_testbed].get(\ 1354 'connectorimage', self.def_gwimage) 1355 mgwstart = tbparams[self.current_testbed].get(\ 1356 'masterconnectorstartcmd', self.def_mgwstart) 1357 mexpstart = tbparams[self.current_testbed].get(\ 1358 'masternodestartcmd', self.def_mexpstart) 1359 gwstart = tbparams[self.current_testbed].get(\ 1360 'slaveconnectorstartcmd', self.def_gwstart) 1361 expstart = tbparams[self.current_testbed].get(\ 1362 'slavenodestartcmd', self.def_expstart) 1363 project = tbparams[self.current_testbed].get('project') 1364 gwcmd = tbparams[self.current_testbed].get(\ 1365 'slaveconnectorcmd', self.def_gwcmd) 1366 gwcmdparams = tbparams[self.current_testbed].get(\ 1367 'slaveconnectorcmdparams', self.def_gwcmdparams) 1368 mgwcmd = tbparams[self.current_testbed].get(\ 1369 'masterconnectorcmd', self.def_gwcmd) 1370 mgwcmdparams = tbparams[self.current_testbed].get(\ 1371 'masterconnectorcmdparams', self.def_gwcmdparams) 1372 line = re.sub("GWTYPE", gwtype, line) 1373 line = re.sub("GWIMAGE", gwimage, line) 1374 if self.current_testbed == master: 1375 line = re.sub("GWSTART", mgwstart, line) 1376 line = re.sub("EXPSTART", mexpstart, line) 1377 # NB GWCMDPARAMS is a prefix of GWCMD, so expand first 1378 line = re.sub("GWCMDPARAMS", mgwcmdparams, line) 1379 line = re.sub("(#\s*)?GWCMD", mgwcmd, line) 1380 else: 1381 line = re.sub("GWSTART", gwstart, line) 1382 line = re.sub("EXPSTART", expstart, line) 1383 # NB GWCMDPARAMS is a prefix of GWCMD, so expand first 1384 line = re.sub("GWCMDPARAMS", gwcmdparams, line) 1385 line = re.sub("(#\s*)?GWCMD", gwcmd, line) 1386 #These expansions contain EID and PROJDIR. NB these are 1387 # local fedkit and gatewaykit, which are strings. 1388 if self.fedkit: 1389 line = re.sub("FEDKIT", self.fedkit, line) 1390 if self.gatewaykit: 1391 line = re.sub("GATEWAYKIT", self.gatewaykit, line) 1392 line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line) 1393 line = re.sub("PROJDIR", "/proj/%s/" % project, line) 1394 line = re.sub("EID", self.eid, line) 1395 line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \ 1396 (project, self.eid), line) 1397 print >>self.testbed_file, line 1398 return True 1399 1400 class allbeds: 1401 """ 1402 Process the Allbeds section. Get access to each federant and save the 1403 parameters in tbparams 1404 """ 1405 def __init__(self, get_access): 1406 self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds") 1407 self.end_allbeds = re.compile("^#\s+End\s+Allbeds") 1408 self.in_allbeds = False 1409 self.get_access = get_access 1410 1411 def __call__(self, line, user, tbparams, master, export_project, 1412 access_user): 1413 # Testbed access parameters 1414 if not self.in_allbeds: 1415 if self.begin_allbeds.match(line): 1416 self.in_allbeds = True 1417 return True 1418 else: 1419 return False 1420 else: 1421 if self.end_allbeds.match(line): 1422 self.in_allbeds = False 1423 else: 1424 nodes = line.split('|') 1425 tb = nodes.pop(0) 1426 self.get_access(tb, nodes, user, tbparams, master, 1427 export_project, access_user) 1428 return True 1429 1430 class gateways: 1431 def __init__(self, eid, master, tmpdir, gw_pubkey, 1432 gw_secretkey, copy_file, fedkit): 1433 self.begin_gateways = \ 1434 re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)") 1435 self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)") 1436 self.current_gateways = None 1437 self.control_gateway = None 1438 self.active_end = { } 1439 1440 self.eid = eid 1441 self.master = master 1442 self.tmpdir = tmpdir 1443 self.gw_pubkey_base = gw_pubkey 1444 self.gw_secretkey_base = gw_secretkey 1445 1446 self.copy_file = copy_file 1447 self.fedkit = fedkit 1448 1449 1450 def gateway_conf_file(self, gw, master, eid, pubkey, privkey, 1451 active_end, tbparams, dtb, myname, desthost, type): 1452 """ 1453 Produce a gateway configuration file from a gateways line. 1454 """ 1455 1456 sproject = tbparams[gw].get('project', 'project') 1457 dproject = tbparams[dtb].get('project', 'project') 1458 sdomain = ".%s.%s%s" % (eid, sproject, 1459 tbparams[gw].get('domain', ".example.com")) 1460 ddomain = ".%s.%s%s" % (eid, dproject, 1461 tbparams[dtb].get('domain', ".example.com")) 1462 boss = tbparams[master].get('boss', "boss") 1463 fs = tbparams[master].get('fs', "fs") 1464 event_server = "%s%s" % \ 1465 (tbparams[gw].get('eventserver', "event_server"), 1466 tbparams[gw].get('domain', "example.com")) 1467 remote_event_server = "%s%s" % \ 1468 (tbparams[dtb].get('eventserver', "event_server"), 1469 tbparams[dtb].get('domain', "example.com")) 1470 seer_control = "%s%s" % \ 1471 (tbparams[gw].get('control', "control"), sdomain) 1472 tunnel_iface = tbparams[gw].get("tunnelinterface", None) 1473 1474 if self.fedkit: 1475 remote_script_dir = "/usr/local/federation/bin" 1476 local_script_dir = "/usr/local/federation/bin" 1477 else: 1478 remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 1479 local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 1480 1481 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 1482 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 1483 tunnel_cfg = tbparams[gw].get("tunnelcfg", "false") 1484 1485 conf_file = "%s%s.gw.conf" % (myname, sdomain) 1486 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 1487 1488 # translate to lower case so the `hostname` hack for specifying 1489 # configuration files works. 1490 conf_file = conf_file.lower(); 1491 remote_conf_file = remote_conf_file.lower(); 1492 1493 if dtb == master: 1494 active = "false" 1495 elif gw == master: 1496 active = "true" 1497 elif active_end.has_key('%s-%s' % (dtb, gw)): 1498 active = "false" 1499 else: 1500 active_end['%s-%s' % (gw, dtb)] = 1 1501 active = "true" 1502 1503 gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w") 1504 print >>gwconfig, "Active: %s" % active 1505 print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg 1506 if tunnel_iface: 1507 print >>gwconfig, "Interface: %s" % tunnel_iface 1508 print >>gwconfig, "BossName: %s" % boss 1509 print >>gwconfig, "FsName: %s" % fs 1510 print >>gwconfig, "EventServerName: %s" % event_server 1511 print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server 1512 print >>gwconfig, "SeerControl: %s" % seer_control 1513 print >>gwconfig, "Type: %s" % type 1514 print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir 1515 print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \ 1516 local_script_dir 1517 print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid) 1518 print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid) 1519 print >>gwconfig, "RemoteConfigFile: %s/%s" % \ 1520 (remote_conf_dir, remote_conf_file) 1521 print >>gwconfig, "Peer: %s%s" % (desthost, ddomain) 1522 print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey) 1523 print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey) 1524 gwconfig.close() 1525 1526 return active == "true" 1527 1528 def __call__(self, line, allocated, tbparams): 1529 # Process gateways 1530 if not self.current_gateways: 1531 m = self.begin_gateways.match(line) 1532 if m: 1533 self.current_gateways = m.group(1) 1534 if allocated.has_key(self.current_gateways): 1535 # This test should always succeed 1536 tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways) 1537 if not os.path.exists(tb_dir): 1538 try: 1539 os.mkdir(tb_dir) 1540 except IOError: 1541 raise service_error(service_error.internal, 1542 "Cannot create %s" % tb_dir) 1543 else: 1544 # XXX 1545 self.log.error("[gateways]: Ignoring gateways for " + \ 1546 "unknown testbed %s" % self.current_gateways) 1547 self.current_gateways = None 1548 return True 1549 else: 1550 return False 1551 else: 1552 m = self.end_gateways.match(line) 1553 if m : 1554 if m.group(1) != self.current_gateways: 1555 raise service_error(service_error.internal, 1556 "Mismatched gateway markers!?") 1557 if self.control_gateway: 1558 try: 1559 cc = open("%s/%s/client.conf" % 1560 (self.tmpdir, self.current_gateways), 'w') 1561 print >>cc, "ControlGateway: %s" % \ 1562 self.control_gateway 1563 if tbparams[self.master].has_key('smbshare'): 1564 print >>cc, "SMBSHare: %s" % \ 1565 tbparams[self.master]['smbshare'] 1566 print >>cc, "ProjectUser: %s" % \ 1567 tbparams[self.master]['user'] 1568 print >>cc, "ProjectName: %s" % \ 1569 tbparams[self.master]['project'] 1570 print >>cc, "ExperimentID: %s/%s" % \ 1571 ( tbparams[self.master]['project'], \ 1572 self.eid ) 1573 cc.close() 1574 except IOError: 1575 raise service_error(service_error.internal, 1576 "Error creating client config") 1577 # XXX: This seer specific file should disappear 1578 try: 1579 cc = open("%s/%s/seer.conf" % 1580 (self.tmpdir, self.current_gateways), 1581 'w') 1582 if self.current_gateways != self.master: 1583 print >>cc, "ControlNode: %s" % \ 1584 self.control_gateway 1585 print >>cc, "ExperimentID: %s/%s" % \ 1586 ( tbparams[self.master]['project'], \ 1587 self.eid ) 1588 cc.close() 1589 except IOError: 1590 raise service_error(service_error.internal, 1591 "Error creating seer config") 1592 else: 1593 debug.error("[gateways]: No control gateway for %s" %\ 1594 self.current_gateways) 1595 self.current_gateways = None 1596 else: 1597 dtb, myname, desthost, type = line.split(" ") 1598 1599 if type == "control" or type == "both": 1600 self.control_gateway = "%s.%s.%s%s" % (myname, 1601 self.eid, 1602 tbparams[self.current_gateways]['project'], 1603 tbparams[self.current_gateways]['domain']) 1604 try: 1605 active = self.gateway_conf_file(self.current_gateways, 1606 self.master, self.eid, self.gw_pubkey_base, 1607 self.gw_secretkey_base, 1608 self.active_end, tbparams, dtb, myname, 1609 desthost, type) 1610 except IOError, e: 1611 raise service_error(service_error.internal, 1612 "Failed to write config file for %s" % \ 1613 self.current_gateway) 1614 1615 gw_pubkey = "%s/keys/%s" % \ 1616 (self.tmpdir, self.gw_pubkey_base) 1617 gw_secretkey = "%s/keys/%s" % \ 1618 (self.tmpdir, self.gw_secretkey_base) 1619 1620 pkfile = "%s/%s/%s" % \ 1621 ( self.tmpdir, self.current_gateways, 1622 self.gw_pubkey_base) 1623 skfile = "%s/%s/%s" % \ 1624 ( self.tmpdir, self.current_gateways, 1625 self.gw_secretkey_base) 1626 1627 if not os.path.exists(pkfile): 1628 try: 1629 self.copy_file(gw_pubkey, pkfile) 1630 except IOError: 1631 service_error(service_error.internal, 1632 "Failed to copy pubkey file") 1633 1634 if active and not os.path.exists(skfile): 1635 try: 1636 self.copy_file(gw_secretkey, skfile) 1637 except IOError: 1638 service_error(service_error.internal, 1639 "Failed to copy secretkey file") 1640 return True 1641 1642 class shunt_to_file: 1643 """ 1644 Simple class to write data between two regexps to a file. 1645 """ 1646 def __init__(self, begin, end, filename): 1647 """ 1648 Begin shunting on a match of begin, stop on end, send data to 1649 filename. 1650 """ 1651 self.begin = re.compile(begin) 1652 self.end = re.compile(end) 1653 self.in_shunt = False 1654 self.file = None 1655 self.filename = filename 1656 1657 def __call__(self, line): 1658 """ 1659 Call this on each line in the input that may be shunted. 1660 """ 1661 if not self.in_shunt: 1662 if self.begin.match(line): 1663 self.in_shunt = True 1664 try: 1665 self.file = open(self.filename, "w") 1666 except: 1667 self.file = None 1668 raise 1669 return True 1670 else: 1671 return False 1672 else: 1673 if self.end.match(line): 1674 if self.file: 1675 self.file.close() 1676 self.file = None 1677 self.in_shunt = False 1678 else: 1679 if self.file: 1680 print >>self.file, line 1681 return True 1682 1683 class shunt_to_list: 1684 """ 1685 Same interface as shunt_to_file. Data collected in self.list, one list 1686 element per line. 1687 """ 1688 def __init__(self, begin, end): 1689 self.begin = re.compile(begin) 1690 self.end = re.compile(end) 1691 self.in_shunt = False 1692 self.list = [ ] 1693 1694 def __call__(self, line): 1695 if not self.in_shunt: 1696 if self.begin.match(line): 1697 self.in_shunt = True 1698 return True 1699 else: 1700 return False 1701 else: 1702 if self.end.match(line): 1703 self.in_shunt = False 1704 else: 1705 self.list.append(line) 1706 return True 1707 1708 class shunt_to_string: 1709 """ 1710 Same interface as shunt_to_file. Data collected in self.str, all in 1711 one string. 1712 """ 1713 def __init__(self, begin, end): 1714 self.begin = re.compile(begin) 1715 self.end = re.compile(end) 1716 self.in_shunt = False 1717 self.str = "" 1718 1719 def __call__(self, line): 1720 if not self.in_shunt: 1721 if self.begin.match(line): 1722 self.in_shunt = True 1723 return True 1724 else: 1725 return False 1726 else: 1727 if self.end.match(line): 1728 self.in_shunt = False 1729 else: 1730 self.str += line 1731 return True 1732 1733 def allocate_resources(self, allocated, master, eid, expid, expcert, 1734 tbparams, tmpdir, alloc_log=None): 1735 started = { } # Testbeds where a sub-experiment started 1736 # successfully 1737 1738 # XXX 1739 fail_soft = False 1740 1741 log = alloc_log or self.log 1742 1743 thread_pool = self.thread_pool(self.nthreads) 1744 threads = [ ] 1745 1746 for tb in [ k for k in allocated.keys() if k != master]: 1747 # Create and start a thread to start the segment, and save it to 1748 # get the return value later 1749 thread_pool.wait_for_slot() 1750 t = self.pooled_thread(\ 1751 target=self.start_segment(log=log, 1752 keyfile=self.ssh_privkey_file, debug=self.debug), 1753 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1754 pdata=thread_pool, trace_file=self.trace_file) 1755 threads.append(t) 1756 t.start() 1757 1758 # Wait until all finish 1759 thread_pool.wait_for_all_done() 1760 1761 # If none failed, start the master 1762 failed = [ t.getName() for t in threads if not t.rv ] 1763 1764 if len(failed) == 0: 1765 starter = self.start_segment(log=log, 1766 keyfile=self.ssh_privkey_file, debug=self.debug) 1767 if not starter(master, eid, tbparams, tmpdir): 1768 failed.append(master) 1769 1770 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1771 # If one failed clean up, unless fail_soft is set 1772 if failed: 1773 if not fail_soft: 1774 thread_pool.clear() 1775 for tb in succeeded: 1776 # Create and start a thread to stop the segment 1777 thread_pool.wait_for_slot() 1778 t = self.pooled_thread(\ 1779 target=self.stop_segment(log=log, 1780 keyfile=self.ssh_privkey_file, 1781 debug=self.debug), 1782 args=(tb, eid, tbparams), name=tb, 1783 pdata=thread_pool, trace_file=self.trace_file) 1784 t.start() 1785 # Wait until all finish 1786 thread_pool.wait_for_all_done() 1787 1788 # release the allocations 1789 for tb in tbparams.keys(): 1790 self.release_access(tb, tbparams[tb]['allocID']) 1791 # Remove the placeholder 1792 self.state_lock.acquire() 1793 self.state[eid]['experimentStatus'] = 'failed' 1794 if self.state_filename: self.write_state() 1795 self.state_lock.release() 1796 1797 #raise service_error(service_error.federant, 1798 # "Swap in failed on %s" % ",".join(failed)) 1799 log.error("Swap in failed on %s" % ",".join(failed)) 1800 return 1801 else: 1802 log.info("[start_segment]: Experiment %s active" % eid) 1803 1804 log.debug("[start_experiment]: removing %s" % tmpdir) 1805 1806 # Walk up tmpdir, deleting as we go 1807 for path, dirs, files in os.walk(tmpdir, topdown=False): 1808 for f in files: 1809 os.remove(os.path.join(path, f)) 1810 for d in dirs: 1811 os.rmdir(os.path.join(path, d)) 1812 os.rmdir(tmpdir) 1813 1814 # Insert the experiment into our state and update the disk copy 1815 self.state_lock.acquire() 1816 self.state[expid]['experimentStatus'] = 'active' 1817 self.state[eid] = self.state[expid] 1818 if self.state_filename: self.write_state() 1819 self.state_lock.release() 1820 return 1821 1822 def create_experiment(self, req, fid): 1823 """ 1824 The external interface to experiment creation called from the 1825 dispatcher. 1826 1827 Creates a working directory, splits the incoming description using the 1828 splitter script and parses out the avrious subsections using the 1829 lcasses above. Once each sub-experiment is created, use pooled threads 1830 to instantiate them and start it all up. 1831 """ 1832 1833 if not self.auth.check_attribute(fid, 'create'): 1834 raise service_error(service_error.access, "Create access denied") 1835 1836 try: 1837 tmpdir = tempfile.mkdtemp(prefix="split-") 1838 except IOError: 1839 raise service_error(service_error.internal, "Cannot create tmp dir") 1840 1841 gw_pubkey_base = "fed.%s.pub" % self.ssh_type 1842 gw_secretkey_base = "fed.%s" % self.ssh_type 1843 gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base 1844 gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base 1845 tclfile = tmpdir + "/experiment.tcl" 1846 tbparams = { } 1847 try: 1848 access_user = self.accessdb[fid] 1849 except KeyError: 1850 raise service_error(service_error.internal, 1851 "Access map and authorizer out of sync in " + \ 1852 "create_experiment for fedid %s" % fid) 1853 1854 pid = "dummy" 1855 gid = "dummy" 1856 try: 1857 os.mkdir(tmpdir+"/keys") 1858 except OSError: 1859 raise service_error(service_error.internal, 1860 "Can't make temporary dir") 1861 1862 req = req.get('CreateRequestBody', None) 1863 if not req: 1864 raise service_error(service_error.req, 1865 "Bad request format (no CreateRequestBody)") 1866 # The tcl parser needs to read a file so put the content into that file 1867 descr=req.get('experimentdescription', None) 1868 if descr: 1869 file_content=descr.get('ns2description', None) 1870 if file_content: 1871 try: 1872 f = open(tclfile, 'w') 1873 f.write(file_content) 1874 f.close() 1875 except IOError: 1876 raise service_error(service_error.internal, 1877 "Cannot write temp experiment description") 1878 else: 1879 raise service_error(service_error.req, 1880 "Only ns2descriptions supported") 1881 else: 1882 raise service_error(service_error.req, "No experiment description") 1883 1884 # Generate an ID for the experiment (slice) and a certificate that the 1885 # allocator can use to prove they own it. We'll ship it back through 1886 # the encrypted connection. 1887 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1888 1889 if req.has_key('experimentID') and \ 1890 req['experimentID'].has_key('localname'): 1891 overwrite = False 1892 eid = req['experimentID']['localname'] 1893 # If there's an old failed experiment here with the same local name 1894 # and accessible by this user, we'll overwrite it, otherwise we'll 1895 # fall through and do the collision avoidance. 1896 old_expid = self.get_experiment_fedid(eid) 1897 if old_expid and self.check_experiment_access(fid, old_expid): 1898 self.state_lock.acquire() 1899 status = self.state[eid].get('experimentStatus', None) 1900 if status and status == 'failed': 1901 # remove the old access attribute 1902 self.auth.unset_attribute(fid, old_expid) 1903 overwrite = True 1904 del self.state[eid] 1905 del self.state[old_expid] 1906 self.state_lock.release() 1907 self.state_lock.acquire() 1908 while (self.state.has_key(eid) and not overwrite): 1909 eid += random.choice(string.ascii_letters) 1910 # Initial state 1911 self.state[eid] = { 1912 'experimentID' : \ 1913 [ { 'localname' : eid }, {'fedid': expid } ], 1914 'experimentStatus': 'starting', 1915 'experimentAccess': { 'X509' : expcert }, 1916 'owner': fid, 1917 'log' : [], 1918 } 1919 self.state[expid] = self.state[eid] 1920 if self.state_filename: self.write_state() 1921 self.state_lock.release() 1922 else: 1923 eid = self.exp_stem 1924 for i in range(0,5): 1925 eid += random.choice(string.ascii_letters) 1926 self.state_lock.acquire() 1927 while (self.state.has_key(eid)): 1928 eid = self.exp_stem 1929 for i in range(0,5): 1930 eid += random.choice(string.ascii_letters) 1931 # Initial state 1932 self.state[eid] = { 1933 'experimentID' : \ 1934 [ { 'localname' : eid }, {'fedid': expid } ], 1935 'experimentStatus': 'starting', 1936 'experimentAccess': { 'X509' : expcert }, 1937 'owner': fid, 1938 'log' : [], 1939 } 1940 self.state[expid] = self.state[eid] 1941 if self.state_filename: self.write_state() 1942 self.state_lock.release() 1943 1944 try: 1945 # This catches exceptions to clear the placeholder if necessary 1946 try: 1947 self.generate_ssh_keys(gw_secretkey, self.ssh_type) 1948 except ValueError: 1949 raise service_error(service_error.server_config, 1950 "Bad key type (%s)" % self.ssh_type) 1951 1952 user = req.get('user', None) 1953 if user == None: 1954 raise service_error(service_error.req, "No user") 1955 1956 master = req.get('master', None) 1957 if not master: 1958 raise service_error(service_error.req, 1959 "No master testbed label") 1960 export_project = req.get('exportProject', None) 1961 if not export_project: 1962 raise service_error(service_error.req, "No export project") 1963 1964 if self.splitter_url: 1965 self.log.debug("Calling remote splitter at %s" % \ 1966 self.splitter_url) 1967 split_data = self.remote_splitter(self.splitter_url, 1968 file_content, master) 1969 else: 1970 tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 1971 str(self.muxmax), '-m', master] 1972 1973 if self.fedkit: 1974 tclcmd.append('-k') 1975 1976 if self.gatewaykit: 1977 tclcmd.append('-K') 1978 1979 tclcmd.extend([pid, gid, eid, tclfile]) 1980 1981 self.log.debug("running local splitter %s", " ".join(tclcmd)) 1982 # This is just fantastic. As a side effect the parser copies 1983 # tb_compat.tcl into the current directory, so that directory 1984 # must be writable by the fedd user. Doing this in the 1985 # temporary subdir ensures this is the case. 1986 tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 1987 cwd=tmpdir) 1988 split_data = tclparser.stdout 1989 1990 allocated = { } # Testbeds we can access 1991 # Objects to parse the splitter output (defined above) 1992 parse_current_testbed = self.current_testbed(eid, tmpdir, 1993 self.fedkit, self.gatewaykit) 1994 parse_allbeds = self.allbeds(self.get_access) 1995 parse_gateways = self.gateways(eid, master, tmpdir, 1996 gw_pubkey_base, gw_secretkey_base, self.copy_file, 1997 self.fedkit) 1998 parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", 1999 "^#\s+End\s+Vtopo") 2000 parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", 2001 "^#\s+End\s+hostnames", tmpdir + "/hosts") 2002 parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", 2003 "^#\s+End\s+tarfiles") 2004 parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", 2005 "^#\s+End\s+rpms") 2006 2007 # Working on the split data 2008 for line in split_data: 2009 line = line.rstrip() 2010 if parse_current_testbed(line, master, allocated, tbparams): 2011 continue 2012 elif parse_allbeds(line, user, tbparams, master, export_project, 2013 access_user): 2014 continue 2015 elif parse_gateways(line, allocated, tbparams): 2016 continue 2017 elif parse_vtopo(line): 2018 continue 2019 elif parse_hostnames(line): 2020 continue 2021 elif parse_tarfiles(line): 2022 continue 2023 elif parse_rpms(line): 2024 continue 2025 else: 2026 raise service_error(service_error.internal, 2027 "Bad tcl parse? %s" % line) 2028 # Virtual topology and visualization 2029 vtopo = self.gentopo(parse_vtopo.str) 2030 if not vtopo: 2031 raise service_error(service_error.internal, 2032 "Failed to generate virtual topology") 2033 2034 vis = self.genviz(vtopo) 2035 if not vis: 2036 raise service_error(service_error.internal, 2037 "Failed to generate visualization") 2038 2039 2040 # save federant information 2041 for k in allocated.keys(): 2042 tbparams[k]['federant'] = {\ 2043 'name': [ { 'localname' : eid} ],\ 2044 'emulab': tbparams[k]['emulab'],\ 2045 'allocID' : tbparams[k]['allocID'],\ 2046 'master' : k == master,\ 2047 } 2048 2049 self.state_lock.acquire() 2050 self.state[eid]['vtopo'] = vtopo 2051 self.state[eid]['vis'] = vis 2052 self.state[expid]['federant'] = \ 2053 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 2054 if tbparams[tb].has_key('federant') ] 2055 if self.state_filename: self.write_state() 2056 self.state_lock.release() 2057 2058 # Copy tarfiles and rpms needed at remote sites into a staging area 2059 try: 2060 if self.fedkit: 2061 for t in self.fedkit: 2062 parse_tarfiles.list.append(t[1]) 2063 if self.gatewaykit: 2064 for t in self.gatewaykit: 2065 parse_tarfiles.list.append(t[1]) 2066 for t in parse_tarfiles.list: 2067 if not os.path.exists("%s/tarfiles" % tmpdir): 2068 os.mkdir("%s/tarfiles" % tmpdir) 2069 self.copy_file(t, "%s/tarfiles/%s" % \ 2070 (tmpdir, os.path.basename(t))) 2071 for r in parse_rpms.list: 2072 if not os.path.exists("%s/rpms" % tmpdir): 2073 os.mkdir("%s/rpms" % tmpdir) 2074 self.copy_file(r, "%s/rpms/%s" % \ 2075 (tmpdir, os.path.basename(r))) 2076 # A null experiment file in case we need to create a remote 2077 # experiment from scratch 2078 f = open("%s/null.tcl" % tmpdir, "w") 2079 print >>f, """ 2080 set ns [new Simulator] 2081 source tb_compat.tcl 2082 2083 set a [$ns node] 2084 2085 $ns rtproto Session 2086 $ns run 2087 """ 2088 f.close() 2089 2090 except IOError, e: 2091 raise service_error(service_error.internal, 2092 "Cannot stage tarfile/rpm: %s" % e.strerror) 2093 2094 except service_error, e: 2095 # If something goes wrong in the parse (usually an access error) 2096 # clear the placeholder state. From here on out the code delays 2097 # exceptions. Failing at this point returns a fault to the remote 2098 # caller. 2099 self.state_lock.acquire() 2100 del self.state[eid] 2101 del self.state[expid] 2102 if self.state_filename: self.write_state() 2103 self.state_lock.release() 2104 raise e 2105 2106 2107 # Start the background swapper and return the starting state. From 2108 # here on out, the state will stick around a while. 2109 2110 # Let users touch the state 2111 self.auth.set_attribute(fid, expid) 2112 self.auth.set_attribute(expid, expid) 2113 # Override fedids can manipulate state as well 2114 for o in self.overrides: 2115 self.auth.set_attribute(o, expid) 2116 2117 # Create a logger that logs to the experiment's state object as well as 2118 # to the main log file. 2119 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2120 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 2121 # XXX: there should be a global one of these rather than repeating the 2122 # code. 2123 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2124 '%d %b %y %H:%M:%S')) 2125 alloc_log.addHandler(h) 2126 2127 # Start a thread to do the resource allocation 2128 t = Thread(target=self.allocate_resources, 2129 args=(allocated, master, eid, expid, expcert, tbparams, 2130 tmpdir, alloc_log), 2131 name=eid) 2132 t.start() 2133 2134 rv = { 2135 'experimentID': [ 2136 {'localname' : eid }, { 'fedid': copy.copy(expid) } 2137 ], 2138 'experimentStatus': 'starting', 2139 'experimentAccess': { 'X509' : expcert } 2140 } 2141 2142 return rv 2143 2144 class new_start_segment: 923 924 class start_segment: 2145 925 def __init__(self, debug=False, log=None, testbed="", cert_file=None, 2146 926 cert_pwd=None, trusted_certs=None, caller=None): … … 2175 955 2176 956 2177 class new_terminate_segment:957 class terminate_segment: 2178 958 def __init__(self, debug=False, log=None, testbed="", cert_file=None, 2179 959 cert_pwd=None, trusted_certs=None, caller=None): … … 2199 979 (self.testbed, e)) 2200 980 return False 2201 2202 2203 981 2204 982 2205 def new_allocate_resources(self, allocated, master, eid, expid, expcert,983 def allocate_resources(self, allocated, master, eid, expid, expcert, 2206 984 tbparams, topo, tmpdir, alloc_log=None, attrs=None): 2207 985 started = { } # Testbeds where a sub-experiment started … … 2233 1011 2234 1012 t = self.pooled_thread(\ 2235 target=self. new_start_segment(log=log, debug=self.debug,1013 target=self.start_segment(log=log, debug=self.debug, 2236 1014 testbed=tb, cert_file=self.cert_file, 2237 1015 cert_pwd=self.cert_pwd, … … 2261 1039 raise service_error(service_error.internal, 2262 1040 "No alloc id for testbed %s !?" % master) 2263 starter = self. new_start_segment(log=log, debug=self.debug,1041 starter = self.start_segment(log=log, debug=self.debug, 2264 1042 testbed=master, cert_file=self.cert_file, 2265 1043 cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, … … 2320 1098 2321 1099 2322 def new_create_experiment(self, req, fid):1100 def create_experiment(self, req, fid): 2323 1101 """ 2324 1102 The external interface to experiment creation called from the … … 2882 1660 2883 1661 # Start a thread to do the resource allocation 2884 t = Thread(target=self. new_allocate_resources,1662 t = Thread(target=self.allocate_resources, 2885 1663 args=(allocated, master, eid, expid, expcert, tbparams, 2886 1664 topo, tmpdir, alloc_log, attrs), … … 3119 1897 return rv 3120 1898 3121 3122 1899 def terminate_experiment(self, req, fid): 3123 1900 """ … … 3194 1971 if id.has_key('localname'): ids.append(id['localname']) 3195 1972 3196 # Construct enough of the tbparams to make the stop_segment calls3197 # work3198 for fed in fed_exp.get('federant', []):3199 try:3200 for e in fed['name']:3201 eid = e.get('localname', None)3202 if eid: break3203 else:3204 continue3205 3206 p = fed['emulab']['project']3207 3208 project = p['name']['localname']3209 tb = p['testbed']['localname']3210 user = p['user'][0]['userID']['localname']3211 3212 domain = fed['emulab']['domain']3213 host = fed['emulab']['ops']3214 aid = fed['allocID']3215 except KeyError, e:3216 continue3217 tbparams[tb] = {\3218 'user': user,\3219 'domain': domain,\3220 'project': project,\3221 'host': host,\3222 'eid': eid,\3223 'aid': aid,\3224 }3225 fed_exp['experimentStatus'] = 'terminating'3226 if self.state_filename: self.write_state()3227 self.state_lock.release()3228 3229 # Stop everyone. NB, wait_for_all waits until a thread starts and3230 # then completes, so we can't wait if nothing starts. So, no3231 # tbparams, no start.3232 if len(tbparams) > 0:3233 thread_pool = self.thread_pool(self.nthreads)3234 for tb in tbparams.keys():3235 # Create and start a thread to stop the segment3236 thread_pool.wait_for_slot()3237 t = self.pooled_thread(\3238 target=self.stop_segment(log=dealloc_log,3239 keyfile=self.ssh_privkey_file, debug=self.debug),3240 args=(tb, tbparams[tb]['eid'], tbparams), name=tb,3241 pdata=thread_pool, trace_file=self.trace_file)3242 t.start()3243 # Wait for completions3244 thread_pool.wait_for_all_done()3245 3246 # release the allocations (failed experiments have done this3247 # already, and starting experiments may be in odd states, so we3248 # ignore errors releasing those allocations3249 try:3250 for tb in tbparams.keys():3251 self.release_access(tb, tbparams[tb]['aid'])3252 except service_error, e:3253 if status != 'failed' and not force:3254 raise e3255 3256 # Remove the terminated experiment3257 self.state_lock.acquire()3258 for id in ids:3259 if self.state.has_key(id): del self.state[id]3260 3261 if self.state_filename: self.write_state()3262 self.state_lock.release()3263 3264 return {3265 'experiment': exp ,3266 'deallocationLog': "".join(dealloc_list),3267 }3268 else:3269 # Don't forget to release the lock3270 self.state_lock.release()3271 raise service_error(service_error.req, "No saved state")3272 3273 def new_terminate_experiment(self, req, fid):3274 """3275 Swap this experiment out on the federants and delete the shared3276 information3277 """3278 tbparams = { }3279 req = req.get('TerminateRequestBody', None)3280 if not req:3281 raise service_error(service_error.req,3282 "Bad request format (no TerminateRequestBody)")3283 force = req.get('force', False)3284 exp = req.get('experiment', None)3285 if exp:3286 if exp.has_key('fedid'):3287 key = exp['fedid']3288 keytype = "fedid"3289 elif exp.has_key('localname'):3290 key = exp['localname']3291 keytype = "localname"3292 else:3293 raise service_error(service_error.req, "Unknown lookup type")3294 else:3295 raise service_error(service_error.req, "No request?")3296 3297 self.check_experiment_access(fid, key)3298 3299 dealloc_list = [ ]3300 3301 3302 # Create a logger that logs to the dealloc_list as well as to the main3303 # log file.3304 dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)3305 h = logging.StreamHandler(self.list_log(dealloc_list))3306 # XXX: there should be a global one of these rather than repeating the3307 # code.3308 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",3309 '%d %b %y %H:%M:%S'))3310 dealloc_log.addHandler(h)3311 3312 self.state_lock.acquire()3313 fed_exp = self.state.get(key, None)3314 3315 if fed_exp:3316 # This branch of the conditional holds the lock to generate a3317 # consistent temporary tbparams variable to deallocate experiments.3318 # It releases the lock to do the deallocations and reacquires it to3319 # remove the experiment state when the termination is complete.3320 3321 # First make sure that the experiment creation is complete.3322 status = fed_exp.get('experimentStatus', None)3323 3324 if status:3325 if status in ('starting', 'terminating'):3326 if not force:3327 self.state_lock.release()3328 raise service_error(service_error.partial,3329 'Experiment still being created or destroyed')3330 else:3331 self.log.warning('Experiment in %s state ' % status + \3332 'being terminated by force.')3333 else:3334 # No status??? trouble3335 self.state_lock.release()3336 raise service_error(service_error.internal,3337 "Experiment has no status!?")3338 3339 ids = []3340 # experimentID is a list of dicts that are self-describing3341 # identifiers. This finds all the fedids and localnames - the3342 # keys of self.state - and puts them into ids.3343 for id in fed_exp.get('experimentID', []):3344 if id.has_key('fedid'): ids.append(id['fedid'])3345 if id.has_key('localname'): ids.append(id['localname'])3346 3347 1973 # Collect the allocation/segment ids 3348 1974 for fed in fed_exp.get('federant', []): … … 3369 1995 uri = self.tbmap.get(tb, None) 3370 1996 t = self.pooled_thread(\ 3371 target=self. new_terminate_segment(log=dealloc_log,1997 target=self.terminate_segment(log=dealloc_log, 3372 1998 testbed=tb, 3373 1999 cert_file=self.cert_file,
Note: See TracChangeset
for help on using the changeset viewer.