source: fedd/federation/local_emulab_segment.py @ b4b19c7

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since b4b19c7 was b4b19c7, checked in by Ted Faber <faber@…>, 14 years ago

Get topology information into the info operation, as annotations of a topology description. This required adding such information to the start segment replies as well

  • Property mode set to 100644
File size: 9.5 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12import util
13
14class local_emulab_segment:
15    class cmd_timeout(RuntimeError): pass
16
17    def __init__(self, log=None, keyfile=None, debug=False):
18        self.log = log or logging.getLogger(\
19                'fedd.access.proxy_emulab_segment')
20        self.certfile = keyfile
21        self.debug = debug
22        self.cmd_timeout = local_emulab_segment.cmd_timeout
23
24    def copy_file(self, src, dest, size=1024):
25        """
26        Exceedingly simple file copy.
27        """
28
29        if not self.debug:
30            util.copy_file(src, dest, size)
31        else:
32            self.log.debug("Copy %s to %s" % (src, dest))
33
34    def cmd_with_timeout(self, cmd, wname=None, timeout=None):
35        """
36        Run a command.  If debug is set, the action
37        is only logged.  Commands are run without stdin, to avoid stray
38        SIGTTINs. If timeout is given and the command runs longer, a
39        cmd_timeout exception is thrown.
40        """
41
42        try:
43            dnull = open("/dev/null", "w")
44        except IOError:
45            self.log.debug("[cmd_with_timeout]: failed to open /dev/null " + \
46                    "for redirect")
47            dnull = Null
48
49        self.log.debug("[cmd_with_timeout]: %s" % cmd)
50        if not self.debug:
51            if dnull:
52                sub = subprocess.Popen(cmd, shell=True, stdout=dnull,
53                        stderr=dnull, close_fds=True)
54            else:
55                sub = subprocess.Popen(cmd, shell=True, close_fds=True)
56            if timeout:
57                i = 0
58                rv = sub.poll()
59                while i < timeout:
60                    if rv is not None: break
61                    else:
62                        time.sleep(1)
63                        rv = sub.poll()
64                        i += 1
65                else:
66                    self.log.debug("Process exceeded runtime: %s" % cmd)
67                    os.kill(sub.pid, signal.SIGKILL)
68                    raise self.cmd_timeout();
69                return rv == 0
70            else:
71                return sub.wait() == 0
72        else:
73            if timeout == 0:
74                self.log.debug("debug timeout raised on %s " % cmd)
75                raise self.cmd_timeout()
76            else:
77                return True
78
79class start_segment(local_emulab_segment):
80    def __init__(self, log=None, keyfile=None, debug=False):
81        local_emulab_segment.__init__(self, log=log, 
82                keyfile=keyfile, debug=debug)
83        self.null = """
84set ns [new Simulator]
85source tb_compat.tcl
86
87set a [$ns node]
88
89$ns rtproto Session
90$ns run
91"""
92        self.node = { }
93
94    def get_state(self, pid, eid):
95        # command to test experiment state
96        expinfo_exec = "/usr/testbed/bin/expinfo" 
97        # Regular expressions to parse the expinfo response
98        state_re = re.compile("State:\s+(\w+)")
99        no_exp_re = re.compile("^No\s+such\s+experiment")
100        swapping_re = re.compile("^No\s+information\s+available.")
101        state = None    # Experiment state parsed from expinfo
102        # The expinfo ssh command.  Note the identity restriction to use
103        # only the identity provided in the pubkey given.
104        cmd = [ expinfo_exec, pid, eid]
105
106        dev_null = None
107        try:
108            dev_null = open("/dev/null", "a")
109        except IOError, e:
110            self.log.error("[get_state]: can't open /dev/null: %s" %e)
111
112        if self.debug:
113            state = 'swapped'
114            rv = 0
115        else:
116            self.log.debug("Checking state")
117            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
118                    stderr=dev_null, close_fds=True)
119            for line in status.stdout:
120                m = state_re.match(line)
121                if m: state = m.group(1)
122                else:
123                    for reg, st in ((no_exp_re, "none"),
124                            (swapping_re, "swapping")):
125                        m = reg.match(line)
126                        if m: state = st
127            rv = status.wait()
128
129        # If the experiment is not present the subcommand returns a
130        # non-zero return value.  If we successfully parsed a "none"
131        # outcome, ignore the return code.
132        if rv != 0 and state != 'none':
133            raise service_error(service_error.internal,
134                    "Cannot get status of segment:%s/%s" % (pid, eid))
135        elif state not in ('active', 'swapped', 'swapping', 'none'):
136            raise service_error(service_error.internal,
137                    "Cannot get status of segment:%s/%s" % (pid, eid))
138        else:
139            self.log.debug("State is %s" % state)
140            return state
141
142    def get_mapping(self, pid, eid):
143        # command to test experiment state
144        expinfo_exec = "/usr/testbed/bin/expinfo" 
145        # The expinfo command.
146        cmd = [ expinfo_exec, '-m', pid, eid]
147
148        dev_null = None
149        try:
150            dev_null = open("/dev/null", "a")
151        except IOError, e:
152            self.log.error("[get_state]: can't open /dev/null: %s" %e)
153
154        if self.debug:
155            rv = 0
156        else:
157            self.log.debug("Getting mapping for %s %s" % (pid, eid))
158            phys_start = re.compile('^Physical\s+Node\s+Mapping')
159            phys_line = re.compile('(\S+)\s+\S+\s+\S+\s+(.*)')
160            phys_end = re.compile('^$')
161            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
162                    stderr=dev_null, close_fds=True)
163
164            # Parse the info output.  Format:
165            #
166            # stuff
167            # Physical Node Mapping:
168            # ID              Type         OS              Physical   
169            # --------------- ------------ --------------- ------------
170            # virtual         dummy        dummy           physical
171            #
172            foundit = False
173            skip = 0
174            for line in status.stdout:
175                if phys_start.match(line):
176                    skip = 2
177                    foundit = True
178                elif not foundit:
179                    continue
180                elif skip > 0:
181                    skip -= 1
182                elif phys_end.match(line):
183                    break
184                else:
185                    m = phys_line.match(line.strip())
186                    if m: self.node[m.group(1)] = m.group(2)
187                    else: self.log.warn(
188                            "Matching failed while parsing node mapping")
189            rv = status.wait()
190
191        # If the experiment is not present the subcommand returns a
192        # non-zero return value.  If we successfully parsed a "none"
193        # outcome, ignore the return code.
194        if rv != 0 :
195            raise service_error(service_error.internal,
196                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
197        else:
198            return True
199
200
201
202    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
203        """
204        Start a sub-experiment on a federant.
205
206        Get the current state, modify or create as appropriate, ship data
207        and configs and start the experiment.  There are small ordering
208        differences based on the initial state of the sub-experiment.
209        """
210        # Configuration directories on this machine
211        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
212        softdir = "/proj/%s/software/%s" % (pid, eid)
213        # Softwrae staging directory software dir
214        lsoftdir = "%s/software" % tmpdir
215
216        state = self.get_state(pid, eid)
217
218        if state == 'none':
219            # Create a null copy of the experiment so that we capture any
220            # logs there if the modify fails.  Emulab software discards the
221            # logs from a failed startexp
222            try:
223                f = open("%s/null.tcl" % tmpdir, "w")
224                print >>f, self.null
225                f.close()
226            except IOError, e:
227                raise service_error(service_error.internal,
228                        "Cannot stage null.tcl: %s" % e.strerror)
229
230            timedout = False
231            try:
232                if not self.cmd_with_timeout(
233                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
234                        "-e %s %s/null.tcl") % (pid, eid, tmpdir), "startexp",
235                        timeout=60 * 10):
236                    return False
237            except self.cmd_timeout:
238                timedout = True
239
240            if timedout:
241                state = self.get_state(pid, eid)
242                if state != "swapped":
243                    return False
244       
245        # Set up the experiment's file space
246        if not self.cmd_with_timeout("/bin/rm -rf %s" % proj_dir):
247            return False
248        # Clear and create the software and configuration directories
249        if not self.cmd_with_timeout("/bin/rm -rf %s/*" % softdir):
250            return False
251        if not self.cmd_with_timeout('mkdir -p %s' % proj_dir):
252            return False
253        if os.path.isdir(lsoftdir):
254            if not self.cmd_with_timeout('mkdir -p %s' % softdir):
255                return False
256
257        try:
258            for f in os.listdir(tmpdir):
259                if not os.path.isdir("%s/%s" % (tmpdir, f)):
260                    self.copy_file("%s/%s" % (tmpdir, f), 
261                            "%s/%s" % (proj_dir, f))
262            if os.path.isdir(lsoftdir):
263                for f in os.listdir(lsoftdir):
264                    if not os.path.isdir("%s/%s" % (lsoftdir, f)):
265                        self.copy_file("%s/%s" % (lsoftdir, f), 
266                                "%s/%s" % (softdir, f))
267        except IOError, e:
268            self.log.error("Error copying file: %s" %e)
269            return False
270
271        # Stage the new configuration (active experiments will stay swapped
272        # in now)
273        self.log.info("[start_segment]: Modifying %s" % eid)
274        try:
275            if not self.cmd_with_timeout(
276                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
277                            (pid, eid, tclfile),
278                    "modexp", timeout= 60 * 10):
279                return False
280        except self.cmd_timeout:
281            self.log.error("Modify command failed to complete in time")
282            # There's really no way to see if this succeeded or failed, so
283            # if it hangs, assume the worst.
284            return False
285        # Active experiments are still swapped, this swaps the others in.
286        if state != 'active':
287            self.log.info("[start_segment]: Swapping %s" % eid)
288            timedout = False
289            try:
290                if not self.cmd_with_timeout(
291                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
292                        "swapexp", timeout=25*60):
293                    return False
294            except self.cmd_timeout:
295                timedout = True
296           
297            # If the command was terminated, but completed successfully,
298            # report success.
299            if timedout:
300                self.log.debug("[start_segment]: swapin timed out " +\
301                        "checking state")
302                state = self.get_state(pid, eid)
303                self.log.debug("[start_segment]: state is %s" % state)
304                if state != 'active':
305                    return False
306        # Everything has gone OK.
307        self.get_mapping(pid,eid)
308        return True
309
310class stop_segment(local_emulab_segment):
311    def __init__(self, log=None, keyfile=None, debug=False):
312        local_emulab_segment.__init__(self,
313                log=log, keyfile=keyfile, debug=debug)
314
315    def __call__(self, parent, user, pid, eid):
316        """
317        Stop a sub experiment by calling swapexp on the federant
318        """
319        self.log.info("[stop_segment]: Stopping %s" % eid)
320        rv = False
321        try:
322            # Clean out tar files: we've gone over quota in the past
323            self.cmd_with_timeout("rm -rf /proj/%s/software/%s" % (pid, eid))
324            rv = self.cmd_with_timeout(
325                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid), 
326                    timeout = 60*10)
327        except self.cmd_timeout:
328            rv = False
329        return rv
330
Note: See TracBrowser for help on using the repository browser.