source: fedd/federation/proxy_emulab_segment.py @ 1962a5b

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 1962a5b was d3c8759, checked in by Ted Faber <faber@…>, 15 years ago

Wholesale change of IOError to EnvironmentError? for file operations. Lots of
uncaught EnvironmentErrors? were causing spurious error conditions, e.g. on disk
full.

  • Property mode set to 100644
File size: 9.2 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12from proxy_segment import proxy_segment
13from service_error import service_error
14
15class start_segment(proxy_segment):
16    def __init__(self, log=None, keyfile=None, debug=False):
17        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
18        self.null = """
19set ns [new Simulator]
20source tb_compat.tcl
21
22set a [$ns node]
23
24$ns rtproto Session
25$ns run
26"""
27        self.node = { }
28
29    def get_state(self, user, host, pid, eid):
30        # command to test experiment state
31        expinfo_exec = "/usr/testbed/bin/expinfo" 
32        # Regular expressions to parse the expinfo response
33        state_re = re.compile("State:\s+(\w+)")
34        no_exp_re = re.compile("^No\s+such\s+experiment")
35        swapping_re = re.compile("^No\s+information\s+available.")
36        state = None    # Experiment state parsed from expinfo
37        # The expinfo ssh command.  Note the identity restriction to use
38        # only the identity provided in the pubkey given.
39        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
40                'StrictHostKeyChecking no', '-i', 
41                self.ssh_privkey_file, "%s@%s" % (user, host), 
42                expinfo_exec, pid, eid]
43
44        dev_null = None
45        try:
46            dev_null = open("/dev/null", "a")
47        except EnvironmentError, e:
48            self.log.error("[get_state]: can't open /dev/null: %s" %e)
49
50        if self.debug:
51            state = 'swapped'
52            rv = 0
53        else:
54            self.log.debug("Checking state")
55            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
56                    stderr=dev_null, close_fds=True)
57            for line in status.stdout:
58                m = state_re.match(line)
59                if m: state = m.group(1)
60                else:
61                    for reg, st in ((no_exp_re, "none"),
62                            (swapping_re, "swapping")):
63                        m = reg.match(line)
64                        if m: state = st
65            rv = status.wait()
66
67        # If the experiment is not present the subcommand returns a
68        # non-zero return value.  If we successfully parsed a "none"
69        # outcome, ignore the return code.
70        if rv != 0 and state != 'none':
71            raise service_error(service_error.internal,
72                    "Cannot get status of segment:%s/%s" % (pid, eid))
73        elif state not in ('active', 'swapped', 'swapping', 'none'):
74            raise service_error(service_error.internal,
75                    "Cannot get status of segment:%s/%s" % (pid, eid))
76        else:
77            self.log.debug("State is %s" % state)
78            return state
79
80
81    def get_mapping(self, user, host, pid, eid):
82        # command to test experiment state
83        expinfo_exec = "/usr/testbed/bin/expinfo" 
84        # The expinfo ssh command.  Note the identity restriction to use
85        # only the identity provided in the pubkey given.
86        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
87                'StrictHostKeyChecking no', '-i', 
88                self.ssh_privkey_file, "%s@%s" % (user, host), 
89                expinfo_exec, '-m', pid, eid]
90
91        dev_null = None
92        try:
93            dev_null = open("/dev/null", "a")
94        except EnvironmentError, e:
95            self.log.error("[get_mapping]: can't open /dev/null: %s" %e)
96
97        if self.debug:
98            rv = 0
99        else:
100            self.log.debug("Getting mapping for %s %s" % (pid, eid))
101            phys_start = re.compile('^Physical\s+Node\s+Mapping')
102            phys_line = re.compile('(\S+)(\s+\S+)*\s+(\S+)')
103            phys_end = re.compile('^$')
104            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
105                    stderr=dev_null, close_fds=True)
106
107            # Parse the info output.  Format:
108            #
109            # stuff
110            # Physical Node Mapping:
111            # ID              Type         OS              Physical   
112            # --------------- ------------ --------------- ------------
113            # virtual         dummy        dummy           physical
114            #
115            foundit = False
116            skip = 0
117            for line in status.stdout:
118                if phys_start.match(line):
119                    skip = 2
120                    foundit = True
121                elif not foundit:
122                    continue
123                elif skip > 0:
124                    skip -= 1
125                elif phys_end.match(line):
126                    break
127                else:
128                    m = phys_line.match(line.strip())
129                    if m: self.node[m.group(1)] = m.group(3)
130                    else: self.log.warn(
131                            "Matching failed while parsing node mapping: " +\
132                                    "line %s" % line)
133            rv = status.wait()
134
135        # If the experiment is not present the subcommand returns a
136        # non-zero return value.  If we successfully parsed a "none"
137        # outcome, ignore the return code.
138        if rv != 0 :
139            raise service_error(service_error.internal,
140                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
141        else:
142            return True
143
144
145    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
146        """
147        Start a sub-experiment on a federant.
148
149        Get the current state, modify or create as appropriate, ship data
150        and configs and start the experiment.  There are small ordering
151        differences based on the initial state of the sub-experiment.
152        """
153        # ops node in the federant
154        host = "%s%s" % (parent.ops, parent.domain)
155        # Configuration directories on the remote machine
156        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
157        softdir = "/proj/%s/software/%s" % (pid, eid)
158        # Local software dir
159        lsoftdir = "%s/software" % tmpdir
160
161        state = self.get_state(user, host, pid, eid)
162
163        if not self.scp_file(tclfile, user, host):
164            return False
165       
166        if state == 'none':
167            # Create a null copy of the experiment so that we capture any
168            # logs there if the modify fails.  Emulab software discards the
169            # logs from a failed startexp
170            try:
171                f = open("%s/null.tcl" % tmpdir, "w")
172                print >>f, self.null
173                f.close()
174            except EnvironmentError, e:
175                raise service_error(service_error.internal,
176                        "Cannot stage tarfile/rpm: %s" % e.strerror)
177
178            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
179                return False
180            self.log.info("[start_segment]: Creating %s" % eid)
181            timedout = False
182            try:
183                if not self.ssh_cmd(user, host,
184                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
185                        "-e %s null.tcl") % (pid, eid), "startexp",
186                        timeout=60 * 10):
187                    return False
188            except self.ssh_cmd_timeout:
189                timedout = True
190
191            if timedout:
192                state = self.get_state(user, host, pid, eid)
193                if state != "swapped":
194                    return False
195       
196        # Open up a temporary file to contain a script for setting up the
197        # filespace for the new experiment.
198        self.log.info("[start_segment]: creating script file")
199        try:
200            sf, scriptname = tempfile.mkstemp()
201            scriptfile = os.fdopen(sf, 'w')
202        except EnvironmentError:
203            return False
204
205        scriptbase = os.path.basename(scriptname)
206
207        # Script the filesystem changes
208        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
209        # Clear and create the software directory
210        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
211        print >>scriptfile, 'mkdir -p %s' % proj_dir
212        if os.path.isdir(lsoftdir):
213            print >>scriptfile, 'mkdir -p %s' % softdir
214        print >>scriptfile, "rm -f %s" % scriptbase
215        scriptfile.close()
216
217        # Move the script to the remote machine
218        # XXX: could collide tempfile names on the remote host
219        if self.scp_file(scriptname, user, host, scriptbase):
220            os.remove(scriptname)
221        else:
222            return False
223
224        # Execute the script (and the script's last line deletes it)
225        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
226            return False
227
228        for f in os.listdir(tmpdir):
229            if not os.path.isdir("%s/%s" % (tmpdir, f)):
230                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
231                        "%s/%s" % (proj_dir, f)):
232                    return False
233        if os.path.isdir(lsoftdir):
234            for f in os.listdir(lsoftdir):
235                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
236                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
237                            user, host, "%s/%s" % (softdir, f)):
238                        return False
239        # Stage the new configuration (active experiments will stay swapped
240        # in now)
241        self.log.info("[start_segment]: Modifying %s" % eid)
242        try:
243            if not self.ssh_cmd(user, host,
244                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
245                            (pid, eid, tclfile.rpartition('/')[2]),
246                    "modexp", timeout= 60 * 10):
247                return False
248        except self.ssh_cmd_timeout:
249            self.log.error("Modify command failed to complete in time")
250            # There's really no way to see if this succeeded or failed, so
251            # if it hangs, assume the worst.
252            return False
253        # Active experiments are still swapped, this swaps the others in.
254        if state != 'active':
255            self.log.info("[start_segment]: Swapping %s" % eid)
256            timedout = False
257            try:
258                if not self.ssh_cmd(user, host,
259                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
260                        "swapexp", timeout=25*60):
261                    return False
262            except self.ssh_cmd_timeout:
263                timedout = True
264           
265            # If the command was terminated, but completed successfully,
266            # report success.
267            if timedout:
268                self.log.debug("[start_segment]: swapin timed out " +\
269                        "checking state")
270                state = self.get_state(user, host, pid, eid)
271                self.log.debug("[start_segment]: state is %s" % state)
272                return state == 'active'
273        # Everything has gone OK.
274        self.get_mapping(user, host, pid,eid)
275        return True
276
277class stop_segment(proxy_segment):
278    def __init__(self, log=None, keyfile=None, debug=False):
279        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
280
281    def __call__(self, parent, user, pid, eid):
282        """
283        Stop a sub experiment by calling swapexp on the federant
284        """
285        host = "%s%s" % (parent.ops, parent.domain)
286        self.log.info("[stop_segment]: Stopping %s" % eid)
287        rv = False
288        try:
289            # Clean out tar files: we've gone over quota in the past
290            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
291                    (pid, eid))
292            rv = self.ssh_cmd(user, host,
293                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
294        except self.ssh_cmd_timeout:
295            rv = False
296        return rv
297
Note: See TracBrowser for help on using the repository browser.