source: fedd/federation/proxy_emulab_segment.py @ f3898f7

compt_changesinfo-ops
Last change on this file since f3898f7 was f3898f7, checked in by Ted Faber <faber@…>, 12 years ago

Simple support for experiments in groups. Closes #32

  • Property mode set to 100644
File size: 10.7 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12from proxy_segment import proxy_segment
13from service_error import service_error
14
15class start_segment(proxy_segment):
16    """
17    This starts an experiment on an emulab accessed remotely via ssh.  Most of
18    the experiment constuction has been done by the emulab_access object.  This
19    just does the wrangling of the emulab commands and collected the node to
20    physical mapping.  The routine throws service errors.
21    """
22
23    def __init__(self, log=None, keyfile=None, debug=False, boss=None, 
24            cert=None):
25        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
26        self.null = """
27set ns [new Simulator]
28source tb_compat.tcl
29
30set a [$ns node]
31
32$ns rtproto Session
33$ns run
34"""
35        self.node = { }
36
37    def get_state(self, user, host, pid, eid):
38        """
39        Return the state of the experiment as reported by emulab
40        """
41        # command to test experiment state
42        expinfo_exec = "/usr/testbed/bin/expinfo" 
43        # Regular expressions to parse the expinfo response
44        state_re = re.compile("State:\s+(\w+)")
45        no_exp_re = re.compile("^No\s+such\s+experiment")
46        swapping_re = re.compile("^No\s+information\s+available.")
47        state = None    # Experiment state parsed from expinfo
48        # The expinfo ssh command.  Note the identity restriction to use
49        # only the identity provided in the pubkey given.
50        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
51                'StrictHostKeyChecking no', '-i', 
52                self.ssh_privkey_file, "%s@%s" % (user, host), 
53                expinfo_exec, pid, eid]
54
55        dev_null = None
56        try:
57            dev_null = open("/dev/null", "a")
58        except EnvironmentError, e:
59            self.log.error("[get_state]: can't open /dev/null: %s" %e)
60
61        if self.debug:
62            state = 'swapped'
63            rv = 0
64        else:
65            self.log.debug("Checking state")
66            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
67                    stderr=dev_null, close_fds=True)
68            for line in status.stdout:
69                m = state_re.match(line)
70                if m: state = m.group(1)
71                else:
72                    for reg, st in ((no_exp_re, "none"),
73                            (swapping_re, "swapping")):
74                        m = reg.match(line)
75                        if m: state = st
76            rv = status.wait()
77
78        # If the experiment is not present the subcommand returns a
79        # non-zero return value.  If we successfully parsed a "none"
80        # outcome, ignore the return code.
81        if rv != 0 and state != 'none':
82            raise service_error(service_error.internal,
83                    "Cannot get status of segment:%s/%s" % (pid, eid))
84        elif state not in ('active', 'swapped', 'swapping', 'none'):
85            raise service_error(service_error.internal,
86                    "Cannot get status of segment:%s/%s" % (pid, eid))
87        else:
88            self.log.debug("State is %s" % state)
89            return state
90
91
92    def get_mapping(self, user, host, pid, eid):
93        """
94        Get the physical to virtual mapping from the expinfo command and save
95        it in the self.map member.
96        """
97        # command to test experiment state
98        expinfo_exec = "/usr/testbed/bin/expinfo" 
99        # The expinfo ssh command.  Note the identity restriction to use
100        # only the identity provided in the pubkey given.
101        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
102                'StrictHostKeyChecking no', '-i', 
103                self.ssh_privkey_file, "%s@%s" % (user, host), 
104                expinfo_exec, '-m', pid, eid]
105
106        dev_null = None
107        try:
108            dev_null = open("/dev/null", "a")
109        except EnvironmentError, e:
110            self.log.error("[get_mapping]: can't open /dev/null: %s" %e)
111
112        if self.debug:
113            rv = 0
114        else:
115            self.log.debug("Getting mapping for %s %s" % (pid, eid))
116            phys_start = re.compile('^Physical\s+Node\s+Mapping')
117            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
118            phys_end = re.compile('^$')
119            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
120                    stderr=dev_null, close_fds=True)
121
122            # Parse the info output.  Format:
123            #
124            # stuff
125            # Physical Node Mapping:
126            # ID              Type         OS              Physical   
127            # --------------- ------------ --------------- ------------
128            # virtual         dummy        dummy           physical
129            #
130            foundit = False
131            skip = 0
132            for line in status.stdout:
133                if phys_start.match(line):
134                    skip = 2
135                    foundit = True
136                elif not foundit:
137                    continue
138                elif skip > 0:
139                    skip -= 1
140                elif phys_end.match(line):
141                    break
142                else:
143                    m = phys_line.match(line.strip())
144                    if m: self.node[m.group(1)] = m.group(3)
145                    else: self.log.warn(
146                            "Matching failed while parsing node mapping: " +\
147                                    "line %s" % line)
148            rv = status.wait()
149
150        # If the experiment is not present the subcommand returns a
151        # non-zero return value.  If we successfully parsed a "none"
152        # outcome, ignore the return code.
153        if rv != 0 :
154            raise service_error(service_error.internal,
155                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
156        else:
157            return True
158
159
160    def make_null_experiment(self, user, host, pid, eid, tmpdir, gid=None):
161        """
162        Create a null copy of the experiment so that we capture any logs there
163        if the modify fails.  Emulab software discards the logs from a failed
164        startexp
165        """
166
167        if gid is not None: gparam = '-g %s' % gid
168        else: gparam = ''
169
170        try:
171            f = open("%s/null.tcl" % tmpdir, "w")
172            print >>f, self.null
173            f.close()
174        except EnvironmentError, e:
175            raise service_error(service_error.internal,
176                    "Cannot stage tarfile/rpm: %s" % e.strerror)
177
178        if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
179            return False
180        self.log.info("[start_segment]: Creating %s" % eid)
181        timedout = False
182        try:
183            if not self.ssh_cmd(user, host,
184                    ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
185                    "-e %s %s null.tcl") % (pid, eid, gparam), "startexp",
186                    timeout=60 * 10):
187                return False
188        except self.ssh_cmd_timeout:
189            timedout = True
190
191        if timedout:
192            state = self.get_state(user, host, pid, eid)
193            if state != "swapped":
194                return False
195        return True
196
197    def set_up_experiment_filespace(self, user, host, pid, eid, tmpdir):
198        """
199        Send all the software and configuration files into the experiment's
200        file space.  To reduce the number of ssh connections, we script many
201        changes and execute the script.
202        """
203        # Configuration directories on the remote machine
204        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
205        softdir = "/proj/%s/software/%s" % (pid, eid)
206        # Local software dir
207        lsoftdir = "%s/software" % tmpdir
208
209        # Open up a temporary file to contain a script for setting up the
210        # filespace for the new experiment.
211        self.log.info("[start_segment]: creating script file")
212        try:
213            sf, scriptname = tempfile.mkstemp()
214            scriptfile = os.fdopen(sf, 'w')
215        except EnvironmentError:
216            return False
217
218        scriptbase = os.path.basename(scriptname)
219
220        # Script the filesystem changes
221        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
222        # Clear and create the software directory
223        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
224        print >>scriptfile, 'mkdir -p %s' % proj_dir
225        if os.path.isdir(lsoftdir):
226            print >>scriptfile, 'mkdir -p %s' % softdir
227        print >>scriptfile, "rm -f %s" % scriptbase
228        scriptfile.close()
229
230        # Move the script to the remote machine
231        # XXX: could collide tempfile names on the remote host
232        if self.scp_file(scriptname, user, host, scriptbase):
233            os.remove(scriptname)
234        else:
235            return False
236
237        # Execute the script (and the script's last line deletes it)
238        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
239            return False
240
241        for f in os.listdir(tmpdir):
242            if not os.path.isdir("%s/%s" % (tmpdir, f)):
243                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
244                        "%s/%s" % (proj_dir, f)):
245                    return False
246        if os.path.isdir(lsoftdir):
247            for f in os.listdir(lsoftdir):
248                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
249                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
250                            user, host, "%s/%s" % (softdir, f)):
251                        return False
252        return True
253
254    def swap_in(self, user, host, pid, eid):
255        """
256        Swap experiment in.  This includes code to cope with the experiment
257        swaping command timing out, but the experiment being swapped in
258        successfully.
259        """
260        self.log.info("[start_segment]: Swapping %s in" % eid)
261        timedout = False
262        try:
263            if not self.ssh_cmd(user, host,
264                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
265                    "swapexp", timeout=25*60):
266                return False
267        except self.ssh_cmd_timeout:
268            timedout = True
269       
270        # If the command was terminated, but completed successfully,
271        # report success.
272        if timedout:
273            self.log.debug("[start_segment]: swapin timed out " +\
274                    "checking state")
275            state = self.get_state(user, host, pid, eid)
276            self.log.debug("[start_segment]: state is %s" % state)
277            return state == 'active'
278
279        return True
280
281
282    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0,
283            gid=None):
284        """
285        Start a sub-experiment on a federant.
286
287        Get the current state, modify or create as appropriate, ship data
288        and configs and start the experiment.  There are small ordering
289        differences based on the initial state of the sub-experiment.
290        """
291        # ops node in the federant
292        host = "%s%s" % (parent.ops, parent.domain)
293        state = self.get_state(user, host, pid, eid)
294
295        if not self.scp_file(tclfile, user, host):
296            return False
297       
298        if state == 'none':
299            # Put a dummy in place to capture logs, and establish an experiment
300            # directory.
301            if not self.make_null_experiment(user, host, pid, eid, tmpdir, gid):
302                return False
303
304        if not self.set_up_experiment_filespace(user, host, pid, eid, tmpdir):
305            return False
306
307        # With the filespace in place, we can modify and swap in.
308        self.log.info("[start_segment]: Modifying %s" % eid)
309        try:
310            if not self.ssh_cmd(user, host,
311                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
312                            (pid, eid, tclfile.rpartition('/')[2]),
313                    "modexp", timeout= 60 * 10):
314                return False
315        except self.ssh_cmd_timeout:
316            self.log.error("Modify command failed to complete in time")
317            # There's really no way to see if this succeeded or failed, so
318            # if it hangs, assume the worst.
319            return False
320        # Active experiments are still swapped, this swaps the others in.
321        if state != 'active':
322            if not self.swap_in(user, host, pid, eid):
323                return False
324        # Everything has gone OK.
325        self.get_mapping(user, host, pid,eid)
326        return True
327
328class stop_segment(proxy_segment):
329    def __init__(self, log=None, keyfile=None, debug=False, boss=None, cert=None):
330        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
331
332    def __call__(self, parent, user, pid, eid, gid=None):
333        """
334        Stop a sub experiment by calling swapexp on the federant
335        """
336        host = "%s%s" % (parent.ops, parent.domain)
337        self.log.info("[stop_segment]: Stopping %s" % eid)
338        rv = False
339        try:
340            # Clean out tar files: we've gone over quota in the past
341            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
342                    (pid, eid))
343            rv = self.ssh_cmd(user, host,
344                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
345        except self.ssh_cmd_timeout:
346            rv = False
347        return rv
348
Note: See TracBrowser for help on using the repository browser.