source: fedd/federation/proxy_emulab_segment.py @ 2c51061

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 2c51061 was 5bf359d, checked in by Ted Faber <faber@…>, 14 years ago

More refactoring. Neaten up the code for creating segments in emulab and make the local and proxy class structures parallel. The code is more readable this way, I hope.

  • Property mode set to 100644
File size: 10.5 KB
RevLine 
[11860f52]1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
[37776ea]12from proxy_segment import proxy_segment
[bbd0039]13from service_error import service_error
14
[37776ea]15class start_segment(proxy_segment):
[5bf359d]16    """
17    This starts an experiment on an emulab accessed remotely via ssh.  Most of
18    the experiment constuction has been done by the emulab_access object.  This
19    just does the wrangling of the emulab commands and collected the node to
20    physical mapping.  The routine throws service errors.
21    """
22
[11860f52]23    def __init__(self, log=None, keyfile=None, debug=False):
[37776ea]24        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
[11860f52]25        self.null = """
26set ns [new Simulator]
27source tb_compat.tcl
28
29set a [$ns node]
30
31$ns rtproto Session
32$ns run
33"""
[b4b19c7]34        self.node = { }
[11860f52]35
36    def get_state(self, user, host, pid, eid):
[5bf359d]37        """
38        Return the state of the experiment as reported by emulab
39        """
[11860f52]40        # command to test experiment state
41        expinfo_exec = "/usr/testbed/bin/expinfo" 
42        # Regular expressions to parse the expinfo response
43        state_re = re.compile("State:\s+(\w+)")
44        no_exp_re = re.compile("^No\s+such\s+experiment")
45        swapping_re = re.compile("^No\s+information\s+available.")
46        state = None    # Experiment state parsed from expinfo
47        # The expinfo ssh command.  Note the identity restriction to use
48        # only the identity provided in the pubkey given.
49        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
[bbd0039]50                'StrictHostKeyChecking no', '-i', 
[11860f52]51                self.ssh_privkey_file, "%s@%s" % (user, host), 
52                expinfo_exec, pid, eid]
53
54        dev_null = None
55        try:
56            dev_null = open("/dev/null", "a")
[d3c8759]57        except EnvironmentError, e:
[11860f52]58            self.log.error("[get_state]: can't open /dev/null: %s" %e)
59
60        if self.debug:
61            state = 'swapped'
62            rv = 0
63        else:
64            self.log.debug("Checking state")
65            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
66                    stderr=dev_null, close_fds=True)
67            for line in status.stdout:
68                m = state_re.match(line)
69                if m: state = m.group(1)
70                else:
71                    for reg, st in ((no_exp_re, "none"),
72                            (swapping_re, "swapping")):
73                        m = reg.match(line)
74                        if m: state = st
75            rv = status.wait()
76
77        # If the experiment is not present the subcommand returns a
78        # non-zero return value.  If we successfully parsed a "none"
79        # outcome, ignore the return code.
80        if rv != 0 and state != 'none':
81            raise service_error(service_error.internal,
82                    "Cannot get status of segment:%s/%s" % (pid, eid))
83        elif state not in ('active', 'swapped', 'swapping', 'none'):
84            raise service_error(service_error.internal,
85                    "Cannot get status of segment:%s/%s" % (pid, eid))
86        else:
87            self.log.debug("State is %s" % state)
88            return state
89
90
[4c931af]91    def get_mapping(self, user, host, pid, eid):
[5bf359d]92        """
93        Get the physical to virtual mapping from the expinfo command and save
94        it in the self.map member.
95        """
[b4b19c7]96        # command to test experiment state
97        expinfo_exec = "/usr/testbed/bin/expinfo" 
98        # The expinfo ssh command.  Note the identity restriction to use
99        # only the identity provided in the pubkey given.
100        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
101                'StrictHostKeyChecking no', '-i', 
102                self.ssh_privkey_file, "%s@%s" % (user, host), 
103                expinfo_exec, '-m', pid, eid]
104
105        dev_null = None
106        try:
107            dev_null = open("/dev/null", "a")
[d3c8759]108        except EnvironmentError, e:
[4c931af]109            self.log.error("[get_mapping]: can't open /dev/null: %s" %e)
[b4b19c7]110
111        if self.debug:
112            rv = 0
113        else:
114            self.log.debug("Getting mapping for %s %s" % (pid, eid))
115            phys_start = re.compile('^Physical\s+Node\s+Mapping')
[935e46f]116            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
[b4b19c7]117            phys_end = re.compile('^$')
118            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
119                    stderr=dev_null, close_fds=True)
120
121            # Parse the info output.  Format:
122            #
123            # stuff
124            # Physical Node Mapping:
125            # ID              Type         OS              Physical   
126            # --------------- ------------ --------------- ------------
127            # virtual         dummy        dummy           physical
128            #
129            foundit = False
130            skip = 0
131            for line in status.stdout:
132                if phys_start.match(line):
133                    skip = 2
134                    foundit = True
135                elif not foundit:
136                    continue
137                elif skip > 0:
138                    skip -= 1
139                elif phys_end.match(line):
140                    break
141                else:
142                    m = phys_line.match(line.strip())
[935e46f]143                    if m: self.node[m.group(1)] = m.group(3)
[b4b19c7]144                    else: self.log.warn(
[935e46f]145                            "Matching failed while parsing node mapping: " +\
146                                    "line %s" % line)
[b4b19c7]147            rv = status.wait()
148
149        # If the experiment is not present the subcommand returns a
150        # non-zero return value.  If we successfully parsed a "none"
151        # outcome, ignore the return code.
152        if rv != 0 :
153            raise service_error(service_error.internal,
154                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
155        else:
156            return True
157
158
[5bf359d]159    def make_null_experiment(self, user, host, pid, eid, tmpdir):
[11860f52]160        """
[5bf359d]161        Create a null copy of the experiment so that we capture any logs there
162        if the modify fails.  Emulab software discards the logs from a failed
163        startexp
164        """
165        try:
166            f = open("%s/null.tcl" % tmpdir, "w")
167            print >>f, self.null
168            f.close()
169        except EnvironmentError, e:
170            raise service_error(service_error.internal,
171                    "Cannot stage tarfile/rpm: %s" % e.strerror)
[11860f52]172
[5bf359d]173        if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
174            return False
175        self.log.info("[start_segment]: Creating %s" % eid)
176        timedout = False
177        try:
178            if not self.ssh_cmd(user, host,
179                    ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
180                    "-e %s null.tcl") % (pid, eid), "startexp",
181                    timeout=60 * 10):
182                return False
183        except self.ssh_cmd_timeout:
184            timedout = True
185
186        if timedout:
187            state = self.get_state(user, host, pid, eid)
188            if state != "swapped":
189                return False
190        return True
191
192    def set_up_experiment_filespace(self, user, host, pid, eid, tmpdir):
193        """
194        Send all the software and configuration files into the experiment's
195        file space.  To reduce the number of ssh connections, we script many
196        changes and execute the script.
[11860f52]197        """
198        # Configuration directories on the remote machine
199        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
200        softdir = "/proj/%s/software/%s" % (pid, eid)
201        # Local software dir
202        lsoftdir = "%s/software" % tmpdir
203
204        # Open up a temporary file to contain a script for setting up the
205        # filespace for the new experiment.
206        self.log.info("[start_segment]: creating script file")
207        try:
208            sf, scriptname = tempfile.mkstemp()
209            scriptfile = os.fdopen(sf, 'w')
[d3c8759]210        except EnvironmentError:
[11860f52]211            return False
212
213        scriptbase = os.path.basename(scriptname)
214
215        # Script the filesystem changes
216        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
217        # Clear and create the software directory
218        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
219        print >>scriptfile, 'mkdir -p %s' % proj_dir
220        if os.path.isdir(lsoftdir):
221            print >>scriptfile, 'mkdir -p %s' % softdir
222        print >>scriptfile, "rm -f %s" % scriptbase
223        scriptfile.close()
224
225        # Move the script to the remote machine
226        # XXX: could collide tempfile names on the remote host
227        if self.scp_file(scriptname, user, host, scriptbase):
228            os.remove(scriptname)
229        else:
230            return False
231
232        # Execute the script (and the script's last line deletes it)
233        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
234            return False
235
236        for f in os.listdir(tmpdir):
237            if not os.path.isdir("%s/%s" % (tmpdir, f)):
238                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
239                        "%s/%s" % (proj_dir, f)):
240                    return False
241        if os.path.isdir(lsoftdir):
242            for f in os.listdir(lsoftdir):
243                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
244                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
245                            user, host, "%s/%s" % (softdir, f)):
246                        return False
[5bf359d]247        return True
248
249    def swap_in(self, user, host, pid, eid):
250        """
251        Swap experiment in.  This includes code to cope with the experiment
252        swaping command timing out, but the experiment being swapped in
253        successfully.
254        """
255        self.log.info("[start_segment]: Swapping %s in" % eid)
256        timedout = False
257        try:
258            if not self.ssh_cmd(user, host,
259                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
260                    "swapexp", timeout=25*60):
261                return False
262        except self.ssh_cmd_timeout:
263            timedout = True
264       
265        # If the command was terminated, but completed successfully,
266        # report success.
267        if timedout:
268            self.log.debug("[start_segment]: swapin timed out " +\
269                    "checking state")
270            state = self.get_state(user, host, pid, eid)
271            self.log.debug("[start_segment]: state is %s" % state)
272            return state == 'active'
273
274        return True
275
276
277    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
278        """
279        Start a sub-experiment on a federant.
280
281        Get the current state, modify or create as appropriate, ship data
282        and configs and start the experiment.  There are small ordering
283        differences based on the initial state of the sub-experiment.
284        """
285        # ops node in the federant
286        host = "%s%s" % (parent.ops, parent.domain)
287        state = self.get_state(user, host, pid, eid)
288
289        if not self.scp_file(tclfile, user, host):
290            return False
291       
292        if state == 'none':
293            # Put a dummy in place to capture logs, and establish an experiment
294            # directory.
295            if not self.make_null_experiment(user, host, pid, eid, tmpdir):
296                return False
297
298        if not self.set_up_experiment_filespace(user, host, pid, eid, tmpdir):
299            return False
300
301        # With the filespace in place, we can modify and swap in.
[11860f52]302        self.log.info("[start_segment]: Modifying %s" % eid)
303        try:
304            if not self.ssh_cmd(user, host,
305                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
306                            (pid, eid, tclfile.rpartition('/')[2]),
307                    "modexp", timeout= 60 * 10):
308                return False
309        except self.ssh_cmd_timeout:
310            self.log.error("Modify command failed to complete in time")
311            # There's really no way to see if this succeeded or failed, so
312            # if it hangs, assume the worst.
313            return False
314        # Active experiments are still swapped, this swaps the others in.
315        if state != 'active':
[5bf359d]316            if not self.swap_in(user, host, pid, eid):
317                return False
[11860f52]318        # Everything has gone OK.
[4c931af]319        self.get_mapping(user, host, pid,eid)
[11860f52]320        return True
321
[37776ea]322class stop_segment(proxy_segment):
[11860f52]323    def __init__(self, log=None, keyfile=None, debug=False):
[37776ea]324        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
[11860f52]325
326    def __call__(self, parent, user, pid, eid):
327        """
328        Stop a sub experiment by calling swapexp on the federant
329        """
330        host = "%s%s" % (parent.ops, parent.domain)
331        self.log.info("[stop_segment]: Stopping %s" % eid)
332        rv = False
333        try:
334            # Clean out tar files: we've gone over quota in the past
335            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
336                    (pid, eid))
337            rv = self.ssh_cmd(user, host,
338                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
339        except self.ssh_cmd_timeout:
340            rv = False
341        return rv
342
Note: See TracBrowser for help on using the repository browser.