Changeset 79b6596


Ignore:
Timestamp:
Jul 22, 2009 1:29:23 AM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
c2dbca8
Parents:
012dba5
Message:

Initial swing at timeouts.

Also, the filesystem config script needs to take the ship_configs commands into
account.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r012dba5 r79b6596  
    1111import pickle
    1212import logging
     13import signal
     14import time
    1315
    1416import traceback
     
    3840    Thred safe.
    3941    """
     42
     43    class ssh_cmd_timeout(RuntimeError): pass
    4044   
    4145    class thread_pool:
     
    487491        return rv == 0
    488492
    489     def ssh_cmd(self, user, host, cmd, wname=None):
     493    def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
    490494        """
    491495        Run a remote command on host as user.  If debug is set, the action is
     
    509513            else:
    510514                sub = Popen(sh_str, shell=True)
    511             return sub.wait() == 0
     515            if timeout:
     516                i = 0
     517                rv = sub.poll()
     518                while i < timeout:
     519                    if rv is not None: break
     520                    else:
     521                        time.sleep(1)
     522                        rv = sub.poll()
     523                        i += 1
     524                else:
     525                    self.log.debug("Process exceeded runtime: %s" % sh_str)
     526                    os.kill(sub.pid, signal.SIGKILL)
     527                    raise self.ssh_cmd_timeout();
     528                return rv == 0
     529            else:
     530                return sub.wait() == 0
    512531        else:
    513532            return True
     
    533552        return True
    534553
    535     def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
    536         """
    537         Start a sub-experiment on a federant.
    538 
    539         Get the current state, modify or create as appropriate, ship data and
    540         configs and start the experiment.  There are small ordering differences
    541         based on the initial state of the sub-experiment.
    542         """
    543         # ops node in the federant
    544         host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    545         user = tbparams[tb]['user']     # federant user
    546         pid = tbparams[tb]['project']   # federant project
    547         # XXX
    548         base_confs = ( "hosts",)
    549         tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     554    def get_state(self, user, host, ssh_key, tb, pid, eid):
    550555        # command to test experiment state
    551556        expinfo_exec = "/usr/testbed/bin/expinfo" 
    552         # Configuration directories on the remote machine
    553         proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    554         tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
    555         rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
    556557        # Regular expressions to parse the expinfo response
    557558        state_re = re.compile("State:\s+(\w+)")
    558559        no_exp_re = re.compile("^No\s+such\s+experiment")
     560        swapping_re = re.compile("^No\s+information\s+available.")
    559561        state = None    # Experiment state parsed from expinfo
    560562        # The expinfo ssh command.  Note the identity restriction to use only
     
    562564        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
    563565                'StrictHostKeyChecking yes', '-i',
    564                 self.ssh_privkey_file, "%s@%s" % (user, host),
     566                ssh_key, "%s@%s" % (user, host),
    565567                expinfo_exec, pid, eid]
    566568
    567         # Get status
    568         self.log.debug("[start_segment]: %s"% " ".join(cmd))
    569569        dev_null = None
    570570        try:
     
    582582                if m: state = m.group(1)
    583583                else:
    584                     m = no_exp_re.match(line)
    585                     if m: state = "none"
     584                    for reg, st in ((no_exp_re, "none"),
     585                            (swapping_re, "swapping")):
     586                        m = reg.match(line)
     587                        if m: state = st
    586588            rv = status.wait()
    587589
     
    592594            raise service_error(service_error.internal,
    593595                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
    594 
    595         if state not in ('active', 'swapped', 'none'):
    596             self.log.debug("[start_segment]:unknown state %s" % state)
    597             return False
     596        elif state not in ('active', 'swapped', 'swapping', 'none'):
     597            raise service_error(service_error.internal,
     598                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
     599        else: return state
     600
     601
     602    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
     603        """
     604        Start a sub-experiment on a federant.
     605
     606        Get the current state, modify or create as appropriate, ship data and
     607        configs and start the experiment.  There are small ordering differences
     608        based on the initial state of the sub-experiment.
     609        """
     610        # ops node in the federant
     611        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     612        user = tbparams[tb]['user']     # federant user
     613        pid = tbparams[tb]['project']   # federant project
     614        # XXX
     615        base_confs = ( "hosts",)
     616        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     617        # Configuration directories on the remote machine
     618        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
     619        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
     620        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
     621
     622        state = self.get_state(user, host, self.ssh_privkey_file, tb, pid, eid)
    598623
    599624        self.log.debug("[start_segment]: %s: %s" % (tb, state))
     
    610635                return False
    611636            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
    612             if not self.ssh_cmd(user, host,
    613                     "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \
    614                             % (pid, eid), "startexp"):
    615                 return False
     637            timedout = False
     638            try:
     639                if not self.ssh_cmd(user, host,
     640                        ("/usr/testbed/bin/startexp -i -f -w -p %s " +
     641                        "-e %s null.tcl") % (pid, eid), "startexp",
     642                        timeout=60 * 10):
     643                    return False
     644            except self.ssh_cmd_timeout:
     645                timedout = True
     646
     647            if timedout:
     648                state = self.get_state(user, host, self.ssh_privkey_file,
     649                        tb, eid, pid)
     650                if state != "swapped":
     651                    return False
     652
    616653       
    617654        # Open up a temporary file to contain a script for setting up the
     
    624661            return False
    625662
     663        scriptbase = os.path.basename(scriptname)
     664
    626665        # Script the filesystem changes
    627666        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
     
    631670            print >>scriptfile, "mkdir -p %s" % d
    632671        print >>scriptfile, 'mkdir -p %s' % proj_dir
    633         print >>scriptfile, "rm %s" % scriptname
     672        print >>scriptfile, "rm -f %s" % scriptbase
    634673        scriptfile.close()
    635674
    636675        # Move the script to the remote machine
    637676        # XXX: could collide tempfile names on the remote host
    638         if self.scp_file(scriptname, user, host, scriptname):
     677        if self.scp_file(scriptname, user, host, scriptbase):
    639678            os.remove(scriptname)
    640679        else:
     
    642681
    643682        # Execute the script (and the script's last line deletes it)
    644         if not self.ssh_cmd(user, host, "sh -x %s" % scriptname):
     683        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
    645684            return False
    646685
     
    663702        # now)
    664703        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
    665         if not self.ssh_cmd(user, host,
    666                 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
    667                         (pid, eid, tclfile),
    668                 "modexp"):
    669             return False
     704        try:
     705            if not self.ssh_cmd(user, host,
     706                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
     707                            (pid, eid, tclfile),
     708                    "modexp", timeout= 60 * 10):
     709                return False
     710        except self.ssh_cmd_timeout:
     711            print "modexp timeout"
     712            # There's really no way to see if this succeeded or failed, so if
     713            # it hangs, assume the worst.
     714            return False
    670715        # Active experiments are still swapped, this swaps the others in.
    671716        if state != 'active':
    672717            self.log.info("[start_segment]: Swapping %s in on %s" % \
    673718                    (eid, tb))
    674             if not self.ssh_cmd(user, host,
    675                     "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    676                     "swapexp"):
    677                 return False
     719            timedout = False
     720            try:
     721                if not self.ssh_cmd(user, host,
     722                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
     723                        "swapexp", timeout=10*60):
     724                    return False
     725            except self.ssh_cmd_timeout:
     726                timedout = True
     727           
     728            # If the command was terminated, but completed successfully, report
     729            # success.
     730            if timedout:
     731                state = self.get_state(user, host, self.ssh_privkey_file,
     732                        tb, eid, pid)
     733                self.log.debug("[start_segment]: swapin timed out (state)")
     734                return state == 'active'
     735        # Everything has gone OK.
    678736        return True
    679737
Note: See TracChangeset for help on using the changeset viewer.