source: fedd/federation/proxy_emulab_segment.py @ bbd0039

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since bbd0039 was bbd0039, checked in by Ted Faber <faber@…>, 14 years ago

Turn off strict key checking. Failures because of an old .ssh/known_hosts are
unpleasent. This should be an option.

  • Property mode set to 100644
File size: 9.4 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12from service_error import service_error
13
14class proxy_emulab_segment:
15    class ssh_cmd_timeout(RuntimeError): pass
16
17    def __init__(self, log=None, keyfile=None, debug=False):
18        self.log = log or logging.getLogger(\
19                'fedd.access.proxy_emulab_segment')
20        self.ssh_privkey_file = keyfile
21        self.debug = debug
22        self.ssh_exec="/usr/bin/ssh"
23        self.scp_exec = "/usr/bin/scp"
24        self.ssh_cmd_timeout = proxy_emulab_segment.ssh_cmd_timeout
25
26    def scp_file(self, file, user, host, dest=""):
27        """
28        scp a file to the remote host.  If debug is set the action is only
29        logged.
30        """
31
32        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
33                '-o', 'StrictHostKeyChecking no', '-i', 
34                self.ssh_privkey_file, file, 
35                "%s@%s:%s" % (user, host, dest)]
36        rv = 0
37
38        try:
39            dnull = open("/dev/null", "w")
40        except IOError:
41            self.log.debug("[ssh_file]: failed to open " + \
42                    "/dev/null for redirect")
43            dnull = Null
44
45        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
46        if not self.debug:
47            rv = subprocess.call(scp_cmd, stdout=dnull, 
48                    stderr=dnull, close_fds=True)
49
50        return rv == 0
51
52    def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
53        """
54        Run a remote command on host as user.  If debug is set, the action
55        is only logged.  Commands are run without stdin, to avoid stray
56        SIGTTINs.
57        """
58        sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
59                "'StrictHostKeyChecking no' -i %s %s@%s %s") % \
60                (self.ssh_exec, self.ssh_privkey_file, 
61                        user, host, cmd)
62
63        try:
64            dnull = open("/dev/null", "w")
65        except IOError:
66            self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
67                    "for redirect")
68            dnull = Null
69
70        self.log.debug("[ssh_cmd]: %s" % sh_str)
71        if not self.debug:
72            if dnull:
73                sub = subprocess.Popen(sh_str, shell=True, stdout=dnull,
74                        stderr=dnull, close_fds=True)
75            else:
76                sub = subprocess.Popen(sh_str, shell=True, close_fds=True)
77            if timeout:
78                i = 0
79                rv = sub.poll()
80                while i < timeout:
81                    if rv is not None: break
82                    else:
83                        time.sleep(1)
84                        rv = sub.poll()
85                        i += 1
86                else:
87                    self.log.debug("Process exceeded runtime: %s" % sh_str)
88                    os.kill(sub.pid, signal.SIGKILL)
89                    raise self.ssh_cmd_timeout();
90                return rv == 0
91            else:
92                return sub.wait() == 0
93        else:
94            if timeout == 0:
95                self.log.debug("debug timeout raised on %s " % sh_str)
96                raise self.ssh_cmd_timeout()
97            else:
98                return True
99
100class start_segment(proxy_emulab_segment):
101    def __init__(self, log=None, keyfile=None, debug=False):
102        proxy_emulab_segment.__init__(self, log=log, 
103                keyfile=keyfile, debug=debug)
104        self.null = """
105set ns [new Simulator]
106source tb_compat.tcl
107
108set a [$ns node]
109
110$ns rtproto Session
111$ns run
112"""
113
114    def get_state(self, user, host, pid, eid):
115        # command to test experiment state
116        expinfo_exec = "/usr/testbed/bin/expinfo" 
117        # Regular expressions to parse the expinfo response
118        state_re = re.compile("State:\s+(\w+)")
119        no_exp_re = re.compile("^No\s+such\s+experiment")
120        swapping_re = re.compile("^No\s+information\s+available.")
121        state = None    # Experiment state parsed from expinfo
122        # The expinfo ssh command.  Note the identity restriction to use
123        # only the identity provided in the pubkey given.
124        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
125                'StrictHostKeyChecking no', '-i', 
126                self.ssh_privkey_file, "%s@%s" % (user, host), 
127                expinfo_exec, pid, eid]
128
129        dev_null = None
130        try:
131            dev_null = open("/dev/null", "a")
132        except IOError, e:
133            self.log.error("[get_state]: can't open /dev/null: %s" %e)
134
135        if self.debug:
136            state = 'swapped'
137            rv = 0
138        else:
139            self.log.debug("Checking state")
140            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
141                    stderr=dev_null, close_fds=True)
142            for line in status.stdout:
143                m = state_re.match(line)
144                if m: state = m.group(1)
145                else:
146                    for reg, st in ((no_exp_re, "none"),
147                            (swapping_re, "swapping")):
148                        m = reg.match(line)
149                        if m: state = st
150            rv = status.wait()
151
152        # If the experiment is not present the subcommand returns a
153        # non-zero return value.  If we successfully parsed a "none"
154        # outcome, ignore the return code.
155        if rv != 0 and state != 'none':
156            raise service_error(service_error.internal,
157                    "Cannot get status of segment:%s/%s" % (pid, eid))
158        elif state not in ('active', 'swapped', 'swapping', 'none'):
159            raise service_error(service_error.internal,
160                    "Cannot get status of segment:%s/%s" % (pid, eid))
161        else:
162            self.log.debug("State is %s" % state)
163            return state
164
165
166    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
167        """
168        Start a sub-experiment on a federant.
169
170        Get the current state, modify or create as appropriate, ship data
171        and configs and start the experiment.  There are small ordering
172        differences based on the initial state of the sub-experiment.
173        """
174        # ops node in the federant
175        host = "%s%s" % (parent.ops, parent.domain)
176        # Configuration directories on the remote machine
177        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
178        softdir = "/proj/%s/software/%s" % (pid, eid)
179        # Local software dir
180        lsoftdir = "%s/software" % tmpdir
181
182        state = self.get_state(user, host, pid, eid)
183
184        if not self.scp_file(tclfile, user, host):
185            return False
186       
187        if state == 'none':
188            # Create a null copy of the experiment so that we capture any
189            # logs there if the modify fails.  Emulab software discards the
190            # logs from a failed startexp
191            try:
192                f = open("%s/null.tcl" % tmpdir, "w")
193                print >>f, self.null
194                f.close()
195            except IOError, e:
196                raise service_error(service_error.internal,
197                        "Cannot stage tarfile/rpm: %s" % e.strerror)
198
199            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
200                return False
201            self.log.info("[start_segment]: Creating %s" % eid)
202            timedout = False
203            try:
204                if not self.ssh_cmd(user, host,
205                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
206                        "-e %s null.tcl") % (pid, eid), "startexp",
207                        timeout=60 * 10):
208                    return False
209            except self.ssh_cmd_timeout:
210                timedout = True
211
212            if timedout:
213                state = self.get_state(user, host, pid, eid)
214                if state != "swapped":
215                    return False
216       
217        # Open up a temporary file to contain a script for setting up the
218        # filespace for the new experiment.
219        self.log.info("[start_segment]: creating script file")
220        try:
221            sf, scriptname = tempfile.mkstemp()
222            scriptfile = os.fdopen(sf, 'w')
223        except IOError:
224            return False
225
226        scriptbase = os.path.basename(scriptname)
227
228        # Script the filesystem changes
229        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
230        # Clear and create the software directory
231        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
232        print >>scriptfile, 'mkdir -p %s' % proj_dir
233        if os.path.isdir(lsoftdir):
234            print >>scriptfile, 'mkdir -p %s' % softdir
235        print >>scriptfile, "rm -f %s" % scriptbase
236        scriptfile.close()
237
238        # Move the script to the remote machine
239        # XXX: could collide tempfile names on the remote host
240        if self.scp_file(scriptname, user, host, scriptbase):
241            os.remove(scriptname)
242        else:
243            return False
244
245        # Execute the script (and the script's last line deletes it)
246        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
247            return False
248
249        for f in os.listdir(tmpdir):
250            if not os.path.isdir("%s/%s" % (tmpdir, f)):
251                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
252                        "%s/%s" % (proj_dir, f)):
253                    return False
254        if os.path.isdir(lsoftdir):
255            for f in os.listdir(lsoftdir):
256                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
257                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
258                            user, host, "%s/%s" % (softdir, f)):
259                        return False
260        # Stage the new configuration (active experiments will stay swapped
261        # in now)
262        self.log.info("[start_segment]: Modifying %s" % eid)
263        try:
264            if not self.ssh_cmd(user, host,
265                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
266                            (pid, eid, tclfile.rpartition('/')[2]),
267                    "modexp", timeout= 60 * 10):
268                return False
269        except self.ssh_cmd_timeout:
270            self.log.error("Modify command failed to complete in time")
271            # There's really no way to see if this succeeded or failed, so
272            # if it hangs, assume the worst.
273            return False
274        # Active experiments are still swapped, this swaps the others in.
275        if state != 'active':
276            self.log.info("[start_segment]: Swapping %s" % eid)
277            timedout = False
278            try:
279                if not self.ssh_cmd(user, host,
280                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
281                        "swapexp", timeout=25*60):
282                    return False
283            except self.ssh_cmd_timeout:
284                timedout = True
285           
286            # If the command was terminated, but completed successfully,
287            # report success.
288            if timedout:
289                self.log.debug("[start_segment]: swapin timed out " +\
290                        "checking state")
291                state = self.get_state(user, host, pid, eid)
292                self.log.debug("[start_segment]: state is %s" % state)
293                return state == 'active'
294        # Everything has gone OK.
295        return True
296
297class stop_segment(proxy_emulab_segment):
298    def __init__(self, log=None, keyfile=None, debug=False):
299        proxy_emulab_segment.__init__(self,
300                log=log, keyfile=keyfile, debug=debug)
301
302    def __call__(self, parent, user, pid, eid):
303        """
304        Stop a sub experiment by calling swapexp on the federant
305        """
306        host = "%s%s" % (parent.ops, parent.domain)
307        self.log.info("[stop_segment]: Stopping %s" % eid)
308        rv = False
309        try:
310            # Clean out tar files: we've gone over quota in the past
311            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
312                    (pid, eid))
313            rv = self.ssh_cmd(user, host,
314                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
315        except self.ssh_cmd_timeout:
316            rv = False
317        return rv
318
Note: See TracBrowser for help on using the repository browser.