Changeset 79b6596
- Timestamp:
- Jul 22, 2009 1:29:23 AM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- c2dbca8
- Parents:
- 012dba5
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r012dba5 r79b6596 11 11 import pickle 12 12 import logging 13 import signal 14 import time 13 15 14 16 import traceback … … 38 40 Thred safe. 39 41 """ 42 43 class ssh_cmd_timeout(RuntimeError): pass 40 44 41 45 class thread_pool: … … 487 491 return rv == 0 488 492 489 def ssh_cmd(self, user, host, cmd, wname=None ):493 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): 490 494 """ 491 495 Run a remote command on host as user. If debug is set, the action is … … 509 513 else: 510 514 sub = Popen(sh_str, shell=True) 511 return sub.wait() == 0 515 if timeout: 516 i = 0 517 rv = sub.poll() 518 while i < timeout: 519 if rv is not None: break 520 else: 521 time.sleep(1) 522 rv = sub.poll() 523 i += 1 524 else: 525 self.log.debug("Process exceeded runtime: %s" % sh_str) 526 os.kill(sub.pid, signal.SIGKILL) 527 raise self.ssh_cmd_timeout(); 528 return rv == 0 529 else: 530 return sub.wait() == 0 512 531 else: 513 532 return True … … 533 552 return True 534 553 535 def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0): 536 """ 537 Start a sub-experiment on a federant. 538 539 Get the current state, modify or create as appropriate, ship data and 540 configs and start the experiment. There are small ordering differences 541 based on the initial state of the sub-experiment. 542 """ 543 # ops node in the federant 544 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 545 user = tbparams[tb]['user'] # federant user 546 pid = tbparams[tb]['project'] # federant project 547 # XXX 548 base_confs = ( "hosts",) 549 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 554 def get_state(self, user, host, ssh_key, tb, pid, eid): 550 555 # command to test experiment state 551 556 expinfo_exec = "/usr/testbed/bin/expinfo" 552 # Configuration directories on the remote machine553 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)554 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)555 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)556 557 # Regular expressions to parse the expinfo response 557 558 state_re = re.compile("State:\s+(\w+)") 558 559 no_exp_re = re.compile("^No\s+such\s+experiment") 560 swapping_re = re.compile("^No\s+information\s+available.") 559 561 state = None # Experiment state parsed from expinfo 560 562 # The expinfo ssh command. Note the identity restriction to use only … … 562 564 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 563 565 'StrictHostKeyChecking yes', '-i', 564 s elf.ssh_privkey_file, "%s@%s" % (user, host),566 ssh_key, "%s@%s" % (user, host), 565 567 expinfo_exec, pid, eid] 566 568 567 # Get status568 self.log.debug("[start_segment]: %s"% " ".join(cmd))569 569 dev_null = None 570 570 try: … … 582 582 if m: state = m.group(1) 583 583 else: 584 m = no_exp_re.match(line) 585 if m: state = "none" 584 for reg, st in ((no_exp_re, "none"), 585 (swapping_re, "swapping")): 586 m = reg.match(line) 587 if m: state = st 586 588 rv = status.wait() 587 589 … … 592 594 raise service_error(service_error.internal, 593 595 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 594 595 if state not in ('active', 'swapped', 'none'): 596 self.log.debug("[start_segment]:unknown state %s" % state) 597 return False 596 elif state not in ('active', 'swapped', 'swapping', 'none'): 597 raise service_error(service_error.internal, 598 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 599 else: return state 600 601 602 def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0): 603 """ 604 Start a sub-experiment on a federant. 605 606 Get the current state, modify or create as appropriate, ship data and 607 configs and start the experiment. There are small ordering differences 608 based on the initial state of the sub-experiment. 609 """ 610 # ops node in the federant 611 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 612 user = tbparams[tb]['user'] # federant user 613 pid = tbparams[tb]['project'] # federant project 614 # XXX 615 base_confs = ( "hosts",) 616 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 617 # Configuration directories on the remote machine 618 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 619 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 620 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 621 622 state = self.get_state(user, host, self.ssh_privkey_file, tb, pid, eid) 598 623 599 624 self.log.debug("[start_segment]: %s: %s" % (tb, state)) … … 610 635 return False 611 636 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 612 if not self.ssh_cmd(user, host, 613 "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \ 614 % (pid, eid), "startexp"): 615 return False 637 timedout = False 638 try: 639 if not self.ssh_cmd(user, host, 640 ("/usr/testbed/bin/startexp -i -f -w -p %s " + 641 "-e %s null.tcl") % (pid, eid), "startexp", 642 timeout=60 * 10): 643 return False 644 except self.ssh_cmd_timeout: 645 timedout = True 646 647 if timedout: 648 state = self.get_state(user, host, self.ssh_privkey_file, 649 tb, eid, pid) 650 if state != "swapped": 651 return False 652 616 653 617 654 # Open up a temporary file to contain a script for setting up the … … 624 661 return False 625 662 663 scriptbase = os.path.basename(scriptname) 664 626 665 # Script the filesystem changes 627 666 print >>scriptfile, "/bin/rm -rf %s" % proj_dir … … 631 670 print >>scriptfile, "mkdir -p %s" % d 632 671 print >>scriptfile, 'mkdir -p %s' % proj_dir 633 print >>scriptfile, "rm %s" % scriptname672 print >>scriptfile, "rm -f %s" % scriptbase 634 673 scriptfile.close() 635 674 636 675 # Move the script to the remote machine 637 676 # XXX: could collide tempfile names on the remote host 638 if self.scp_file(scriptname, user, host, script name):677 if self.scp_file(scriptname, user, host, scriptbase): 639 678 os.remove(scriptname) 640 679 else: … … 642 681 643 682 # Execute the script (and the script's last line deletes it) 644 if not self.ssh_cmd(user, host, "sh -x %s" % script name):683 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): 645 684 return False 646 685 … … 663 702 # now) 664 703 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 665 if not self.ssh_cmd(user, host, 666 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 667 (pid, eid, tclfile), 668 "modexp"): 669 return False 704 try: 705 if not self.ssh_cmd(user, host, 706 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 707 (pid, eid, tclfile), 708 "modexp", timeout= 60 * 10): 709 return False 710 except self.ssh_cmd_timeout: 711 print "modexp timeout" 712 # There's really no way to see if this succeeded or failed, so if 713 # it hangs, assume the worst. 714 return False 670 715 # Active experiments are still swapped, this swaps the others in. 671 716 if state != 'active': 672 717 self.log.info("[start_segment]: Swapping %s in on %s" % \ 673 718 (eid, tb)) 674 if not self.ssh_cmd(user, host, 675 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 676 "swapexp"): 677 return False 719 timedout = False 720 try: 721 if not self.ssh_cmd(user, host, 722 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 723 "swapexp", timeout=10*60): 724 return False 725 except self.ssh_cmd_timeout: 726 timedout = True 727 728 # If the command was terminated, but completed successfully, report 729 # success. 730 if timedout: 731 state = self.get_state(user, host, self.ssh_privkey_file, 732 tb, eid, pid) 733 self.log.debug("[start_segment]: swapin timed out (state)") 734 return state == 'active' 735 # Everything has gone OK. 678 736 return True 679 737
Note: See TracChangeset
for help on using the changeset viewer.