source: fedd/federation/proxy_emulab_segment.py @ ee7f7e4

axis_examplecompt_changesinfo-ops
Last change on this file since ee7f7e4 was 181aeb4, checked in by Ted Faber <faber@…>, 14 years ago

Initial direct emulab manipulations

  • Property mode set to 100644
File size: 10.5 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12from proxy_segment import proxy_segment
13from service_error import service_error
14
15class start_segment(proxy_segment):
16    """
17    This starts an experiment on an emulab accessed remotely via ssh.  Most of
18    the experiment constuction has been done by the emulab_access object.  This
19    just does the wrangling of the emulab commands and collected the node to
20    physical mapping.  The routine throws service errors.
21    """
22
23    def __init__(self, log=None, keyfile=None, debug=False, boss=None, 
24            cert=None):
25        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
26        self.null = """
27set ns [new Simulator]
28source tb_compat.tcl
29
30set a [$ns node]
31
32$ns rtproto Session
33$ns run
34"""
35        self.node = { }
36
37    def get_state(self, user, host, pid, eid):
38        """
39        Return the state of the experiment as reported by emulab
40        """
41        # command to test experiment state
42        expinfo_exec = "/usr/testbed/bin/expinfo" 
43        # Regular expressions to parse the expinfo response
44        state_re = re.compile("State:\s+(\w+)")
45        no_exp_re = re.compile("^No\s+such\s+experiment")
46        swapping_re = re.compile("^No\s+information\s+available.")
47        state = None    # Experiment state parsed from expinfo
48        # The expinfo ssh command.  Note the identity restriction to use
49        # only the identity provided in the pubkey given.
50        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
51                'StrictHostKeyChecking no', '-i', 
52                self.ssh_privkey_file, "%s@%s" % (user, host), 
53                expinfo_exec, pid, eid]
54
55        dev_null = None
56        try:
57            dev_null = open("/dev/null", "a")
58        except EnvironmentError, e:
59            self.log.error("[get_state]: can't open /dev/null: %s" %e)
60
61        if self.debug:
62            state = 'swapped'
63            rv = 0
64        else:
65            self.log.debug("Checking state")
66            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
67                    stderr=dev_null, close_fds=True)
68            for line in status.stdout:
69                m = state_re.match(line)
70                if m: state = m.group(1)
71                else:
72                    for reg, st in ((no_exp_re, "none"),
73                            (swapping_re, "swapping")):
74                        m = reg.match(line)
75                        if m: state = st
76            rv = status.wait()
77
78        # If the experiment is not present the subcommand returns a
79        # non-zero return value.  If we successfully parsed a "none"
80        # outcome, ignore the return code.
81        if rv != 0 and state != 'none':
82            raise service_error(service_error.internal,
83                    "Cannot get status of segment:%s/%s" % (pid, eid))
84        elif state not in ('active', 'swapped', 'swapping', 'none'):
85            raise service_error(service_error.internal,
86                    "Cannot get status of segment:%s/%s" % (pid, eid))
87        else:
88            self.log.debug("State is %s" % state)
89            return state
90
91
92    def get_mapping(self, user, host, pid, eid):
93        """
94        Get the physical to virtual mapping from the expinfo command and save
95        it in the self.map member.
96        """
97        # command to test experiment state
98        expinfo_exec = "/usr/testbed/bin/expinfo" 
99        # The expinfo ssh command.  Note the identity restriction to use
100        # only the identity provided in the pubkey given.
101        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
102                'StrictHostKeyChecking no', '-i', 
103                self.ssh_privkey_file, "%s@%s" % (user, host), 
104                expinfo_exec, '-m', pid, eid]
105
106        dev_null = None
107        try:
108            dev_null = open("/dev/null", "a")
109        except EnvironmentError, e:
110            self.log.error("[get_mapping]: can't open /dev/null: %s" %e)
111
112        if self.debug:
113            rv = 0
114        else:
115            self.log.debug("Getting mapping for %s %s" % (pid, eid))
116            phys_start = re.compile('^Physical\s+Node\s+Mapping')
117            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
118            phys_end = re.compile('^$')
119            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
120                    stderr=dev_null, close_fds=True)
121
122            # Parse the info output.  Format:
123            #
124            # stuff
125            # Physical Node Mapping:
126            # ID              Type         OS              Physical   
127            # --------------- ------------ --------------- ------------
128            # virtual         dummy        dummy           physical
129            #
130            foundit = False
131            skip = 0
132            for line in status.stdout:
133                if phys_start.match(line):
134                    skip = 2
135                    foundit = True
136                elif not foundit:
137                    continue
138                elif skip > 0:
139                    skip -= 1
140                elif phys_end.match(line):
141                    break
142                else:
143                    m = phys_line.match(line.strip())
144                    if m: self.node[m.group(1)] = m.group(3)
145                    else: self.log.warn(
146                            "Matching failed while parsing node mapping: " +\
147                                    "line %s" % line)
148            rv = status.wait()
149
150        # If the experiment is not present the subcommand returns a
151        # non-zero return value.  If we successfully parsed a "none"
152        # outcome, ignore the return code.
153        if rv != 0 :
154            raise service_error(service_error.internal,
155                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
156        else:
157            return True
158
159
160    def make_null_experiment(self, user, host, pid, eid, tmpdir):
161        """
162        Create a null copy of the experiment so that we capture any logs there
163        if the modify fails.  Emulab software discards the logs from a failed
164        startexp
165        """
166        try:
167            f = open("%s/null.tcl" % tmpdir, "w")
168            print >>f, self.null
169            f.close()
170        except EnvironmentError, e:
171            raise service_error(service_error.internal,
172                    "Cannot stage tarfile/rpm: %s" % e.strerror)
173
174        if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
175            return False
176        self.log.info("[start_segment]: Creating %s" % eid)
177        timedout = False
178        try:
179            if not self.ssh_cmd(user, host,
180                    ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
181                    "-e %s null.tcl") % (pid, eid), "startexp",
182                    timeout=60 * 10):
183                return False
184        except self.ssh_cmd_timeout:
185            timedout = True
186
187        if timedout:
188            state = self.get_state(user, host, pid, eid)
189            if state != "swapped":
190                return False
191        return True
192
193    def set_up_experiment_filespace(self, user, host, pid, eid, tmpdir):
194        """
195        Send all the software and configuration files into the experiment's
196        file space.  To reduce the number of ssh connections, we script many
197        changes and execute the script.
198        """
199        # Configuration directories on the remote machine
200        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
201        softdir = "/proj/%s/software/%s" % (pid, eid)
202        # Local software dir
203        lsoftdir = "%s/software" % tmpdir
204
205        # Open up a temporary file to contain a script for setting up the
206        # filespace for the new experiment.
207        self.log.info("[start_segment]: creating script file")
208        try:
209            sf, scriptname = tempfile.mkstemp()
210            scriptfile = os.fdopen(sf, 'w')
211        except EnvironmentError:
212            return False
213
214        scriptbase = os.path.basename(scriptname)
215
216        # Script the filesystem changes
217        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
218        # Clear and create the software directory
219        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
220        print >>scriptfile, 'mkdir -p %s' % proj_dir
221        if os.path.isdir(lsoftdir):
222            print >>scriptfile, 'mkdir -p %s' % softdir
223        print >>scriptfile, "rm -f %s" % scriptbase
224        scriptfile.close()
225
226        # Move the script to the remote machine
227        # XXX: could collide tempfile names on the remote host
228        if self.scp_file(scriptname, user, host, scriptbase):
229            os.remove(scriptname)
230        else:
231            return False
232
233        # Execute the script (and the script's last line deletes it)
234        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
235            return False
236
237        for f in os.listdir(tmpdir):
238            if not os.path.isdir("%s/%s" % (tmpdir, f)):
239                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
240                        "%s/%s" % (proj_dir, f)):
241                    return False
242        if os.path.isdir(lsoftdir):
243            for f in os.listdir(lsoftdir):
244                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
245                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
246                            user, host, "%s/%s" % (softdir, f)):
247                        return False
248        return True
249
250    def swap_in(self, user, host, pid, eid):
251        """
252        Swap experiment in.  This includes code to cope with the experiment
253        swaping command timing out, but the experiment being swapped in
254        successfully.
255        """
256        self.log.info("[start_segment]: Swapping %s in" % eid)
257        timedout = False
258        try:
259            if not self.ssh_cmd(user, host,
260                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
261                    "swapexp", timeout=25*60):
262                return False
263        except self.ssh_cmd_timeout:
264            timedout = True
265       
266        # If the command was terminated, but completed successfully,
267        # report success.
268        if timedout:
269            self.log.debug("[start_segment]: swapin timed out " +\
270                    "checking state")
271            state = self.get_state(user, host, pid, eid)
272            self.log.debug("[start_segment]: state is %s" % state)
273            return state == 'active'
274
275        return True
276
277
278    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
279        """
280        Start a sub-experiment on a federant.
281
282        Get the current state, modify or create as appropriate, ship data
283        and configs and start the experiment.  There are small ordering
284        differences based on the initial state of the sub-experiment.
285        """
286        # ops node in the federant
287        host = "%s%s" % (parent.ops, parent.domain)
288        state = self.get_state(user, host, pid, eid)
289
290        if not self.scp_file(tclfile, user, host):
291            return False
292       
293        if state == 'none':
294            # Put a dummy in place to capture logs, and establish an experiment
295            # directory.
296            if not self.make_null_experiment(user, host, pid, eid, tmpdir):
297                return False
298
299        if not self.set_up_experiment_filespace(user, host, pid, eid, tmpdir):
300            return False
301
302        # With the filespace in place, we can modify and swap in.
303        self.log.info("[start_segment]: Modifying %s" % eid)
304        try:
305            if not self.ssh_cmd(user, host,
306                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
307                            (pid, eid, tclfile.rpartition('/')[2]),
308                    "modexp", timeout= 60 * 10):
309                return False
310        except self.ssh_cmd_timeout:
311            self.log.error("Modify command failed to complete in time")
312            # There's really no way to see if this succeeded or failed, so
313            # if it hangs, assume the worst.
314            return False
315        # Active experiments are still swapped, this swaps the others in.
316        if state != 'active':
317            if not self.swap_in(user, host, pid, eid):
318                return False
319        # Everything has gone OK.
320        self.get_mapping(user, host, pid,eid)
321        return True
322
323class stop_segment(proxy_segment):
324    def __init__(self, log=None, keyfile=None, debug=False, boss=None, cert=None):
325        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
326
327    def __call__(self, parent, user, pid, eid):
328        """
329        Stop a sub experiment by calling swapexp on the federant
330        """
331        host = "%s%s" % (parent.ops, parent.domain)
332        self.log.info("[stop_segment]: Stopping %s" % eid)
333        rv = False
334        try:
335            # Clean out tar files: we've gone over quota in the past
336            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
337                    (pid, eid))
338            rv = self.ssh_cmd(user, host,
339                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
340        except self.ssh_cmd_timeout:
341            rv = False
342        return rv
343
Note: See TracBrowser for help on using the repository browser.