source: fedd/federation/proxy_emulab_segment.py @ 2ac64d1a

compt_changesinfo-ops
Last change on this file since 2ac64d1a was c167378, checked in by Ted Faber <faber@…>, 13 years ago

Restore mnemonic names and allow seer_master experiments in groups. NB:
all existing experiments that are reused on DETER get deleted now.
Don't save anything in there.

  • Property mode set to 100644
File size: 10.9 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12from proxy_segment import proxy_segment
13from service_error import service_error
14
15class start_segment(proxy_segment):
16    """
17    This starts an experiment on an emulab accessed remotely via ssh.  Most of
18    the experiment constuction has been done by the emulab_access object.  This
19    just does the wrangling of the emulab commands and collected the node to
20    physical mapping.  The routine throws service errors.
21    """
22
23    def __init__(self, log=None, keyfile=None, debug=False, boss=None, 
24            cert=None):
25        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
26        self.null = """
27set ns [new Simulator]
28source tb_compat.tcl
29
30set a [$ns node]
31
32$ns rtproto Session
33$ns run
34"""
35        self.node = { }
36
37    def get_state(self, user, host, pid, eid):
38        """
39        Return the state of the experiment as reported by emulab
40        """
41        # command to test experiment state
42        expinfo_exec = "/usr/testbed/bin/expinfo" 
43        # Regular expressions to parse the expinfo response
44        state_re = re.compile("State:\s+(\w+)")
45        no_exp_re = re.compile("^No\s+such\s+experiment")
46        swapping_re = re.compile("^No\s+information\s+available.")
47        state = None    # Experiment state parsed from expinfo
48        # The expinfo ssh command.  Note the identity restriction to use
49        # only the identity provided in the pubkey given.
50        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
51                'StrictHostKeyChecking no', '-i', 
52                self.ssh_privkey_file, "%s@%s" % (user, host), 
53                expinfo_exec, pid, eid]
54
55        dev_null = None
56        try:
57            dev_null = open("/dev/null", "a")
58        except EnvironmentError, e:
59            self.log.error("[get_state]: can't open /dev/null: %s" %e)
60
61        if self.debug:
62            state = 'swapped'
63            rv = 0
64        else:
65            self.log.debug("Checking state")
66            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
67                    stderr=dev_null, close_fds=True)
68            for line in status.stdout:
69                m = state_re.match(line)
70                if m: state = m.group(1)
71                else:
72                    for reg, st in ((no_exp_re, "none"),
73                            (swapping_re, "swapping")):
74                        m = reg.match(line)
75                        if m: state = st
76            rv = status.wait()
77
78        # If the experiment is not present the subcommand returns a
79        # non-zero return value.  If we successfully parsed a "none"
80        # outcome, ignore the return code.
81        if rv != 0 and state != 'none':
82            raise service_error(service_error.internal,
83                    "Cannot get status of segment:%s/%s" % (pid, eid))
84        elif state not in ('active', 'swapped', 'swapping', 'none'):
85            raise service_error(service_error.internal,
86                    "Cannot get status of segment:%s/%s" % (pid, eid))
87        else:
88            self.log.debug("State is %s" % state)
89            return state
90
91
92    def get_mapping(self, user, host, pid, eid):
93        """
94        Get the physical to virtual mapping from the expinfo command and save
95        it in the self.map member.
96        """
97        # command to test experiment state
98        expinfo_exec = "/usr/testbed/bin/expinfo" 
99        # The expinfo ssh command.  Note the identity restriction to use
100        # only the identity provided in the pubkey given.
101        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
102                'StrictHostKeyChecking no', '-i', 
103                self.ssh_privkey_file, "%s@%s" % (user, host), 
104                expinfo_exec, '-m', pid, eid]
105
106        dev_null = None
107        try:
108            dev_null = open("/dev/null", "a")
109        except EnvironmentError, e:
110            self.log.error("[get_mapping]: can't open /dev/null: %s" %e)
111
112        if self.debug:
113            rv = 0
114        else:
115            self.log.debug("Getting mapping for %s %s" % (pid, eid))
116            phys_start = re.compile('^Physical\s+Node\s+Mapping')
117            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
118            phys_end = re.compile('^$')
119            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
120                    stderr=dev_null, close_fds=True)
121
122            # Parse the info output.  Format:
123            #
124            # stuff
125            # Physical Node Mapping:
126            # ID              Type         OS              Physical   
127            # --------------- ------------ --------------- ------------
128            # virtual         dummy        dummy           physical
129            #
130            foundit = False
131            skip = 0
132            for line in status.stdout:
133                if phys_start.match(line):
134                    skip = 2
135                    foundit = True
136                elif not foundit:
137                    continue
138                elif skip > 0:
139                    skip -= 1
140                elif phys_end.match(line):
141                    break
142                else:
143                    m = phys_line.match(line.strip())
144                    if m: self.node[m.group(1)] = m.group(3)
145                    else: self.log.warn(
146                            "Matching failed while parsing node mapping: " +\
147                                    "line %s" % line)
148            rv = status.wait()
149
150        # If the experiment is not present the subcommand returns a
151        # non-zero return value.  If we successfully parsed a "none"
152        # outcome, ignore the return code.
153        if rv != 0 :
154            raise service_error(service_error.internal,
155                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
156        else:
157            return True
158
159
160    def make_null_experiment(self, user, host, pid, eid, tmpdir, gid=None):
161        """
162        Create a null copy of the experiment so that we capture any logs there
163        if the modify fails.  Emulab software discards the logs from a failed
164        startexp
165        """
166
167        if gid is not None: gparam = '-g %s' % gid
168        else: gparam = ''
169
170        try:
171            f = open("%s/null.tcl" % tmpdir, "w")
172            print >>f, self.null
173            f.close()
174        except EnvironmentError, e:
175            raise service_error(service_error.internal,
176                    "Cannot stage tarfile/rpm: %s" % e.strerror)
177
178        if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
179            return False
180        self.log.info("[start_segment]: Creating %s" % eid)
181        timedout = False
182        try:
183            if not self.ssh_cmd(user, host,
184                    ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
185                    "-e %s %s null.tcl") % (pid, eid, gparam), "startexp",
186                    timeout=60 * 10):
187                return False
188        except self.ssh_cmd_timeout:
189            timedout = True
190
191        if timedout:
192            state = self.get_state(user, host, pid, eid)
193            if state != "swapped":
194                return False
195        return True
196
197    def set_up_experiment_filespace(self, user, host, pid, eid, tmpdir):
198        """
199        Send all the software and configuration files into the experiment's
200        file space.  To reduce the number of ssh connections, we script many
201        changes and execute the script.
202        """
203        # Configuration directories on the remote machine
204        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
205        softdir = "/proj/%s/software/%s" % (pid, eid)
206        # Local software dir
207        lsoftdir = "%s/software" % tmpdir
208
209        # Open up a temporary file to contain a script for setting up the
210        # filespace for the new experiment.
211        self.log.info("[start_segment]: creating script file")
212        try:
213            sf, scriptname = tempfile.mkstemp()
214            scriptfile = os.fdopen(sf, 'w')
215        except EnvironmentError:
216            return False
217
218        scriptbase = os.path.basename(scriptname)
219
220        # Script the filesystem changes
221        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
222        # Clear and create the software directory
223        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
224        print >>scriptfile, 'mkdir -p %s' % proj_dir
225        if os.path.isdir(lsoftdir):
226            print >>scriptfile, 'mkdir -p %s' % softdir
227        print >>scriptfile, "rm -f %s" % scriptbase
228        scriptfile.close()
229
230        # Move the script to the remote machine
231        # XXX: could collide tempfile names on the remote host
232        if self.scp_file(scriptname, user, host, scriptbase):
233            os.remove(scriptname)
234        else:
235            return False
236
237        # Execute the script (and the script's last line deletes it)
238        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
239            return False
240
241        for f in os.listdir(tmpdir):
242            if not os.path.isdir("%s/%s" % (tmpdir, f)):
243                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
244                        "%s/%s" % (proj_dir, f)):
245                    return False
246        if os.path.isdir(lsoftdir):
247            for f in os.listdir(lsoftdir):
248                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
249                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
250                            user, host, "%s/%s" % (softdir, f)):
251                        return False
252        return True
253
254    def swap_in(self, user, host, pid, eid):
255        """
256        Swap experiment in.  This includes code to cope with the experiment
257        swaping command timing out, but the experiment being swapped in
258        successfully.
259        """
260        self.log.info("[start_segment]: Swapping %s in" % eid)
261        timedout = False
262        try:
263            if not self.ssh_cmd(user, host,
264                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
265                    "swapexp", timeout=25*60):
266                return False
267        except self.ssh_cmd_timeout:
268            timedout = True
269       
270        # If the command was terminated, but completed successfully,
271        # report success.
272        if timedout:
273            self.log.debug("[start_segment]: swapin timed out " +\
274                    "checking state")
275            state = self.get_state(user, host, pid, eid)
276            self.log.debug("[start_segment]: state is %s" % state)
277            return state == 'active'
278
279        return True
280
281
282    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0,
283            gid=None):
284        """
285        Start a sub-experiment on a federant.
286
287        Get the current state, and terminate the experiment if it exists. The
288        group membership of the experiment is difficult to determine or change,
289        so start with a clean slate.  Create a new one and ship data
290        and configs and start the experiment.  There are small ordering
291        """
292        # ops node in the federant
293        host = "%s%s" % (parent.ops, parent.domain)
294        state = self.get_state(user, host, pid, eid)
295
296        if not self.scp_file(tclfile, user, host):
297            return False
298       
299        if state != 'none':
300            self.ssh_cmd(user, host, 
301                    "/usr/testbed/bin/endexp -w %s %s" % (pid, eid))
302
303        # Put a dummy in place to capture logs, and establish an experiment
304        # directory.
305        if not self.make_null_experiment(user, host, pid, eid, tmpdir, gid):
306            return False
307
308        if not self.set_up_experiment_filespace(user, host, pid, eid, tmpdir):
309            return False
310
311        # With the filespace in place, we can modify and swap in.
312        self.log.info("[start_segment]: Modifying %s" % eid)
313        try:
314            if not self.ssh_cmd(user, host,
315                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
316                            (pid, eid, tclfile.rpartition('/')[2]),
317                    "modexp", timeout= 60 * 10):
318                return False
319        except self.ssh_cmd_timeout:
320            self.log.error("Modify command failed to complete in time")
321            # There's really no way to see if this succeeded or failed, so
322            # if it hangs, assume the worst.
323            return False
324
325        if not self.swap_in(user, host, pid, eid):
326            self.log.error("swap in failed")
327            return False
328        # Everything has gone OK.
329        self.get_mapping(user, host, pid,eid)
330        return True
331
332class stop_segment(proxy_segment):
333    def __init__(self, log=None, keyfile=None, debug=False, boss=None, cert=None):
334        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
335
336    def __call__(self, parent, user, pid, eid, gid=None, terminate=False):
337        """
338        Stop a sub experiment by calling swapexp on the federant
339        """
340        host = "%s%s" % (parent.ops, parent.domain)
341        self.log.info("[stop_segment]: Stopping %s" % eid)
342        rv = False
343        try:
344            # Clean out tar files: we've gone over quota in the past
345            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
346                    (pid, eid))
347            rv = self.ssh_cmd(user, host,
348                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
349            if terminate:
350                rv = self.ssh_cmd(user, host,
351                        "/usr/testbed/bin/endexp -w %s %s" % (pid, eid))
352        except self.ssh_cmd_timeout:
353            rv = False
354        return rv
355
Note: See TracBrowser for help on using the repository browser.