source: fedd/federation/proxy_emulab_segment.py @ b12e315

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since b12e315 was 0e366a6, checked in by Ted Faber <faber@…>, 15 years ago

Type, Python 2.5 allows duplicate params, and 2.6 does not.

  • Property mode set to 100644
File size: 9.4 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12class proxy_emulab_segment:
13    class ssh_cmd_timeout(RuntimeError): pass
14
15    def __init__(self, log=None, keyfile=None, debug=False):
16        self.log = log or logging.getLogger(\
17                'fedd.access.proxy_emulab_segment')
18        self.ssh_privkey_file = keyfile
19        self.debug = debug
20        self.ssh_exec="/usr/bin/ssh"
21        self.scp_exec = "/usr/bin/scp"
22        self.ssh_cmd_timeout = proxy_emulab_segment.ssh_cmd_timeout
23
24    def scp_file(self, file, user, host, dest=""):
25        """
26        scp a file to the remote host.  If debug is set the action is only
27        logged.
28        """
29
30        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
31                '-o', 'StrictHostKeyChecking yes', '-i', 
32                self.ssh_privkey_file, file, 
33                "%s@%s:%s" % (user, host, dest)]
34        rv = 0
35
36        try:
37            dnull = open("/dev/null", "w")
38        except IOError:
39            self.log.debug("[ssh_file]: failed to open " + \
40                    "/dev/null for redirect")
41            dnull = Null
42
43        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
44        if not self.debug:
45            rv = subprocess.call(scp_cmd, stdout=dnull, 
46                    stderr=dnull, close_fds=True)
47
48        return rv == 0
49
50    def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
51        """
52        Run a remote command on host as user.  If debug is set, the action
53        is only logged.  Commands are run without stdin, to avoid stray
54        SIGTTINs.
55        """
56        sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
57                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
58                (self.ssh_exec, self.ssh_privkey_file, 
59                        user, host, cmd)
60
61        try:
62            dnull = open("/dev/null", "w")
63        except IOError:
64            self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
65                    "for redirect")
66            dnull = Null
67
68        self.log.debug("[ssh_cmd]: %s" % sh_str)
69        if not self.debug:
70            if dnull:
71                sub = subprocess.Popen(sh_str, shell=True, stdout=dnull,
72                        stderr=dnull, close_fds=True)
73            else:
74                sub = subprocess.Popen(sh_str, shell=True, close_fds=True)
75            if timeout:
76                i = 0
77                rv = sub.poll()
78                while i < timeout:
79                    if rv is not None: break
80                    else:
81                        time.sleep(1)
82                        rv = sub.poll()
83                        i += 1
84                else:
85                    self.log.debug("Process exceeded runtime: %s" % sh_str)
86                    os.kill(sub.pid, signal.SIGKILL)
87                    raise self.ssh_cmd_timeout();
88                return rv == 0
89            else:
90                return sub.wait() == 0
91        else:
92            if timeout == 0:
93                self.log.debug("debug timeout raised on %s " % sh_str)
94                raise self.ssh_cmd_timeout()
95            else:
96                return True
97
98class start_segment(proxy_emulab_segment):
99    def __init__(self, log=None, keyfile=None, debug=False):
100        proxy_emulab_segment.__init__(self, log=log, 
101                keyfile=keyfile, debug=debug)
102        self.null = """
103set ns [new Simulator]
104source tb_compat.tcl
105
106set a [$ns node]
107
108$ns rtproto Session
109$ns run
110"""
111
112    def get_state(self, user, host, pid, eid):
113        # command to test experiment state
114        expinfo_exec = "/usr/testbed/bin/expinfo" 
115        # Regular expressions to parse the expinfo response
116        state_re = re.compile("State:\s+(\w+)")
117        no_exp_re = re.compile("^No\s+such\s+experiment")
118        swapping_re = re.compile("^No\s+information\s+available.")
119        state = None    # Experiment state parsed from expinfo
120        # The expinfo ssh command.  Note the identity restriction to use
121        # only the identity provided in the pubkey given.
122        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
123                'StrictHostKeyChecking yes', '-i', 
124                self.ssh_privkey_file, "%s@%s" % (user, host), 
125                expinfo_exec, pid, eid]
126
127        dev_null = None
128        try:
129            dev_null = open("/dev/null", "a")
130        except IOError, e:
131            self.log.error("[get_state]: can't open /dev/null: %s" %e)
132
133        if self.debug:
134            state = 'swapped'
135            rv = 0
136        else:
137            self.log.debug("Checking state")
138            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
139                    stderr=dev_null, close_fds=True)
140            for line in status.stdout:
141                m = state_re.match(line)
142                if m: state = m.group(1)
143                else:
144                    for reg, st in ((no_exp_re, "none"),
145                            (swapping_re, "swapping")):
146                        m = reg.match(line)
147                        if m: state = st
148            rv = status.wait()
149
150        # If the experiment is not present the subcommand returns a
151        # non-zero return value.  If we successfully parsed a "none"
152        # outcome, ignore the return code.
153        if rv != 0 and state != 'none':
154            raise service_error(service_error.internal,
155                    "Cannot get status of segment:%s/%s" % (pid, eid))
156        elif state not in ('active', 'swapped', 'swapping', 'none'):
157            raise service_error(service_error.internal,
158                    "Cannot get status of segment:%s/%s" % (pid, eid))
159        else:
160            self.log.debug("State is %s" % state)
161            return state
162
163
164    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
165        """
166        Start a sub-experiment on a federant.
167
168        Get the current state, modify or create as appropriate, ship data
169        and configs and start the experiment.  There are small ordering
170        differences based on the initial state of the sub-experiment.
171        """
172        # ops node in the federant
173        host = "%s%s" % (parent.ops, parent.domain)
174        # Configuration directories on the remote machine
175        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
176        softdir = "/proj/%s/software/%s" % (pid, eid)
177        # Local software dir
178        lsoftdir = "%s/software" % tmpdir
179
180        state = self.get_state(user, host, pid, eid)
181
182        if not self.scp_file(tclfile, user, host):
183            return False
184       
185        if state == 'none':
186            # Create a null copy of the experiment so that we capture any
187            # logs there if the modify fails.  Emulab software discards the
188            # logs from a failed startexp
189            try:
190                f = open("%s/null.tcl" % tmpdir, "w")
191                print >>f, self.null
192                f.close()
193            except IOError, e:
194                raise service_error(service_error.internal,
195                        "Cannot stage tarfile/rpm: %s" % e.strerror)
196
197            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
198                return False
199            self.log.info("[start_segment]: Creating %s" % eid)
200            timedout = False
201            try:
202                if not self.ssh_cmd(user, host,
203                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
204                        "-e %s null.tcl") % (pid, eid), "startexp",
205                        timeout=60 * 10):
206                    return False
207            except self.ssh_cmd_timeout:
208                timedout = True
209
210            if timedout:
211                state = self.get_state(user, host, pid, eid)
212                if state != "swapped":
213                    return False
214       
215        # Open up a temporary file to contain a script for setting up the
216        # filespace for the new experiment.
217        self.log.info("[start_segment]: creating script file")
218        try:
219            sf, scriptname = tempfile.mkstemp()
220            scriptfile = os.fdopen(sf, 'w')
221        except IOError:
222            return False
223
224        scriptbase = os.path.basename(scriptname)
225
226        # Script the filesystem changes
227        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
228        # Clear and create the software directory
229        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
230        print >>scriptfile, 'mkdir -p %s' % proj_dir
231        if os.path.isdir(lsoftdir):
232            print >>scriptfile, 'mkdir -p %s' % softdir
233        print >>scriptfile, "rm -f %s" % scriptbase
234        scriptfile.close()
235
236        # Move the script to the remote machine
237        # XXX: could collide tempfile names on the remote host
238        if self.scp_file(scriptname, user, host, scriptbase):
239            os.remove(scriptname)
240        else:
241            return False
242
243        # Execute the script (and the script's last line deletes it)
244        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
245            return False
246
247        for f in os.listdir(tmpdir):
248            if not os.path.isdir("%s/%s" % (tmpdir, f)):
249                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
250                        "%s/%s" % (proj_dir, f)):
251                    return False
252        if os.path.isdir(lsoftdir):
253            for f in os.listdir(lsoftdir):
254                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
255                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
256                            user, host, "%s/%s" % (softdir, f)):
257                        return False
258        # Stage the new configuration (active experiments will stay swapped
259        # in now)
260        self.log.info("[start_segment]: Modifying %s" % eid)
261        try:
262            if not self.ssh_cmd(user, host,
263                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
264                            (pid, eid, tclfile.rpartition('/')[2]),
265                    "modexp", timeout= 60 * 10):
266                return False
267        except self.ssh_cmd_timeout:
268            self.log.error("Modify command failed to complete in time")
269            # There's really no way to see if this succeeded or failed, so
270            # if it hangs, assume the worst.
271            return False
272        # Active experiments are still swapped, this swaps the others in.
273        if state != 'active':
274            self.log.info("[start_segment]: Swapping %s" % eid)
275            timedout = False
276            try:
277                if not self.ssh_cmd(user, host,
278                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
279                        "swapexp", timeout=10*60):
280                    return False
281            except self.ssh_cmd_timeout:
282                timedout = True
283           
284            # If the command was terminated, but completed successfully,
285            # report success.
286            if timedout:
287                self.log.debug("[start_segment]: swapin timed out " +\
288                        "checking state")
289                state = self.get_state(user, host, pid, eid)
290                self.log.debug("[start_segment]: state is %s" % state)
291                return state == 'active'
292        # Everything has gone OK.
293        return True
294
295class stop_segment(proxy_emulab_segment):
296    def __init__(self, log=None, keyfile=None, debug=False):
297        proxy_emulab_segment.__init__(self,
298                log=log, keyfile=keyfile, debug=debug)
299
300    def __call__(self, parent, user, pid, eid):
301        """
302        Stop a sub experiment by calling swapexp on the federant
303        """
304        host = "%s%s" % (parent.ops, parent.domain)
305        self.log.info("[stop_segment]: Stopping %s" % eid)
306        rv = False
307        try:
308            # Clean out tar files: we've gone over quota in the past
309            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
310                    (pid, eid))
311            rv = self.ssh_cmd(user, host,
312                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
313        except self.ssh_cmd_timeout:
314            rv = False
315        return rv
316
Note: See TracBrowser for help on using the repository browser.