source: fedd/federation/local_emulab_segment.py @ 60961f5

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 60961f5 was 5bf359d, checked in by Ted Faber <faber@…>, 14 years ago

More refactoring. Neaten up the code for creating segments in emulab and make the local and proxy class structures parallel. The code is more readable this way, I hope.

  • Property mode set to 100644
File size: 8.6 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12import util
13
14from local_segment import local_segment
15
16class start_segment(local_segment):
17    def __init__(self, log=None, keyfile=None, debug=False):
18        local_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
19        self.null = """
20set ns [new Simulator]
21source tb_compat.tcl
22
23set a [$ns node]
24
25$ns rtproto Session
26$ns run
27"""
28        self.node = { }
29
30    def get_state(self, pid, eid):
31        """
32        Return the state of the experiment as reported by emulab
33        """
34        # command to test experiment state
35        expinfo_exec = "/usr/testbed/bin/expinfo" 
36        # Regular expressions to parse the expinfo response
37        state_re = re.compile("State:\s+(\w+)")
38        no_exp_re = re.compile("^No\s+such\s+experiment")
39        swapping_re = re.compile("^No\s+information\s+available.")
40        state = None    # Experiment state parsed from expinfo
41        # The expinfo ssh command.  Note the identity restriction to use
42        # only the identity provided in the pubkey given.
43        cmd = [ expinfo_exec, pid, eid]
44
45        dev_null = None
46        try:
47            dev_null = open("/dev/null", "a")
48        except EnvironmentError, e:
49            self.log.error("[get_state]: can't open /dev/null: %s" %e)
50
51        if self.debug:
52            state = 'swapped'
53            rv = 0
54        else:
55            self.log.debug("Checking state")
56            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
57                    stderr=dev_null, close_fds=True)
58            for line in status.stdout:
59                m = state_re.match(line)
60                if m: state = m.group(1)
61                else:
62                    for reg, st in ((no_exp_re, "none"),
63                            (swapping_re, "swapping")):
64                        m = reg.match(line)
65                        if m: state = st
66            rv = status.wait()
67
68        # If the experiment is not present the subcommand returns a
69        # non-zero return value.  If we successfully parsed a "none"
70        # outcome, ignore the return code.
71        if rv != 0 and state != 'none':
72            raise service_error(service_error.internal,
73                    "Cannot get status of segment:%s/%s" % (pid, eid))
74        elif state not in ('active', 'swapped', 'swapping', 'none'):
75            raise service_error(service_error.internal,
76                    "Cannot get status of segment:%s/%s" % (pid, eid))
77        else:
78            self.log.debug("State is %s" % state)
79            return state
80
81    def get_mapping(self, pid, eid):
82        """
83        Get the physical to virtual mapping from the expinfo command and save
84        it in the self.map member.
85        """
86        # command to test experiment state
87        expinfo_exec = "/usr/testbed/bin/expinfo" 
88        # The expinfo command.
89        cmd = [ expinfo_exec, '-m', pid, eid]
90
91        dev_null = None
92        try:
93            dev_null = open("/dev/null", "a")
94        except EnvironmentError, e:
95            self.log.error("[get_state]: can't open /dev/null: %s" %e)
96
97        if self.debug:
98            rv = 0
99        else:
100            self.log.debug("Getting mapping for %s %s" % (pid, eid))
101            phys_start = re.compile('^Physical\s+Node\s+Mapping')
102            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
103            phys_end = re.compile('^$')
104            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
105                    stderr=dev_null, close_fds=True)
106
107            # Parse the info output.  Format:
108            #
109            # stuff
110            # Physical Node Mapping:
111            # ID              Type         OS              Physical   
112            # --------------- ------------ --------------- ------------
113            # virtual         dummy        dummy           physical
114            #
115            foundit = False
116            skip = 0
117            for line in status.stdout:
118                if phys_start.match(line):
119                    skip = 2
120                    foundit = True
121                elif not foundit:
122                    continue
123                elif skip > 0:
124                    skip -= 1
125                elif phys_end.match(line):
126                    break
127                else:
128                    m = phys_line.match(line.strip())
129                    if m: self.node[m.group(1)] = m.group(3)
130                    else: self.log.warn(
131                            "Matching failed while parsing node mapping: " +\
132                                    "line %s" % line)
133            rv = status.wait()
134
135        # If the experiment is not present the subcommand returns a
136        # non-zero return value.  If we successfully parsed a "none"
137        # outcome, ignore the return code.
138        if rv != 0 :
139            raise service_error(service_error.internal,
140                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
141        else:
142            return True
143
144    def make_null_experiment(self, pid, eid, tmpdir):
145        """
146        Create a null copy of the experiment so that we capture any logs there
147        if the modify fails.  Emulab software discards the logs from a failed
148        startexp.
149        """
150        try:
151            f = open("%s/null.tcl" % tmpdir, "w")
152            print >>f, self.null
153            f.close()
154        except EnvironmentError, e:
155            raise service_error(service_error.internal,
156                    "Cannot stage null.tcl: %s" % e.strerror)
157
158        timedout = False
159        try:
160            if not self.cmd_with_timeout(
161                    ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
162                    "-e %s %s/null.tcl") % (pid, eid, tmpdir), "startexp",
163                    timeout=60 * 10):
164                return False
165        except self.cmd_timeout:
166            timedout = True
167
168        if timedout:
169            state = self.get_state(pid, eid)
170            return state == "swapped"
171        else:
172            return True
173
174    def set_up_experiment_filespace(self, pid, eid, tmpdir):
175        # Configuration directories on this machine
176        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
177        softdir = "/proj/%s/software/%s" % (pid, eid)
178        # Softwrae staging directory software dir
179        lsoftdir = "%s/software" % tmpdir
180
181        # Set up the experiment's file space
182        if not self.cmd_with_timeout("/bin/rm -rf %s" % proj_dir):
183            return False
184        # Clear and create the software and configuration directories
185        if not self.cmd_with_timeout("/bin/rm -rf %s/*" % softdir):
186            return False
187        if not self.cmd_with_timeout('mkdir -p %s' % proj_dir):
188            return False
189        if os.path.isdir(lsoftdir):
190            if not self.cmd_with_timeout('mkdir -p %s' % softdir):
191                return False
192
193        try:
194            for f in os.listdir(tmpdir):
195                if not os.path.isdir("%s/%s" % (tmpdir, f)):
196                    self.copy_file("%s/%s" % (tmpdir, f), 
197                            "%s/%s" % (proj_dir, f))
198            if os.path.isdir(lsoftdir):
199                for f in os.listdir(lsoftdir):
200                    if not os.path.isdir("%s/%s" % (lsoftdir, f)):
201                        self.copy_file("%s/%s" % (lsoftdir, f), 
202                                "%s/%s" % (softdir, f))
203        except EnvironmentError, e:
204            self.log.error("Error copying file: %s" %e)
205            return False
206
207        return True
208
209    def swap_in(self, pid, eid):
210        """
211        Swap experiment in.  This includes code to cope with the experiment
212        swaping command timing out, but the experiment being swapped in
213        successfully.
214        """
215        self.log.info("[start_segment]: Swapping %s" % eid)
216        timedout = False
217        try:
218            if not self.cmd_with_timeout(
219                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
220                    "swapexp", timeout=25*60):
221                return False
222        except self.cmd_timeout:
223            timedout = True
224       
225        # If the command was terminated, but completed successfully,
226        # report success.
227        if timedout:
228            self.log.debug("[start_segment]: swapin timed out " +\
229                    "checking state")
230            state = self.get_state(pid, eid)
231            self.log.debug("[start_segment]: state is %s" % state)
232            return state == 'active'
233        else:
234            return True
235
236    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
237        """
238        Start a sub-experiment on a federant.
239
240        Get the current state, modify or create as appropriate, ship data
241        and configs and start the experiment.  There are small ordering
242        differences based on the initial state of the sub-experiment.
243        """
244
245        state = self.get_state(pid, eid)
246
247        if state == 'none':
248            if not self.make_null_experiment(pid, eid, tmpdir):
249                return False
250
251        if not self.set_up_experiment_filespace(pid, eid, tmpdir):
252            return False
253       
254        # Stage the new configuration (active experiments will stay swapped
255        # in now)
256        self.log.info("[start_segment]: Modifying %s" % eid)
257        try:
258            if not self.cmd_with_timeout(
259                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
260                            (pid, eid, tclfile),
261                    "modexp", timeout= 60 * 10):
262                return False
263        except self.cmd_timeout:
264            self.log.error("Modify command failed to complete in time")
265            # There's really no way to see if this succeeded or failed, so
266            # if it hangs, assume the worst.
267            return False
268        # Active experiments are still swapped, this swaps the others in.
269        if state != 'active':
270            if not self.swap_in(pid, eid):
271                return False
272        # Everything has gone OK.
273        self.get_mapping(pid,eid)
274        return True
275
276class stop_segment(local_segment):
277    def __init__(self, log=None, keyfile=None, debug=False):
278        local_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
279
280    def __call__(self, parent, user, pid, eid):
281        """
282        Stop a sub experiment by calling swapexp on the federant
283        """
284        self.log.info("[stop_segment]: Stopping %s" % eid)
285        rv = False
286        try:
287            # Clean out tar files: we've gone over quota in the past
288            self.cmd_with_timeout("rm -rf /proj/%s/software/%s" % (pid, eid))
289            rv = self.cmd_with_timeout(
290                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid), 
291                    timeout = 60*10)
292        except self.cmd_timeout:
293            rv = False
294        return rv
295
Note: See TracBrowser for help on using the repository browser.