Changeset 4b362df


Ignore:
Timestamp:
Jul 22, 2009 1:46:55 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
728001e
Parents:
c2dbca8
Message:

Segment operations into functors for migration to async creation

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    rc2dbca8 r4b362df  
    4242
    4343    class ssh_cmd_timeout(RuntimeError): pass
     44
     45    class list_log:
     46        """
     47        Provide an interface that lets logger.StreamHandler s write to a list
     48        of strings.
     49        """
     50        def __init__(self, l=[]):
     51            """
     52            Link to an existing list or just create a log
     53            """
     54            self.ll = l
     55            self.lock = Lock()
     56        def write(self, str):
     57            """
     58            Add the string to the log.  Lock for consistency.
     59            """
     60            self.lock.acquire()
     61            self.ll.append(str)
     62            self.lock.release()
     63
     64        def flush(self):
     65            """
     66            No-op that StreamHandlers expect
     67            """
     68            pass
     69
    4470   
    4571    class thread_pool:
     
    204230        self.randomize_experiments = False
    205231
    206         self.scp_exec = "/usr/bin/scp"
    207232        self.splitter = None
    208         self.ssh_exec="/usr/bin/ssh"
    209233        self.ssh_keygen = "/usr/bin/ssh-keygen"
    210234        self.ssh_identity_file = None
     
    468492        f.close()
    469493
    470     def scp_file(self, file, user, host, dest=""):
    471         """
    472         scp a file to the remote host.  If debug is set the action is only
    473         logged.
    474         """
    475 
    476         scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
    477                 '-o', 'StrictHostKeyChecking yes', '-i',
    478                 self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
    479         rv = 0
    480 
    481         try:
    482             dnull = open("/dev/null", "w")
    483         except IOError:
    484             self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
    485             dnull = Null
    486 
    487         self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
    488         if not self.debug:
    489             rv = call(scp_cmd, stdout=dnull, stderr=dnull)
    490 
    491         return rv == 0
    492 
    493     def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
    494         """
    495         Run a remote command on host as user.  If debug is set, the action is
    496         only logged.
    497         """
    498         sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
    499                 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
    500                 (self.ssh_exec, self.ssh_privkey_file,
    501                         user, host, cmd)
    502 
    503         try:
    504             dnull = open("/dev/null", "r")
    505         except IOError:
    506             self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
    507             dnull = Null
    508 
    509         self.log.debug("[ssh_cmd]: %s" % sh_str)
    510         if not self.debug:
    511             if dnull:
    512                 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
    513             else:
    514                 sub = Popen(sh_str, shell=True)
    515             if timeout:
    516                 i = 0
    517                 rv = sub.poll()
    518                 while i < timeout:
    519                     if rv is not None: break
     494    class emulab_segment:
     495        def __init__(self, log=None, keyfile=None, debug=False):
     496            self.log = log or logging.getLogger(\
     497                    'fedd.experiment_control.emulab_segment')
     498            self.ssh_privkey_file = keyfile
     499            self.debug = debug
     500            self.ssh_exec="/usr/bin/ssh"
     501            self.scp_exec = "/usr/bin/scp"
     502            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
     503
     504        def scp_file(self, file, user, host, dest=""):
     505            """
     506            scp a file to the remote host.  If debug is set the action is only
     507            logged.
     508            """
     509
     510            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
     511                    '-o', 'StrictHostKeyChecking yes', '-i',
     512                    self.ssh_privkey_file, file,
     513                    "%s@%s:%s" % (user, host, dest)]
     514            rv = 0
     515
     516            try:
     517                dnull = open("/dev/null", "w")
     518            except IOError:
     519                self.log.debug("[ssh_file]: failed to open " + \
     520                        "/dev/null for redirect")
     521                dnull = Null
     522
     523            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
     524            if not self.debug:
     525                rv = call(scp_cmd, stdout=dnull, stderr=dnull)
     526
     527            return rv == 0
     528
     529        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
     530            """
     531            Run a remote command on host as user.  If debug is set, the action is
     532            only logged.
     533            """
     534            sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
     535                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
     536                    (self.ssh_exec, self.ssh_privkey_file,
     537                            user, host, cmd)
     538
     539            try:
     540                dnull = open("/dev/null", "r")
     541            except IOError:
     542                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
     543                        "for redirect")
     544                dnull = Null
     545
     546            self.log.debug("[ssh_cmd]: %s" % sh_str)
     547            if not self.debug:
     548                if dnull:
     549                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
     550                else:
     551                    sub = Popen(sh_str, shell=True)
     552                if timeout:
     553                    i = 0
     554                    rv = sub.poll()
     555                    while i < timeout:
     556                        if rv is not None: break
     557                        else:
     558                            time.sleep(1)
     559                            rv = sub.poll()
     560                            i += 1
    520561                    else:
    521                         time.sleep(1)
    522                         rv = sub.poll()
    523                         i += 1
     562                        self.log.debug("Process exceeded runtime: %s" % sh_str)
     563                        os.kill(sub.pid, signal.SIGKILL)
     564                        raise self.ssh_cmd_timeout();
     565                    return rv == 0
    524566                else:
    525                     self.log.debug("Process exceeded runtime: %s" % sh_str)
    526                     os.kill(sub.pid, signal.SIGKILL)
    527                     raise self.ssh_cmd_timeout();
    528                 return rv == 0
     567                    return sub.wait() == 0
    529568            else:
    530                 return sub.wait() == 0
    531         else:
    532             return True
    533 
    534 
    535     def create_config_tree(self, src_dir, dest_dir, script):
    536         """
    537         Append commands to script that will create the directory hierarchy on
    538         the remote federant.
    539         """
    540 
    541         if os.path.isdir(src_dir):
    542             print >>script, "mkdir -p %s" % dest_dir
    543             print >>script, "chmod 770 %s" % dest_dir
    544 
     569                return True
     570
     571    class start_segment(emulab_segment):
     572        def __init__(self, log=None, keyfile=None, debug=False):
     573            experiment_control_local.emulab_segment.__init__(self,
     574                    log=log, keyfile=keyfile, debug=debug)
     575
     576        def create_config_tree(self, src_dir, dest_dir, script):
     577            """
     578            Append commands to script that will create the directory hierarchy
     579            on the remote federant.
     580            """
     581
     582            if os.path.isdir(src_dir):
     583                print >>script, "mkdir -p %s" % dest_dir
     584                print >>script, "chmod 770 %s" % dest_dir
     585
     586                for f in os.listdir(src_dir):
     587                    if os.path.isdir(f):
     588                        self.create_config_tree("%s/%s" % (src_dir, f),
     589                                "%s/%s" % (dest_dir, f), script)
     590            else:
     591                self.log.debug("[create_config_tree]: Not a directory: %s" \
     592                        % src_dir)
     593
     594        def ship_configs(self, host, user, src_dir, dest_dir):
     595            """
     596            Copy federant-specific configuration files to the federant.
     597            """
    545598            for f in os.listdir(src_dir):
    546599                if os.path.isdir(f):
    547                     self.create_config_tree("%s/%s" % (src_dir, f),
    548                             "%s/%s" % (dest_dir, f), script)
    549         else:
    550             self.log.debug("[create_config_tree]: Not a directory: %s" \
    551                     % src_dir)
    552 
    553     def ship_configs(self, host, user, src_dir, dest_dir):
    554         """
    555         Copy federant-specific configuration files to the federant.
    556         """
    557         for f in os.listdir(src_dir):
    558             if os.path.isdir(f):
    559                 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
    560                         "%s/%s" % (dest_dir, f)):
    561                     return False
    562             else:
    563                 if not self.scp_file("%s/%s" % (src_dir, f),
    564                         user, host, dest_dir):
    565                     return False
    566         return True
    567 
    568     def get_state(self, user, host, ssh_key, tb, pid, eid):
    569         # command to test experiment state
    570         expinfo_exec = "/usr/testbed/bin/expinfo" 
    571         # Regular expressions to parse the expinfo response
    572         state_re = re.compile("State:\s+(\w+)")
    573         no_exp_re = re.compile("^No\s+such\s+experiment")
    574         swapping_re = re.compile("^No\s+information\s+available.")
    575         state = None    # Experiment state parsed from expinfo
    576         # The expinfo ssh command.  Note the identity restriction to use only
    577         # the identity provided in the pubkey given.
    578         cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
    579                 'StrictHostKeyChecking yes', '-i',
    580                 ssh_key, "%s@%s" % (user, host),
    581                 expinfo_exec, pid, eid]
    582 
    583         dev_null = None
    584         try:
    585             dev_null = open("/dev/null", "a")
    586         except IOError, e:
    587             self.log.error("[get_state]: can't open /dev/null: %s" %e)
    588 
    589         if self.debug:
    590             state = 'swapped'
    591             rv = 0
    592         else:
    593             status = Popen(cmd, stdout=PIPE, stderr=dev_null)
    594             for line in status.stdout:
    595                 m = state_re.match(line)
    596                 if m: state = m.group(1)
    597                 else:
    598                     for reg, st in ((no_exp_re, "none"),
    599                             (swapping_re, "swapping")):
    600                         m = reg.match(line)
    601                         if m: state = st
    602             rv = status.wait()
    603 
    604         # If the experiment is not present the subcommand returns a non-zero
    605         # return value.  If we successfully parsed a "none" outcome, ignore the
    606         # return code.
    607         if rv != 0 and state != 'none':
    608             raise service_error(service_error.internal,
    609                     "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
    610         elif state not in ('active', 'swapped', 'swapping', 'none'):
    611             raise service_error(service_error.internal,
    612                     "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
    613         else: return state
    614 
    615 
    616     def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
    617         """
    618         Start a sub-experiment on a federant.
    619 
    620         Get the current state, modify or create as appropriate, ship data and
    621         configs and start the experiment.  There are small ordering differences
    622         based on the initial state of the sub-experiment.
    623         """
    624         # ops node in the federant
    625         host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    626         user = tbparams[tb]['user']     # federant user
    627         pid = tbparams[tb]['project']   # federant project
    628         # XXX
    629         base_confs = ( "hosts",)
    630         tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
    631         # Configuration directories on the remote machine
    632         proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    633         tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
    634         rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
    635 
    636         state = self.get_state(user, host, self.ssh_privkey_file, tb, pid, eid)
    637 
    638         self.log.debug("[start_segment]: %s: %s" % (tb, state))
    639         self.log.info("[start_segment]:transferring experiment to %s" % tb)
    640 
    641         if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
    642             return False
    643        
    644         if state == 'none':
    645             # Create a null copy of the experiment so that we capture any logs
    646             # there if the modify fails.  Emulab software discards the logs
    647             # from a failed startexp
    648             if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
    649                 return False
    650             self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
    651             timedout = False
     600                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
     601                            "%s/%s" % (dest_dir, f)):
     602                        return False
     603                else:
     604                    if not self.scp_file("%s/%s" % (src_dir, f),
     605                            user, host, dest_dir):
     606                        return False
     607            return True
     608
     609        def get_state(self, user, host, tb, pid, eid):
     610            # command to test experiment state
     611            expinfo_exec = "/usr/testbed/bin/expinfo" 
     612            # Regular expressions to parse the expinfo response
     613            state_re = re.compile("State:\s+(\w+)")
     614            no_exp_re = re.compile("^No\s+such\s+experiment")
     615            swapping_re = re.compile("^No\s+information\s+available.")
     616            state = None    # Experiment state parsed from expinfo
     617            # The expinfo ssh command.  Note the identity restriction to use
     618            # only the identity provided in the pubkey given.
     619            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
     620                    'StrictHostKeyChecking yes', '-i',
     621                    self.ssh_privkey_file, "%s@%s" % (user, host),
     622                    expinfo_exec, pid, eid]
     623
     624            dev_null = None
     625            try:
     626                dev_null = open("/dev/null", "a")
     627            except IOError, e:
     628                self.log.error("[get_state]: can't open /dev/null: %s" %e)
     629
     630            if self.debug:
     631                state = 'swapped'
     632                rv = 0
     633            else:
     634                status = Popen(cmd, stdout=PIPE, stderr=dev_null)
     635                for line in status.stdout:
     636                    m = state_re.match(line)
     637                    if m: state = m.group(1)
     638                    else:
     639                        for reg, st in ((no_exp_re, "none"),
     640                                (swapping_re, "swapping")):
     641                            m = reg.match(line)
     642                            if m: state = st
     643                rv = status.wait()
     644
     645            # If the experiment is not present the subcommand returns a
     646            # non-zero return value.  If we successfully parsed a "none"
     647            # outcome, ignore the return code.
     648            if rv != 0 and state != 'none':
     649                raise service_error(service_error.internal,
     650                        "Cannot get status of segment %s:%s/%s" % \
     651                                (tb, pid, eid))
     652            elif state not in ('active', 'swapped', 'swapping', 'none'):
     653                raise service_error(service_error.internal,
     654                        "Cannot get status of segment %s:%s/%s" % \
     655                                (tb, pid, eid))
     656            else: return state
     657
     658
     659        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
     660            """
     661            Start a sub-experiment on a federant.
     662
     663            Get the current state, modify or create as appropriate, ship data
     664            and configs and start the experiment.  There are small ordering
     665            differences based on the initial state of the sub-experiment.
     666            """
     667            # ops node in the federant
     668            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     669            user = tbparams[tb]['user']     # federant user
     670            pid = tbparams[tb]['project']   # federant project
     671            # XXX
     672            base_confs = ( "hosts",)
     673            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     674            # Configuration directories on the remote machine
     675            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
     676            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
     677            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
     678
     679            state = self.get_state(user, host, tb, pid, eid)
     680
     681            self.log.debug("[start_segment]: %s: %s" % (tb, state))
     682            self.log.info("[start_segment]:transferring experiment to %s" % tb)
     683
     684            if not self.scp_file("%s/%s/%s" % \
     685                    (tmpdir, tb, tclfile), user, host):
     686                return False
     687           
     688            if state == 'none':
     689                # Create a null copy of the experiment so that we capture any
     690                # logs there if the modify fails.  Emulab software discards the
     691                # logs from a failed startexp
     692                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
     693                    return False
     694                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
     695                timedout = False
     696                try:
     697                    if not self.ssh_cmd(user, host,
     698                            ("/usr/testbed/bin/startexp -i -f -w -p %s " +
     699                            "-e %s null.tcl") % (pid, eid), "startexp",
     700                            timeout=60 * 10):
     701                        return False
     702                except self.ssh_cmd_timeout:
     703                    timedout = True
     704
     705                if timedout:
     706                    state = self.get_state(user, host, self.ssh_privkey_file,
     707                            tb, eid, pid)
     708                    if state != "swapped":
     709                        return False
     710
     711           
     712            # Open up a temporary file to contain a script for setting up the
     713            # filespace for the new experiment.
     714            self.log.info("[start_segment]: creating script file")
     715            try:
     716                sf, scriptname = tempfile.mkstemp()
     717                scriptfile = os.fdopen(sf, 'w')
     718            except IOError:
     719                return False
     720
     721            scriptbase = os.path.basename(scriptname)
     722
     723            # Script the filesystem changes
     724            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
     725            # Clear and create the tarfiles and rpm directories
     726            for d in (tarfiles_dir, rpms_dir):
     727                print >>scriptfile, "/bin/rm -rf %s/*" % d
     728                print >>scriptfile, "mkdir -p %s" % d
     729            print >>scriptfile, 'mkdir -p %s' % proj_dir
     730            self.create_config_tree("%s/%s" % (tmpdir, tb),
     731                    proj_dir, scriptfile)
     732            if os.path.isdir("%s/tarfiles" % tmpdir):
     733                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
     734                        scriptfile)
     735            if os.path.isdir("%s/rpms" % tmpdir):
     736                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
     737                        scriptfile)
     738            print >>scriptfile, "rm -f %s" % scriptbase
     739            scriptfile.close()
     740
     741            # Move the script to the remote machine
     742            # XXX: could collide tempfile names on the remote host
     743            if self.scp_file(scriptname, user, host, scriptbase):
     744                os.remove(scriptname)
     745            else:
     746                return False
     747
     748            # Execute the script (and the script's last line deletes it)
     749            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
     750                return False
     751
     752            for f in base_confs:
     753                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
     754                        "%s/%s" % (proj_dir, f)):
     755                    return False
     756            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
     757                    proj_dir):
     758                return False
     759            if os.path.isdir("%s/tarfiles" % tmpdir):
     760                if not self.ship_configs(host, user,
     761                        "%s/tarfiles" % tmpdir, tarfiles_dir):
     762                    return False
     763            if os.path.isdir("%s/rpms" % tmpdir):
     764                if not self.ship_configs(host, user,
     765                        "%s/rpms" % tmpdir, tarfiles_dir):
     766                    return False
     767            # Stage the new configuration (active experiments will stay swapped
     768            # in now)
     769            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
    652770            try:
    653771                if not self.ssh_cmd(user, host,
    654                         ("/usr/testbed/bin/startexp -i -f -w -p %s " +
    655                         "-e %s null.tcl") % (pid, eid), "startexp",
    656                         timeout=60 * 10):
     772                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
     773                                (pid, eid, tclfile),
     774                        "modexp", timeout= 60 * 10):
    657775                    return False
    658776            except self.ssh_cmd_timeout:
    659                 timedout = True
    660 
    661             if timedout:
    662                 state = self.get_state(user, host, self.ssh_privkey_file,
    663                         tb, eid, pid)
    664                 if state != "swapped":
    665                     return False
    666 
    667        
    668         # Open up a temporary file to contain a script for setting up the
    669         # filespace for the new experiment.
    670         self.log.info("[start_segment]: creating script file")
    671         try:
    672             sf, scriptname = tempfile.mkstemp()
    673             scriptfile = os.fdopen(sf, 'w')
    674         except IOError:
    675             return False
    676 
    677         scriptbase = os.path.basename(scriptname)
    678 
    679         # Script the filesystem changes
    680         print >>scriptfile, "/bin/rm -rf %s" % proj_dir
    681         # Clear and create the tarfiles and rpm directories
    682         for d in (tarfiles_dir, rpms_dir):
    683             print >>scriptfile, "/bin/rm -rf %s/*" % d
    684             print >>scriptfile, "mkdir -p %s" % d
    685         print >>scriptfile, 'mkdir -p %s' % proj_dir
    686         self.create_config_tree("%s/%s" % (tmpdir, tb), proj_dir, scriptfile)
    687         if os.path.isdir("%s/tarfiles" % tmpdir):
    688             self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
    689                     scriptfile)
    690         if os.path.isdir("%s/rpms" % tmpdir):
    691             self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
    692                     scriptfile)
    693         print >>scriptfile, "rm -f %s" % scriptbase
    694         scriptfile.close()
    695 
    696         # Move the script to the remote machine
    697         # XXX: could collide tempfile names on the remote host
    698         if self.scp_file(scriptname, user, host, scriptbase):
    699             os.remove(scriptname)
    700         else:
    701             return False
    702 
    703         # Execute the script (and the script's last line deletes it)
    704         if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
    705             return False
    706 
    707         for f in base_confs:
    708             if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
    709                     "%s/%s" % (proj_dir, f)):
    710                 return False
    711         if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
    712                 proj_dir):
    713             return False
    714         if os.path.isdir("%s/tarfiles" % tmpdir):
    715             if not self.ship_configs(host, user,
    716                     "%s/tarfiles" % tmpdir, tarfiles_dir):
    717                 return False
    718         if os.path.isdir("%s/rpms" % tmpdir):
    719             if not self.ship_configs(host, user,
    720                     "%s/rpms" % tmpdir, tarfiles_dir):
    721                 return False
    722         # Stage the new configuration (active experiments will stay swapped in
    723         # now)
    724         self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
    725         try:
    726             if not self.ssh_cmd(user, host,
    727                     "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
    728                             (pid, eid, tclfile),
    729                     "modexp", timeout= 60 * 10):
     777                print "modexp timeout"
     778                # There's really no way to see if this succeeded or failed, so
     779                # if it hangs, assume the worst.
    730780                return False
    731         except self.ssh_cmd_timeout:
    732             print "modexp timeout"
    733             # There's really no way to see if this succeeded or failed, so if
    734             # it hangs, assume the worst.
    735             return False
    736         # Active experiments are still swapped, this swaps the others in.
    737         if state != 'active':
    738             self.log.info("[start_segment]: Swapping %s in on %s" % \
    739                     (eid, tb))
    740             timedout = False
    741             try:
    742                 if not self.ssh_cmd(user, host,
    743                         "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    744                         "swapexp", timeout=10*60):
    745                     return False
    746             except self.ssh_cmd_timeout:
    747                 timedout = True
    748            
    749             # If the command was terminated, but completed successfully, report
    750             # success.
    751             if timedout:
    752                 state = self.get_state(user, host, self.ssh_privkey_file,
    753                         tb, eid, pid)
    754                 self.log.debug("[start_segment]: swapin timed out (state)")
    755                 return state == 'active'
    756         # Everything has gone OK.
    757         return True
    758 
    759     def stop_segment(self, tb, eid, tbparams):
    760         """
    761         Stop a sub experiment by calling swapexp on the federant
    762         """
    763         user = tbparams[tb]['user']
    764         host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    765         pid = tbparams[tb]['project']
    766 
    767         self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
    768         return self.ssh_cmd(user, host,
    769                 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
     781            # Active experiments are still swapped, this swaps the others in.
     782            if state != 'active':
     783                self.log.info("[start_segment]: Swapping %s in on %s" % \
     784                        (eid, tb))
     785                timedout = False
     786                try:
     787                    if not self.ssh_cmd(user, host,
     788                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
     789                            "swapexp", timeout=10*60):
     790                        return False
     791                except self.ssh_cmd_timeout:
     792                    timedout = True
     793               
     794                # If the command was terminated, but completed successfully,
     795                # report success.
     796                if timedout:
     797                    state = self.get_state(user, host, self.ssh_privkey_file,
     798                            tb, eid, pid)
     799                    self.log.debug("[start_segment]: swapin timed out (state)")
     800                    return state == 'active'
     801            # Everything has gone OK.
     802            return True
     803
     804    class stop_segment(emulab_segment):
     805        def __init__(self, log=None, keyfile=None, debug=False):
     806            experiment_control_local.emulab_segment.__init__(self,
     807                    log=log, keyfile=keyfile, debug=debug)
     808
     809        def __call__(self, tb, eid, tbparams):
     810            """
     811            Stop a sub experiment by calling swapexp on the federant
     812            """
     813            user = tbparams[tb]['user']
     814            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     815            pid = tbparams[tb]['project']
     816
     817            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
     818            return self.ssh_cmd(user, host,
     819                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
    770820
    771821       
     
    17251775            allocated = { }         # Testbeds we can access
    17261776            started = { }           # Testbeds where a sub-experiment started
    1727                                 # successfully
     1777                                    # successfully
    17281778
    17291779            # Objects to parse the splitter output (defined above)
     
    18371887            # get the return value later
    18381888            thread_pool.wait_for_slot()
    1839             t  = self.pooled_thread(target=self.start_segment,
     1889            t  = self.pooled_thread(\
     1890                    target=self.start_segment(log=self.log,
     1891                        keyfile=self.ssh_privkey_file, debug=self.debug),
    18401892                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    18411893                    pdata=thread_pool, trace_file=self.trace_file)
     
    18501902
    18511903        if len(failed) == 0:
    1852             if not self.start_segment(master, eid, tbparams, tmpdir):
     1904            starter = self.start_segment(log=self.log,
     1905                    keyfile=self.ssh_privkey_file, debug=self.debug)
     1906            if not starter(master, eid, tbparams, tmpdir):
    18531907                failed.append(master)
    18541908
     
    18611915                    # Create and start a thread to stop the segment
    18621916                    thread_pool.wait_for_slot()
    1863                     t  = self.pooled_thread(target=self.stop_segment,
     1917                    t  = self.pooled_thread(\
     1918                            target=self.stop_segment(log=self.log,
     1919                                keyfile=self.ssh_privkey_file,
     1920                                debug=self.debug),
    18641921                            args=(tb, eid, tbparams), name=tb,
    18651922                            pdata=thread_pool, trace_file=self.trace_file)
Note: See TracChangeset for help on using the changeset viewer.