source: fedd/federation/proxy_emulab_segment.py @ b4b19c7

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since b4b19c7 was b4b19c7, checked in by Ted Faber <faber@…>, 14 years ago

Get topology information into the info operation, as annotations of a topology description. This required adding such information to the start segment replies as well

  • Property mode set to 100644
File size: 9.1 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import signal
11
12from proxy_segment import proxy_segment
13from service_error import service_error
14
15class start_segment(proxy_segment):
16    def __init__(self, log=None, keyfile=None, debug=False):
17        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
18        self.null = """
19set ns [new Simulator]
20source tb_compat.tcl
21
22set a [$ns node]
23
24$ns rtproto Session
25$ns run
26"""
27        self.node = { }
28
29    def get_state(self, user, host, pid, eid):
30        # command to test experiment state
31        expinfo_exec = "/usr/testbed/bin/expinfo" 
32        # Regular expressions to parse the expinfo response
33        state_re = re.compile("State:\s+(\w+)")
34        no_exp_re = re.compile("^No\s+such\s+experiment")
35        swapping_re = re.compile("^No\s+information\s+available.")
36        state = None    # Experiment state parsed from expinfo
37        # The expinfo ssh command.  Note the identity restriction to use
38        # only the identity provided in the pubkey given.
39        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
40                'StrictHostKeyChecking no', '-i', 
41                self.ssh_privkey_file, "%s@%s" % (user, host), 
42                expinfo_exec, pid, eid]
43
44        dev_null = None
45        try:
46            dev_null = open("/dev/null", "a")
47        except IOError, e:
48            self.log.error("[get_state]: can't open /dev/null: %s" %e)
49
50        if self.debug:
51            state = 'swapped'
52            rv = 0
53        else:
54            self.log.debug("Checking state")
55            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
56                    stderr=dev_null, close_fds=True)
57            for line in status.stdout:
58                m = state_re.match(line)
59                if m: state = m.group(1)
60                else:
61                    for reg, st in ((no_exp_re, "none"),
62                            (swapping_re, "swapping")):
63                        m = reg.match(line)
64                        if m: state = st
65            rv = status.wait()
66
67        # If the experiment is not present the subcommand returns a
68        # non-zero return value.  If we successfully parsed a "none"
69        # outcome, ignore the return code.
70        if rv != 0 and state != 'none':
71            raise service_error(service_error.internal,
72                    "Cannot get status of segment:%s/%s" % (pid, eid))
73        elif state not in ('active', 'swapped', 'swapping', 'none'):
74            raise service_error(service_error.internal,
75                    "Cannot get status of segment:%s/%s" % (pid, eid))
76        else:
77            self.log.debug("State is %s" % state)
78            return state
79
80
81    def get_mapping(self, pid, eid):
82        # command to test experiment state
83        expinfo_exec = "/usr/testbed/bin/expinfo" 
84        # The expinfo ssh command.  Note the identity restriction to use
85        # only the identity provided in the pubkey given.
86        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
87                'StrictHostKeyChecking no', '-i', 
88                self.ssh_privkey_file, "%s@%s" % (user, host), 
89                expinfo_exec, '-m', pid, eid]
90
91        dev_null = None
92        try:
93            dev_null = open("/dev/null", "a")
94        except IOError, e:
95            self.log.error("[get_state]: can't open /dev/null: %s" %e)
96
97        if self.debug:
98            rv = 0
99        else:
100            self.log.debug("Getting mapping for %s %s" % (pid, eid))
101            phys_start = re.compile('^Physical\s+Node\s+Mapping')
102            phys_line = re.compile('(\S+)\s+\S+\s+\S+\s+(.*)')
103            phys_end = re.compile('^$')
104            status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
105                    stderr=dev_null, close_fds=True)
106
107            # Parse the info output.  Format:
108            #
109            # stuff
110            # Physical Node Mapping:
111            # ID              Type         OS              Physical   
112            # --------------- ------------ --------------- ------------
113            # virtual         dummy        dummy           physical
114            #
115            foundit = False
116            skip = 0
117            for line in status.stdout:
118                if phys_start.match(line):
119                    skip = 2
120                    foundit = True
121                elif not foundit:
122                    continue
123                elif skip > 0:
124                    skip -= 1
125                elif phys_end.match(line):
126                    break
127                else:
128                    m = phys_line.match(line.strip())
129                    if m: self.node[m.group(1)] = m.group(2)
130                    else: self.log.warn(
131                            "Matching failed while parsing node mapping")
132            rv = status.wait()
133
134        # If the experiment is not present the subcommand returns a
135        # non-zero return value.  If we successfully parsed a "none"
136        # outcome, ignore the return code.
137        if rv != 0 :
138            raise service_error(service_error.internal,
139                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
140        else:
141            return True
142
143
144    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
145        """
146        Start a sub-experiment on a federant.
147
148        Get the current state, modify or create as appropriate, ship data
149        and configs and start the experiment.  There are small ordering
150        differences based on the initial state of the sub-experiment.
151        """
152        # ops node in the federant
153        host = "%s%s" % (parent.ops, parent.domain)
154        # Configuration directories on the remote machine
155        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
156        softdir = "/proj/%s/software/%s" % (pid, eid)
157        # Local software dir
158        lsoftdir = "%s/software" % tmpdir
159
160        state = self.get_state(user, host, pid, eid)
161
162        if not self.scp_file(tclfile, user, host):
163            return False
164       
165        if state == 'none':
166            # Create a null copy of the experiment so that we capture any
167            # logs there if the modify fails.  Emulab software discards the
168            # logs from a failed startexp
169            try:
170                f = open("%s/null.tcl" % tmpdir, "w")
171                print >>f, self.null
172                f.close()
173            except IOError, e:
174                raise service_error(service_error.internal,
175                        "Cannot stage tarfile/rpm: %s" % e.strerror)
176
177            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
178                return False
179            self.log.info("[start_segment]: Creating %s" % eid)
180            timedout = False
181            try:
182                if not self.ssh_cmd(user, host,
183                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
184                        "-e %s null.tcl") % (pid, eid), "startexp",
185                        timeout=60 * 10):
186                    return False
187            except self.ssh_cmd_timeout:
188                timedout = True
189
190            if timedout:
191                state = self.get_state(user, host, pid, eid)
192                if state != "swapped":
193                    return False
194       
195        # Open up a temporary file to contain a script for setting up the
196        # filespace for the new experiment.
197        self.log.info("[start_segment]: creating script file")
198        try:
199            sf, scriptname = tempfile.mkstemp()
200            scriptfile = os.fdopen(sf, 'w')
201        except IOError:
202            return False
203
204        scriptbase = os.path.basename(scriptname)
205
206        # Script the filesystem changes
207        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
208        # Clear and create the software directory
209        print >>scriptfile, "/bin/rm -rf %s/*" % softdir
210        print >>scriptfile, 'mkdir -p %s' % proj_dir
211        if os.path.isdir(lsoftdir):
212            print >>scriptfile, 'mkdir -p %s' % softdir
213        print >>scriptfile, "rm -f %s" % scriptbase
214        scriptfile.close()
215
216        # Move the script to the remote machine
217        # XXX: could collide tempfile names on the remote host
218        if self.scp_file(scriptname, user, host, scriptbase):
219            os.remove(scriptname)
220        else:
221            return False
222
223        # Execute the script (and the script's last line deletes it)
224        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
225            return False
226
227        for f in os.listdir(tmpdir):
228            if not os.path.isdir("%s/%s" % (tmpdir, f)):
229                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
230                        "%s/%s" % (proj_dir, f)):
231                    return False
232        if os.path.isdir(lsoftdir):
233            for f in os.listdir(lsoftdir):
234                if not os.path.isdir("%s/%s" % (lsoftdir, f)):
235                    if not self.scp_file("%s/%s" % (lsoftdir, f), 
236                            user, host, "%s/%s" % (softdir, f)):
237                        return False
238        # Stage the new configuration (active experiments will stay swapped
239        # in now)
240        self.log.info("[start_segment]: Modifying %s" % eid)
241        try:
242            if not self.ssh_cmd(user, host,
243                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
244                            (pid, eid, tclfile.rpartition('/')[2]),
245                    "modexp", timeout= 60 * 10):
246                return False
247        except self.ssh_cmd_timeout:
248            self.log.error("Modify command failed to complete in time")
249            # There's really no way to see if this succeeded or failed, so
250            # if it hangs, assume the worst.
251            return False
252        # Active experiments are still swapped, this swaps the others in.
253        if state != 'active':
254            self.log.info("[start_segment]: Swapping %s" % eid)
255            timedout = False
256            try:
257                if not self.ssh_cmd(user, host,
258                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
259                        "swapexp", timeout=25*60):
260                    return False
261            except self.ssh_cmd_timeout:
262                timedout = True
263           
264            # If the command was terminated, but completed successfully,
265            # report success.
266            if timedout:
267                self.log.debug("[start_segment]: swapin timed out " +\
268                        "checking state")
269                state = self.get_state(user, host, pid, eid)
270                self.log.debug("[start_segment]: state is %s" % state)
271                return state == 'active'
272        # Everything has gone OK.
273        return True
274
275class stop_segment(proxy_segment):
276    def __init__(self, log=None, keyfile=None, debug=False):
277        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
278
279    def __call__(self, parent, user, pid, eid):
280        """
281        Stop a sub experiment by calling swapexp on the federant
282        """
283        host = "%s%s" % (parent.ops, parent.domain)
284        self.log.info("[stop_segment]: Stopping %s" % eid)
285        rv = False
286        try:
287            # Clean out tar files: we've gone over quota in the past
288            self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
289                    (pid, eid))
290            rv = self.ssh_cmd(user, host,
291                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
292        except self.ssh_cmd_timeout:
293            rv = False
294        return rv
295
Note: See TracBrowser for help on using the repository browser.