source: fedd/federation/local_emulab_segment.py @ d3c8759

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since d3c8759 was d3c8759, checked in by Ted Faber <faber@…>, 14 years ago

Wholesale change of IOError to EnvironmentError? for file operations. Lots of
uncaught EnvironmentErrors? were causing spurious error conditions, e.g. on disk
full.

  • Property mode set to 100644
File size: 9.5 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12import util
13
14class local_emulab_segment:
15    class cmd_timeout(RuntimeError): pass
16
17    def __init__(self, log=None, keyfile=None, debug=False):
18        self.log = log or logging.getLogger(\
19                'fedd.access.proxy_emulab_segment')
20        self.certfile = keyfile
21        self.debug = debug
22        self.cmd_timeout = local_emulab_segment.cmd_timeout
23
24    def copy_file(self, src, dest, size=1024):
25        """
26        Exceedingly simple file copy.
27        """
28
29        if not self.debug:
30            util.copy_file(src, dest, size)
31        else:
32            self.log.debug("Copy %s to %s" % (src, dest))
33
34    def cmd_with_timeout(self, cmd, wname=None, timeout=None):
35        """
36        Run a command.  If debug is set, the action
37        is only logged.  Commands are run without stdin, to avoid stray
38        SIGTTINs. If timeout is given and the command runs longer, a
39        cmd_timeout exception is thrown.
40        """
41
42        try:
43            dnull = open("/dev/null", "w")
44        except EnvironmentError:
45            self.log.debug("[cmd_with_timeout]: failed to open /dev/null " + \
46                    "for redirect")
47            dnull = Null
48
49        self.log.debug("[cmd_with_timeout]: %s" % cmd)
50        if not self.debug:
51            if dnull:
52                sub = subprocess.Popen(cmd, shell=True, stdout=dnull,
53                        stderr=dnull, close_fds=True)
54            else:
55                sub = subprocess.Popen(cmd, shell=True, close_fds=True)
56            if timeout:
57                i = 0
58                rv = sub.poll()
59                while i < timeout:
60                    if rv is not None: break
61                    else:
62                        time.sleep(1)
63                        rv = sub.poll()
64                        i += 1
65                else:
66                    self.log.debug("Process exceeded runtime: %s" % cmd)
67                    os.kill(sub.pid, signal.SIGKILL)
68                    raise self.cmd_timeout();
69                return rv == 0
70            else:
71                return sub.wait() == 0
72        else:
73            if timeout == 0:
74                self.log.debug("debug timeout raised on %s " % cmd)
75                raise self.cmd_timeout()
76            else:
77                return True
78
79class start_segment(local_emulab_segment):
80    def __init__(self, log=None, keyfile=None, debug=False):
81        local_emulab_segment.__init__(self, log=log, 
82                keyfile=keyfile, debug=debug)
83        self.null = """
84set ns [new Simulator]
85source tb_compat.tcl
86
87set a [$ns node]
88
89$ns rtproto Session
90$ns run
91"""
92        self.node = { }
93
94    def get_state(self, pid, eid):
95        # command to test experiment state
96        expinfo_exec = "/usr/testbed/bin/expinfo" 
97        # Regular expressions to parse the expinfo response
98        state_re = re.compile("State:\s+(\w+)")
99        no_exp_re = re.compile("^No\s+such\s+experiment")
100        swapping_re = re.compile("^No\s+information\s+available.")
101        state = None    # Experiment state parsed from expinfo
102        # The expinfo ssh command.  Note the identity restriction to use
103        # only the identity provided in the pubkey given.
104        cmd = [ expinfo_exec, pid, eid]
105
106        dev_null = None
107        try:
108            dev_null = open("/dev/null", "a")
109        except EnvironmentError, e:
110            self.log.error("[get_state]: can't open /dev/null: %s" %e)
111
112        if self.debug:
113            state = 'swapped'
114            rv = 0
115        else:
116            self.log.debug("Checking state")
117            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
118                    stderr=dev_null, close_fds=True)
119            for line in status.stdout:
120                m = state_re.match(line)
121                if m: state = m.group(1)
122                else:
123                    for reg, st in ((no_exp_re, "none"),
124                            (swapping_re, "swapping")):
125                        m = reg.match(line)
126                        if m: state = st
127            rv = status.wait()
128
129        # If the experiment is not present the subcommand returns a
130        # non-zero return value.  If we successfully parsed a "none"
131        # outcome, ignore the return code.
132        if rv != 0 and state != 'none':
133            raise service_error(service_error.internal,
134                    "Cannot get status of segment:%s/%s" % (pid, eid))
135        elif state not in ('active', 'swapped', 'swapping', 'none'):
136            raise service_error(service_error.internal,
137                    "Cannot get status of segment:%s/%s" % (pid, eid))
138        else:
139            self.log.debug("State is %s" % state)
140            return state
141
142    def get_mapping(self, pid, eid):
143        # command to test experiment state
144        expinfo_exec = "/usr/testbed/bin/expinfo" 
145        # The expinfo command.
146        cmd = [ expinfo_exec, '-m', pid, eid]
147
148        dev_null = None
149        try:
150            dev_null = open("/dev/null", "a")
151        except EnvironmentError, e:
152            self.log.error("[get_state]: can't open /dev/null: %s" %e)
153
154        if self.debug:
155            rv = 0
156        else:
157            self.log.debug("Getting mapping for %s %s" % (pid, eid))
158            phys_start = re.compile('^Physical\s+Node\s+Mapping')
159            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
160            phys_end = re.compile('^$')
161            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
162                    stderr=dev_null, close_fds=True)
163
164            # Parse the info output.  Format:
165            #
166            # stuff
167            # Physical Node Mapping:
168            # ID              Type         OS              Physical   
169            # --------------- ------------ --------------- ------------
170            # virtual         dummy        dummy           physical
171            #
172            foundit = False
173            skip = 0
174            for line in status.stdout:
175                if phys_start.match(line):
176                    skip = 2
177                    foundit = True
178                elif not foundit:
179                    continue
180                elif skip > 0:
181                    skip -= 1
182                elif phys_end.match(line):
183                    break
184                else:
185                    m = phys_line.match(line.strip())
186                    if m: self.node[m.group(1)] = m.group(3)
187                    else: self.log.warn(
188                            "Matching failed while parsing node mapping: " +\
189                                    "line %s" % line)
190            rv = status.wait()
191
192        # If the experiment is not present the subcommand returns a
193        # non-zero return value.  If we successfully parsed a "none"
194        # outcome, ignore the return code.
195        if rv != 0 :
196            raise service_error(service_error.internal,
197                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
198        else:
199            return True
200
201
202
203    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
204        """
205        Start a sub-experiment on a federant.
206
207        Get the current state, modify or create as appropriate, ship data
208        and configs and start the experiment.  There are small ordering
209        differences based on the initial state of the sub-experiment.
210        """
211        # Configuration directories on this machine
212        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
213        softdir = "/proj/%s/software/%s" % (pid, eid)
214        # Softwrae staging directory software dir
215        lsoftdir = "%s/software" % tmpdir
216
217        state = self.get_state(pid, eid)
218
219        if state == 'none':
220            # Create a null copy of the experiment so that we capture any
221            # logs there if the modify fails.  Emulab software discards the
222            # logs from a failed startexp
223            try:
224                f = open("%s/null.tcl" % tmpdir, "w")
225                print >>f, self.null
226                f.close()
227            except EnvironmentError, e:
228                raise service_error(service_error.internal,
229                        "Cannot stage null.tcl: %s" % e.strerror)
230
231            timedout = False
232            try:
233                if not self.cmd_with_timeout(
234                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
235                        "-e %s %s/null.tcl") % (pid, eid, tmpdir), "startexp",
236                        timeout=60 * 10):
237                    return False
238            except self.cmd_timeout:
239                timedout = True
240
241            if timedout:
242                state = self.get_state(pid, eid)
243                if state != "swapped":
244                    return False
245       
246        # Set up the experiment's file space
247        if not self.cmd_with_timeout("/bin/rm -rf %s" % proj_dir):
248            return False
249        # Clear and create the software and configuration directories
250        if not self.cmd_with_timeout("/bin/rm -rf %s/*" % softdir):
251            return False
252        if not self.cmd_with_timeout('mkdir -p %s' % proj_dir):
253            return False
254        if os.path.isdir(lsoftdir):
255            if not self.cmd_with_timeout('mkdir -p %s' % softdir):
256                return False
257
258        try:
259            for f in os.listdir(tmpdir):
260                if not os.path.isdir("%s/%s" % (tmpdir, f)):
261                    self.copy_file("%s/%s" % (tmpdir, f), 
262                            "%s/%s" % (proj_dir, f))
263            if os.path.isdir(lsoftdir):
264                for f in os.listdir(lsoftdir):
265                    if not os.path.isdir("%s/%s" % (lsoftdir, f)):
266                        self.copy_file("%s/%s" % (lsoftdir, f), 
267                                "%s/%s" % (softdir, f))
268        except EnvironmentError, e:
269            self.log.error("Error copying file: %s" %e)
270            return False
271
272        # Stage the new configuration (active experiments will stay swapped
273        # in now)
274        self.log.info("[start_segment]: Modifying %s" % eid)
275        try:
276            if not self.cmd_with_timeout(
277                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
278                            (pid, eid, tclfile),
279                    "modexp", timeout= 60 * 10):
280                return False
281        except self.cmd_timeout:
282            self.log.error("Modify command failed to complete in time")
283            # There's really no way to see if this succeeded or failed, so
284            # if it hangs, assume the worst.
285            return False
286        # Active experiments are still swapped, this swaps the others in.
287        if state != 'active':
288            self.log.info("[start_segment]: Swapping %s" % eid)
289            timedout = False
290            try:
291                if not self.cmd_with_timeout(
292                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
293                        "swapexp", timeout=25*60):
294                    return False
295            except self.cmd_timeout:
296                timedout = True
297           
298            # If the command was terminated, but completed successfully,
299            # report success.
300            if timedout:
301                self.log.debug("[start_segment]: swapin timed out " +\
302                        "checking state")
303                state = self.get_state(pid, eid)
304                self.log.debug("[start_segment]: state is %s" % state)
305                if state != 'active':
306                    return False
307        # Everything has gone OK.
308        self.get_mapping(pid,eid)
309        return True
310
311class stop_segment(local_emulab_segment):
312    def __init__(self, log=None, keyfile=None, debug=False):
313        local_emulab_segment.__init__(self,
314                log=log, keyfile=keyfile, debug=debug)
315
316    def __call__(self, parent, user, pid, eid):
317        """
318        Stop a sub experiment by calling swapexp on the federant
319        """
320        self.log.info("[stop_segment]: Stopping %s" % eid)
321        rv = False
322        try:
323            # Clean out tar files: we've gone over quota in the past
324            self.cmd_with_timeout("rm -rf /proj/%s/software/%s" % (pid, eid))
325            rv = self.cmd_with_timeout(
326                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid), 
327                    timeout = 60*10)
328        except self.cmd_timeout:
329            rv = False
330        return rv
331
Note: See TracBrowser for help on using the repository browser.