Changeset 4b362df
- Timestamp:
- Jul 22, 2009 1:46:55 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 728001e
- Parents:
- c2dbca8
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
rc2dbca8 r4b362df 42 42 43 43 class ssh_cmd_timeout(RuntimeError): pass 44 45 class list_log: 46 """ 47 Provide an interface that lets logger.StreamHandler s write to a list 48 of strings. 49 """ 50 def __init__(self, l=[]): 51 """ 52 Link to an existing list or just create a log 53 """ 54 self.ll = l 55 self.lock = Lock() 56 def write(self, str): 57 """ 58 Add the string to the log. Lock for consistency. 59 """ 60 self.lock.acquire() 61 self.ll.append(str) 62 self.lock.release() 63 64 def flush(self): 65 """ 66 No-op that StreamHandlers expect 67 """ 68 pass 69 44 70 45 71 class thread_pool: … … 204 230 self.randomize_experiments = False 205 231 206 self.scp_exec = "/usr/bin/scp"207 232 self.splitter = None 208 self.ssh_exec="/usr/bin/ssh"209 233 self.ssh_keygen = "/usr/bin/ssh-keygen" 210 234 self.ssh_identity_file = None … … 468 492 f.close() 469 493 470 def scp_file(self, file, user, host, dest=""): 471 """ 472 scp a file to the remote host. If debug is set the action is only 473 logged. 474 """ 475 476 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 477 '-o', 'StrictHostKeyChecking yes', '-i', 478 self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)] 479 rv = 0 480 481 try: 482 dnull = open("/dev/null", "w") 483 except IOError: 484 self.log.debug("[ssh_file]: failed to open /dev/null for redirect") 485 dnull = Null 486 487 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 488 if not self.debug: 489 rv = call(scp_cmd, stdout=dnull, stderr=dnull) 490 491 return rv == 0 492 493 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): 494 """ 495 Run a remote command on host as user. If debug is set, the action is 496 only logged. 497 """ 498 sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \ 499 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ 500 (self.ssh_exec, self.ssh_privkey_file, 501 user, host, cmd) 502 503 try: 504 dnull = open("/dev/null", "r") 505 except IOError: 506 self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect") 507 dnull = Null 508 509 self.log.debug("[ssh_cmd]: %s" % sh_str) 510 if not self.debug: 511 if dnull: 512 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull) 513 else: 514 sub = Popen(sh_str, shell=True) 515 if timeout: 516 i = 0 517 rv = sub.poll() 518 while i < timeout: 519 if rv is not None: break 494 class emulab_segment: 495 def __init__(self, log=None, keyfile=None, debug=False): 496 self.log = log or logging.getLogger(\ 497 'fedd.experiment_control.emulab_segment') 498 self.ssh_privkey_file = keyfile 499 self.debug = debug 500 self.ssh_exec="/usr/bin/ssh" 501 self.scp_exec = "/usr/bin/scp" 502 self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout 503 504 def scp_file(self, file, user, host, dest=""): 505 """ 506 scp a file to the remote host. If debug is set the action is only 507 logged. 508 """ 509 510 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 511 '-o', 'StrictHostKeyChecking yes', '-i', 512 self.ssh_privkey_file, file, 513 "%s@%s:%s" % (user, host, dest)] 514 rv = 0 515 516 try: 517 dnull = open("/dev/null", "w") 518 except IOError: 519 self.log.debug("[ssh_file]: failed to open " + \ 520 "/dev/null for redirect") 521 dnull = Null 522 523 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 524 if not self.debug: 525 rv = call(scp_cmd, stdout=dnull, stderr=dnull) 526 527 return rv == 0 528 529 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): 530 """ 531 Run a remote command on host as user. If debug is set, the action is 532 only logged. 533 """ 534 sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \ 535 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ 536 (self.ssh_exec, self.ssh_privkey_file, 537 user, host, cmd) 538 539 try: 540 dnull = open("/dev/null", "r") 541 except IOError: 542 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \ 543 "for redirect") 544 dnull = Null 545 546 self.log.debug("[ssh_cmd]: %s" % sh_str) 547 if not self.debug: 548 if dnull: 549 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull) 550 else: 551 sub = Popen(sh_str, shell=True) 552 if timeout: 553 i = 0 554 rv = sub.poll() 555 while i < timeout: 556 if rv is not None: break 557 else: 558 time.sleep(1) 559 rv = sub.poll() 560 i += 1 520 561 else: 521 time.sleep(1) 522 rv = sub.poll() 523 i += 1 562 self.log.debug("Process exceeded runtime: %s" % sh_str) 563 os.kill(sub.pid, signal.SIGKILL) 564 raise self.ssh_cmd_timeout(); 565 return rv == 0 524 566 else: 525 self.log.debug("Process exceeded runtime: %s" % sh_str) 526 os.kill(sub.pid, signal.SIGKILL) 527 raise self.ssh_cmd_timeout(); 528 return rv == 0 567 return sub.wait() == 0 529 568 else: 530 return sub.wait() == 0 531 else: 532 return True 533 534 535 def create_config_tree(self, src_dir, dest_dir, script): 536 """ 537 Append commands to script that will create the directory hierarchy on 538 the remote federant. 539 """ 540 541 if os.path.isdir(src_dir): 542 print >>script, "mkdir -p %s" % dest_dir 543 print >>script, "chmod 770 %s" % dest_dir 544 569 return True 570 571 class start_segment(emulab_segment): 572 def __init__(self, log=None, keyfile=None, debug=False): 573 experiment_control_local.emulab_segment.__init__(self, 574 log=log, keyfile=keyfile, debug=debug) 575 576 def create_config_tree(self, src_dir, dest_dir, script): 577 """ 578 Append commands to script that will create the directory hierarchy 579 on the remote federant. 580 """ 581 582 if os.path.isdir(src_dir): 583 print >>script, "mkdir -p %s" % dest_dir 584 print >>script, "chmod 770 %s" % dest_dir 585 586 for f in os.listdir(src_dir): 587 if os.path.isdir(f): 588 self.create_config_tree("%s/%s" % (src_dir, f), 589 "%s/%s" % (dest_dir, f), script) 590 else: 591 self.log.debug("[create_config_tree]: Not a directory: %s" \ 592 % src_dir) 593 594 def ship_configs(self, host, user, src_dir, dest_dir): 595 """ 596 Copy federant-specific configuration files to the federant. 597 """ 545 598 for f in os.listdir(src_dir): 546 599 if os.path.isdir(f): 547 self.create_config_tree("%s/%s" % (src_dir, f), 548 "%s/%s" % (dest_dir, f), script) 549 else: 550 self.log.debug("[create_config_tree]: Not a directory: %s" \ 551 % src_dir) 552 553 def ship_configs(self, host, user, src_dir, dest_dir): 554 """ 555 Copy federant-specific configuration files to the federant. 556 """ 557 for f in os.listdir(src_dir): 558 if os.path.isdir(f): 559 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 560 "%s/%s" % (dest_dir, f)): 561 return False 562 else: 563 if not self.scp_file("%s/%s" % (src_dir, f), 564 user, host, dest_dir): 565 return False 566 return True 567 568 def get_state(self, user, host, ssh_key, tb, pid, eid): 569 # command to test experiment state 570 expinfo_exec = "/usr/testbed/bin/expinfo" 571 # Regular expressions to parse the expinfo response 572 state_re = re.compile("State:\s+(\w+)") 573 no_exp_re = re.compile("^No\s+such\s+experiment") 574 swapping_re = re.compile("^No\s+information\s+available.") 575 state = None # Experiment state parsed from expinfo 576 # The expinfo ssh command. Note the identity restriction to use only 577 # the identity provided in the pubkey given. 578 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 579 'StrictHostKeyChecking yes', '-i', 580 ssh_key, "%s@%s" % (user, host), 581 expinfo_exec, pid, eid] 582 583 dev_null = None 584 try: 585 dev_null = open("/dev/null", "a") 586 except IOError, e: 587 self.log.error("[get_state]: can't open /dev/null: %s" %e) 588 589 if self.debug: 590 state = 'swapped' 591 rv = 0 592 else: 593 status = Popen(cmd, stdout=PIPE, stderr=dev_null) 594 for line in status.stdout: 595 m = state_re.match(line) 596 if m: state = m.group(1) 597 else: 598 for reg, st in ((no_exp_re, "none"), 599 (swapping_re, "swapping")): 600 m = reg.match(line) 601 if m: state = st 602 rv = status.wait() 603 604 # If the experiment is not present the subcommand returns a non-zero 605 # return value. If we successfully parsed a "none" outcome, ignore the 606 # return code. 607 if rv != 0 and state != 'none': 608 raise service_error(service_error.internal, 609 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 610 elif state not in ('active', 'swapped', 'swapping', 'none'): 611 raise service_error(service_error.internal, 612 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 613 else: return state 614 615 616 def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0): 617 """ 618 Start a sub-experiment on a federant. 619 620 Get the current state, modify or create as appropriate, ship data and 621 configs and start the experiment. There are small ordering differences 622 based on the initial state of the sub-experiment. 623 """ 624 # ops node in the federant 625 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 626 user = tbparams[tb]['user'] # federant user 627 pid = tbparams[tb]['project'] # federant project 628 # XXX 629 base_confs = ( "hosts",) 630 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 631 # Configuration directories on the remote machine 632 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 633 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 634 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 635 636 state = self.get_state(user, host, self.ssh_privkey_file, tb, pid, eid) 637 638 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 639 self.log.info("[start_segment]:transferring experiment to %s" % tb) 640 641 if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host): 642 return False 643 644 if state == 'none': 645 # Create a null copy of the experiment so that we capture any logs 646 # there if the modify fails. Emulab software discards the logs 647 # from a failed startexp 648 if not self.scp_file("%s/null.tcl" % tmpdir, user, host): 649 return False 650 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 651 timedout = False 600 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 601 "%s/%s" % (dest_dir, f)): 602 return False 603 else: 604 if not self.scp_file("%s/%s" % (src_dir, f), 605 user, host, dest_dir): 606 return False 607 return True 608 609 def get_state(self, user, host, tb, pid, eid): 610 # command to test experiment state 611 expinfo_exec = "/usr/testbed/bin/expinfo" 612 # Regular expressions to parse the expinfo response 613 state_re = re.compile("State:\s+(\w+)") 614 no_exp_re = re.compile("^No\s+such\s+experiment") 615 swapping_re = re.compile("^No\s+information\s+available.") 616 state = None # Experiment state parsed from expinfo 617 # The expinfo ssh command. Note the identity restriction to use 618 # only the identity provided in the pubkey given. 619 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 620 'StrictHostKeyChecking yes', '-i', 621 self.ssh_privkey_file, "%s@%s" % (user, host), 622 expinfo_exec, pid, eid] 623 624 dev_null = None 625 try: 626 dev_null = open("/dev/null", "a") 627 except IOError, e: 628 self.log.error("[get_state]: can't open /dev/null: %s" %e) 629 630 if self.debug: 631 state = 'swapped' 632 rv = 0 633 else: 634 status = Popen(cmd, stdout=PIPE, stderr=dev_null) 635 for line in status.stdout: 636 m = state_re.match(line) 637 if m: state = m.group(1) 638 else: 639 for reg, st in ((no_exp_re, "none"), 640 (swapping_re, "swapping")): 641 m = reg.match(line) 642 if m: state = st 643 rv = status.wait() 644 645 # If the experiment is not present the subcommand returns a 646 # non-zero return value. If we successfully parsed a "none" 647 # outcome, ignore the return code. 648 if rv != 0 and state != 'none': 649 raise service_error(service_error.internal, 650 "Cannot get status of segment %s:%s/%s" % \ 651 (tb, pid, eid)) 652 elif state not in ('active', 'swapped', 'swapping', 'none'): 653 raise service_error(service_error.internal, 654 "Cannot get status of segment %s:%s/%s" % \ 655 (tb, pid, eid)) 656 else: return state 657 658 659 def __call__(self, tb, eid, tbparams, tmpdir, timeout=0): 660 """ 661 Start a sub-experiment on a federant. 662 663 Get the current state, modify or create as appropriate, ship data 664 and configs and start the experiment. There are small ordering 665 differences based on the initial state of the sub-experiment. 666 """ 667 # ops node in the federant 668 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 669 user = tbparams[tb]['user'] # federant user 670 pid = tbparams[tb]['project'] # federant project 671 # XXX 672 base_confs = ( "hosts",) 673 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 674 # Configuration directories on the remote machine 675 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 676 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 677 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 678 679 state = self.get_state(user, host, tb, pid, eid) 680 681 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 682 self.log.info("[start_segment]:transferring experiment to %s" % tb) 683 684 if not self.scp_file("%s/%s/%s" % \ 685 (tmpdir, tb, tclfile), user, host): 686 return False 687 688 if state == 'none': 689 # Create a null copy of the experiment so that we capture any 690 # logs there if the modify fails. Emulab software discards the 691 # logs from a failed startexp 692 if not self.scp_file("%s/null.tcl" % tmpdir, user, host): 693 return False 694 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 695 timedout = False 696 try: 697 if not self.ssh_cmd(user, host, 698 ("/usr/testbed/bin/startexp -i -f -w -p %s " + 699 "-e %s null.tcl") % (pid, eid), "startexp", 700 timeout=60 * 10): 701 return False 702 except self.ssh_cmd_timeout: 703 timedout = True 704 705 if timedout: 706 state = self.get_state(user, host, self.ssh_privkey_file, 707 tb, eid, pid) 708 if state != "swapped": 709 return False 710 711 712 # Open up a temporary file to contain a script for setting up the 713 # filespace for the new experiment. 714 self.log.info("[start_segment]: creating script file") 715 try: 716 sf, scriptname = tempfile.mkstemp() 717 scriptfile = os.fdopen(sf, 'w') 718 except IOError: 719 return False 720 721 scriptbase = os.path.basename(scriptname) 722 723 # Script the filesystem changes 724 print >>scriptfile, "/bin/rm -rf %s" % proj_dir 725 # Clear and create the tarfiles and rpm directories 726 for d in (tarfiles_dir, rpms_dir): 727 print >>scriptfile, "/bin/rm -rf %s/*" % d 728 print >>scriptfile, "mkdir -p %s" % d 729 print >>scriptfile, 'mkdir -p %s' % proj_dir 730 self.create_config_tree("%s/%s" % (tmpdir, tb), 731 proj_dir, scriptfile) 732 if os.path.isdir("%s/tarfiles" % tmpdir): 733 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir, 734 scriptfile) 735 if os.path.isdir("%s/rpms" % tmpdir): 736 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 737 scriptfile) 738 print >>scriptfile, "rm -f %s" % scriptbase 739 scriptfile.close() 740 741 # Move the script to the remote machine 742 # XXX: could collide tempfile names on the remote host 743 if self.scp_file(scriptname, user, host, scriptbase): 744 os.remove(scriptname) 745 else: 746 return False 747 748 # Execute the script (and the script's last line deletes it) 749 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): 750 return False 751 752 for f in base_confs: 753 if not self.scp_file("%s/%s" % (tmpdir, f), user, host, 754 "%s/%s" % (proj_dir, f)): 755 return False 756 if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), 757 proj_dir): 758 return False 759 if os.path.isdir("%s/tarfiles" % tmpdir): 760 if not self.ship_configs(host, user, 761 "%s/tarfiles" % tmpdir, tarfiles_dir): 762 return False 763 if os.path.isdir("%s/rpms" % tmpdir): 764 if not self.ship_configs(host, user, 765 "%s/rpms" % tmpdir, tarfiles_dir): 766 return False 767 # Stage the new configuration (active experiments will stay swapped 768 # in now) 769 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 652 770 try: 653 771 if not self.ssh_cmd(user, host, 654 ("/usr/testbed/bin/startexp -i -f -w -p %s " +655 "-e %s null.tcl") % (pid, eid), "startexp",656 timeout=60 * 10):772 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 773 (pid, eid, tclfile), 774 "modexp", timeout= 60 * 10): 657 775 return False 658 776 except self.ssh_cmd_timeout: 659 timedout = True 660 661 if timedout: 662 state = self.get_state(user, host, self.ssh_privkey_file, 663 tb, eid, pid) 664 if state != "swapped": 665 return False 666 667 668 # Open up a temporary file to contain a script for setting up the 669 # filespace for the new experiment. 670 self.log.info("[start_segment]: creating script file") 671 try: 672 sf, scriptname = tempfile.mkstemp() 673 scriptfile = os.fdopen(sf, 'w') 674 except IOError: 675 return False 676 677 scriptbase = os.path.basename(scriptname) 678 679 # Script the filesystem changes 680 print >>scriptfile, "/bin/rm -rf %s" % proj_dir 681 # Clear and create the tarfiles and rpm directories 682 for d in (tarfiles_dir, rpms_dir): 683 print >>scriptfile, "/bin/rm -rf %s/*" % d 684 print >>scriptfile, "mkdir -p %s" % d 685 print >>scriptfile, 'mkdir -p %s' % proj_dir 686 self.create_config_tree("%s/%s" % (tmpdir, tb), proj_dir, scriptfile) 687 if os.path.isdir("%s/tarfiles" % tmpdir): 688 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir, 689 scriptfile) 690 if os.path.isdir("%s/rpms" % tmpdir): 691 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 692 scriptfile) 693 print >>scriptfile, "rm -f %s" % scriptbase 694 scriptfile.close() 695 696 # Move the script to the remote machine 697 # XXX: could collide tempfile names on the remote host 698 if self.scp_file(scriptname, user, host, scriptbase): 699 os.remove(scriptname) 700 else: 701 return False 702 703 # Execute the script (and the script's last line deletes it) 704 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): 705 return False 706 707 for f in base_confs: 708 if not self.scp_file("%s/%s" % (tmpdir, f), user, host, 709 "%s/%s" % (proj_dir, f)): 710 return False 711 if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), 712 proj_dir): 713 return False 714 if os.path.isdir("%s/tarfiles" % tmpdir): 715 if not self.ship_configs(host, user, 716 "%s/tarfiles" % tmpdir, tarfiles_dir): 717 return False 718 if os.path.isdir("%s/rpms" % tmpdir): 719 if not self.ship_configs(host, user, 720 "%s/rpms" % tmpdir, tarfiles_dir): 721 return False 722 # Stage the new configuration (active experiments will stay swapped in 723 # now) 724 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 725 try: 726 if not self.ssh_cmd(user, host, 727 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 728 (pid, eid, tclfile), 729 "modexp", timeout= 60 * 10): 777 print "modexp timeout" 778 # There's really no way to see if this succeeded or failed, so 779 # if it hangs, assume the worst. 730 780 return False 731 except self.ssh_cmd_timeout:732 print "modexp timeout"733 # There's really no way to see if this succeeded or failed, so if734 # it hangs, assume the worst.735 returnFalse736 # Active experiments are still swapped, this swaps the others in. 737 if state != 'active': 738 self.log.info("[start_segment]: Swapping %s in on %s" % \ 739 (eid, tb)) 740 timedout =False741 try:742 if not self.ssh_cmd(user, host,743 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),744 "swapexp", timeout=10*60):745 return False746 except self.ssh_cmd_timeout:747 timedout = True748 749 # If the command was terminated, but completed successfully, report750 # success.751 if timedout:752 state = self.get_state(user, host, self.ssh_privkey_file,753 tb, eid, pid) 754 self.log.debug("[start_segment]: swapin timed out (state)") 755 return state == 'active'756 # Everything has gone OK.757 return True 758 759 def stop_segment(self, tb, eid, tbparams):760 761 762 763 764 765 766 767 768 769 781 # Active experiments are still swapped, this swaps the others in. 782 if state != 'active': 783 self.log.info("[start_segment]: Swapping %s in on %s" % \ 784 (eid, tb)) 785 timedout = False 786 try: 787 if not self.ssh_cmd(user, host, 788 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 789 "swapexp", timeout=10*60): 790 return False 791 except self.ssh_cmd_timeout: 792 timedout = True 793 794 # If the command was terminated, but completed successfully, 795 # report success. 796 if timedout: 797 state = self.get_state(user, host, self.ssh_privkey_file, 798 tb, eid, pid) 799 self.log.debug("[start_segment]: swapin timed out (state)") 800 return state == 'active' 801 # Everything has gone OK. 802 return True 803 804 class stop_segment(emulab_segment): 805 def __init__(self, log=None, keyfile=None, debug=False): 806 experiment_control_local.emulab_segment.__init__(self, 807 log=log, keyfile=keyfile, debug=debug) 808 809 def __call__(self, tb, eid, tbparams): 810 """ 811 Stop a sub experiment by calling swapexp on the federant 812 """ 813 user = tbparams[tb]['user'] 814 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 815 pid = tbparams[tb]['project'] 816 817 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 818 return self.ssh_cmd(user, host, 819 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) 770 820 771 821 … … 1725 1775 allocated = { } # Testbeds we can access 1726 1776 started = { } # Testbeds where a sub-experiment started 1727 1777 # successfully 1728 1778 1729 1779 # Objects to parse the splitter output (defined above) … … 1837 1887 # get the return value later 1838 1888 thread_pool.wait_for_slot() 1839 t = self.pooled_thread(target=self.start_segment, 1889 t = self.pooled_thread(\ 1890 target=self.start_segment(log=self.log, 1891 keyfile=self.ssh_privkey_file, debug=self.debug), 1840 1892 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1841 1893 pdata=thread_pool, trace_file=self.trace_file) … … 1850 1902 1851 1903 if len(failed) == 0: 1852 if not self.start_segment(master, eid, tbparams, tmpdir): 1904 starter = self.start_segment(log=self.log, 1905 keyfile=self.ssh_privkey_file, debug=self.debug) 1906 if not starter(master, eid, tbparams, tmpdir): 1853 1907 failed.append(master) 1854 1908 … … 1861 1915 # Create and start a thread to stop the segment 1862 1916 thread_pool.wait_for_slot() 1863 t = self.pooled_thread(target=self.stop_segment, 1917 t = self.pooled_thread(\ 1918 target=self.stop_segment(log=self.log, 1919 keyfile=self.ssh_privkey_file, 1920 debug=self.debug), 1864 1921 args=(tb, eid, tbparams), name=tb, 1865 1922 pdata=thread_pool, trace_file=self.trace_file)
Note: See TracChangeset
for help on using the changeset viewer.