source: fedd/federation/proxy_protogeni_segment.py @ 8e6fe4d

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 8e6fe4d was 8e6fe4d, checked in by Ted Faber <faber@…>, 14 years ago

checkpoint and SLSL error catching

  • Property mode set to 100644
File size: 14.3 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import random
11import string
12import signal
13
14import xml.parsers.expat
15from threading import Thread
16
17from proxy_segment import proxy_segment
18from service_error import service_error
19from remote_service import service_caller
20from util import fedd_ssl_context
21
22import topdl
23
24class segment_base(proxy_segment):
25    class ProtoGENIError(Exception): 
26        def __init__(self, op, code, output):
27            Exception.__init__(self, output)
28            self.op = op
29            self.code = code
30            self.output = output
31
32    def __init__(self, log=None, keyfile=None, debug=False, 
33            ch_url=None, sa_url=None, cm_url=None):
34        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
35
36        self.ProtoGENIError = start_segment.ProtoGENIError
37        self.ch_url = ch_url
38        self.sa_url = sa_url
39        self.cm_url = cm_url
40
41        self.call_SetValue = service_caller('SetValue')
42
43        self.debug_fail = ['Resolve']
44        self.debug_response = {
45                'RedeemTicket': ("XML blob1", "XML blob2"),
46                'SliceStatus': { 'status': 'ready' },
47            }
48
49
50    def pg_call(self, url, method, params, context):
51        s = service_caller(method, request_body_name="", strict=False)
52        self.log.debug("Calling %s %s" % (url, method))
53        if not self.debug:
54            r = s.call_xmlrpc_service(url, params, context=context)
55            if r.get('code', -1) != 0:
56                raise self.ProtoGENIError(op=method, 
57                        code=r.get('code', 'unknown'), 
58                        output=r.get('output', 'No output'))
59            else:
60                return r.get('value', None)
61        else:
62            if method in self.debug_fail:
63                raise self.ProtoGENIError(op=method, code='unknown',
64                        output='No output')
65            elif self.debug_response.has_key(method):
66                return self.debug_response[method]
67            else:
68                return "%s XML blob" % method
69
70
71class start_segment(segment_base):
72    def __init__(self, log=None, keyfile=None, debug=False, 
73            ch_url=None, sa_url=None, cm_url=None):
74        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
75                ch_url=cm_url, sa_url=sa_url, cm_url=cm_url)
76
77
78    # Turn the manifest into a dict were each virtual nodename (i.e. the topdl
79    # name) has an entry with the allocated machine in hostname and the
80    # interfaces in 'interfaces'.  I love having XML parser code lying around.
81    def manifest_to_dict(self, manifest):
82        if self.debug: return { }
83
84        # The class allows us to keep a little state - the dict under
85        # consteruction and the current entry in that dict for the interface
86        # element code.
87        class manifest_parser:
88            def __init__(self):
89                self.d = { }
90                self.current_key=None
91
92            # If the element is a node, create a dict entry for it.  If it's an
93            # interface inside a node, add an entry in the interfaces list with
94            # the virtual name and component id.
95            def start_element(self, name, attrs):
96                if name == 'node':
97                    self.current_key = attrs.get('virtual_id',"")
98                    if self.current_key:
99                        self.d[self.current_key] = {
100                                'hostname': attrs.get('hostname', None),
101                                'interfaces': { }
102                                }
103                elif name == 'interface' and self.current_key:
104                    self.d[self.current_key]['interfaces']\
105                            [attrs.get('virtual_id','')] = \
106                            attrs.get('component_id', None)
107            #  When a node is finished, clear current_key
108            def end_element(self, name):
109                if name == 'node': self.current_key = None
110
111
112        mp = manifest_parser()
113        p = xml.parsers.expat.ParserCreate()
114        # These are bound to the class we just created
115        p.StartElementHandler = mp.start_element
116        p.EndElementHandler = mp.end_element
117
118        p.Parse(manifest)
119        return mp.d
120
121
122
123    def export_store_info(self, cf, nodes, ssh_port, connInfo):
124        """
125        For the export requests in the connection info, install the peer names
126        at the experiment controller via SetValue calls.
127        """
128
129        for c in connInfo:
130            for p in [ p for p in c.get('parameter', []) \
131                    if p.get('type', '') == 'output']:
132
133                if p.get('name', '') == 'peer':
134                    k = p.get('key', None)
135                    surl = p.get('store', None)
136                    if surl and k and k.index('/') != -1:
137                        if self.debug:
138                            req = { 'name': k, 'value': 'debug' }
139                            self.call_SetValue(surl, req, cf)
140                        else:
141                            value = nodes.get(k[k.index('/')+1:], 
142                                    {}).get('hostname',"")
143                            if value:
144                                req = { 'name': k, 'value': value }
145                                self.call_SetValue(surl, req, cf)
146                            else:
147                                self.log.error("No hostname for %s" % \
148                                        k[k.index('/'):])
149                    else:
150                        self.log.error("Bad export request: %s" % p)
151                elif p.get('name', '') == 'ssh_port':
152                    k = p.get('key', None)
153                    surl = p.get('store', None)
154                    if surl and k:
155                        req = { 'name': k, 'value': self.ssh_port }
156                        self.call_SetValue(surl, req, cf)
157                    else:
158                        self.log.error("Bad export request: %s" % p)
159                else:
160
161                    self.log.error("Unknown export parameter: %s" % \
162                            p.get('name'))
163                    continue
164
165    def configure_nodes(self, topo, nodes, user, host, sshd, sshd_config,
166            gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir):
167
168        fed_dir = "/usr/local/federation"
169        ssh = "/usr/bin/ssh -n -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' "
170        scp = "/usr/bin/scp -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' "
171        ifconfig = "/sbin/ifconfig"
172        tar = "/bin/tar"
173
174        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
175            vname = e.name[0]
176            node = nodes.get(vname, {})
177            pname = node.get('hostname', None)
178            if pname:
179                script = open("%s/%s.startup" %(tmpdir, pname), "w")
180                # Reset the interfaces to the ones in the topo file
181                for i in [ i for i in e.interface \
182                        if not i.get_attribute('portal')]:
183                    pinf = node['interfaces'].get(i.name, None)
184                    addr = i.get_attribute('ip4_address') 
185                    netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
186                    if pinf and addr:
187                        print >>script, \
188                                "%s %s %s netmask %s"  % \
189                                (ifconfig, pinf, addr, netmask)
190                    else:
191                        self.log.error("Missing interface or address for %s" \
192                                % i.name)
193                   
194                for s in e.software:
195                    # XXX: Just tarfiles for now
196                    if not (s.location and s.install):
197                        continue
198                    s_base = s.location.rpartition('/')[2]
199                    print >>script, "%s %s@%s:%s/%s ." % \
200                            (scp, user, host, stagingdir, s_base)
201                    print >>script, \
202                            "%s -C %s -xzf %s" % (tar, s.install, s_base)
203                for f in ('hosts', pubkey, secretkey, 'client.conf', 
204                        'userconf', 'seer.conf'):
205                    print >>script, "%s %s@%s:%s/%s %s/etc" % \
206                            (scp, user, host, stagingdir, f, fed_dir)
207                if sshd:
208                    print >>script, "%s %s@%s:%s %s/bin" % \
209                            (scp, user, host, sshd, fed_dir)
210                if sshd_config:
211                    print >>script, "%s %s@%s:%s %s/etc" % \
212                            (scp, user, host, sshd_config, fed_dir)
213                if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK):
214                    print >>script, "%s %s@%s:%s/%s.gw.conf %s/etc" % \
215                            (scp, user, host, stagingdir, vname, fed_dir)
216
217
218                # Start commands
219                if e.get_attribute('portal') and gate_cmd:
220                    # Portals never have a user-specified start command
221                    print >>script, gate_cmd
222                elif node_cmd:
223                    # XXX: debug
224                    print >>script, "sudo perl -I%s/lib %s/bin/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_dir, fed_dir, user)
225                    # XXX: debug
226                    if e.get_attribute('startup'):
227                        print >>script, "%s \\$USER '%s'" % \
228                                (node_cmd, e.get_attribute('startup'))
229                    else:
230                        print >>script, node_cmd
231                script.close()
232                if not self.scp_file("%s/%s.startup" % (tmpdir, pname), 
233                        user, pname):
234                    self.log.error("Could not copy script to %s" % pname)
235            else:
236                self.log.error("Unmapped node: %s" % vname)
237
238    def start_nodes(self, user, nodes):
239        threads = [ ]
240        for n in nodes:
241            t = Thread(target=self.ssh_cmd, args=(user, n,
242                    "sudo /bin/sh ./%s.startup" % n))
243            t.start()
244            threads.append(t)
245
246        done = [not t.isAlive() for t in threads]
247        while not all(done):
248            self.log.info("Waiting for threads %s" % done)
249            time.sleep(10)
250            done = [not t.isAlive() for t in threads]
251
252
253
254
255    def __call__(self, parent, aid, user, rspec, pubkey, secretkey, 
256            stagingdir, tmpdir, certfile, certpw, export_certfile, topo,
257            connInfo, timeout=0):
258        """
259        Start a sub-experiment on a federant.
260
261        Get the current state, modify or create as appropriate, ship data
262        and configs and start the experiment.  There are small ordering
263        differences based on the initial state of the sub-experiment.
264        """
265
266        def random_slicename(user):
267            slicename = user
268            for i in range(0,5):
269                slicename += random.choice(string.ascii_letters)
270            return slicename
271
272        host = parent.staging_host
273        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
274        # Local software dir
275        lsoftdir = "%s/software" % tmpdir
276
277        # Open up a temporary file to contain a script for setting up the
278        # filespace for the new experiment.
279        self.log.info("[start_segment]: creating script file")
280        try:
281            sf, scriptname = tempfile.mkstemp()
282            scriptfile = os.fdopen(sf, 'w')
283        except IOError:
284            return False
285
286        scriptbase = os.path.basename(scriptname)
287
288        # Script the filesystem changes
289        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
290        print >>scriptfile, 'mkdir -p %s' % stagingdir
291        print >>scriptfile, "rm -f %s" % scriptbase
292        scriptfile.close()
293
294        # Move the script to the remote machine
295        # XXX: could collide tempfile names on the remote host
296        if self.scp_file(scriptname, user, host, scriptbase):
297            os.remove(scriptname)
298        else:
299            return False
300
301        # Execute the script (and the script's last line deletes it)
302        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
303            return False
304
305        # Copy software to the staging machine
306        for d in (tmpdir, lsoftdir):
307            if os.path.isdir(d):
308                for f in os.listdir(d):
309                    if not os.path.isdir("%s/%s" % (d, f)):
310                        if not self.scp_file("%s/%s" % (d, f), 
311                                user, host, "%s/%s" % (stagingdir, f)):
312                            self.log.error("Scp failed")
313                            return False
314
315        try:
316            gcred = self.pg_call(self.sa_url, 'GetCredential', {}, ctxt)
317        except self.ProtoGENIError, e:
318            raise service_error(service_error.federant,
319                    "ProtoGENI: %s" % e)
320        # Find a slicename not in use
321        slicename = "fabereGpgL"
322        while True:
323            slicename = random_slicename(user)
324            try:
325                param = {
326                        'credential': gcred, 
327                        'hrn': slicename,
328                        'type': 'Slice'
329                        }
330                self.pg_call(self.sa_url, 'Resolve', param, ctxt)
331            except self.ProtoGENIError, e:
332                print e
333                break
334
335        self.log.info("Creating %s" % slicename)
336        f = open("./rspec", "w")
337        print >>f, "%s" % rspec
338        f.close()
339        # Create the slice and allocate resources.  If any of this stuff fails,
340        # the allocations will time out on PG in short order, so we just raise
341        # the service_error.
342        try:
343            param = {
344                    'credential': gcred, 
345                    'hrn': slicename,
346                    'type': 'Slice'
347                    }
348            slice_cred = self.pg_call(self.sa_url, 'Register', param, ctxt)
349            f = open("./slice_cred", "w")
350            print >>f, slice_cred
351            f.close()
352            # Populate the ssh keys (let PG format them)
353            param = {
354                    'credential': gcred, 
355                    }
356            keys =  self.pg_call(self.sa_url, 'GetKeys', param, ctxt)
357            # Grab and redeem a ticket
358            param = {
359                    'credential': slice_cred, 
360                    'rspec': rspec,
361                    }
362            ticket = self.pg_call(self.cm_url, 'GetTicket', param, ctxt)
363            f = open("./ticket", "w")
364            print >>f, ticket
365            f.close()
366            param = { 
367                    'credential': slice_cred, 
368                    'keys': keys,
369                    'ticket': ticket,
370                    }
371            sliver_cred, manifest = self.pg_call(self.cm_url, 
372                    'RedeemTicket', param, ctxt)
373            f = open("./sliver_cred", "w")
374            print >>f, sliver_cred
375            f.close()
376            f = open("./manifest", "w")
377            print >>f, manifest
378            f.close()
379            # start 'em up
380            param = { 
381                    'credential': sliver_cred,
382                    }
383            self.pg_call(self.cm_url, 'StartSliver', param, ctxt)
384        except self.ProtoGENIError, e:
385            raise service_error(service_error.federant,
386                    "ProtoGENI: %s" % e)
387
388        # With manifest in hand, we can export the portal node names.
389        nodes = self.manifest_to_dict(manifest)
390        self.export_store_info(export_certfile, nodes, parent.ssh_port,
391                connInfo)
392
393        # Now we wait for the nodes to start on PG
394        status = 'notready'
395        try:
396            while status == 'notready':
397                param = { 
398                        'credential': slice_cred
399                        }
400                r = self.pg_call(self.cm_url, 'SliceStatus', param, ctxt)
401                print r
402                status = r.get('status', 'notready')
403                if status == 'notready':
404                    time.sleep(30)
405        except Self.ProtoGENIError, e:
406            raise service_error(service_error.federant,
407                    "ProtoGENI: %s" % e)
408
409        if status == 'failed':
410            self.log.error('Sliver failed to start on ProtoGENI')
411            try:
412                param = { 
413                        'credential': slice_cred
414                        }
415                self.pg_call(self.cm_url, 'DeleteSliver', param, ctxt)
416            except self.ProtoGENIError, e:
417                raise service_error(service_error.federant,
418                    "ProtoGENI: %s" % e)
419            return False
420        else:
421            parent.state_lock.acquire()
422            parent.allocation[aid]['slice_credential'] = slice_cred
423            parent.allocation[aid]['sliver_credential'] = sliver_cred
424            parent.allocation[aid]['manifest'] = manifest
425            parent.allocation[aid]['certfile'] = certfile
426            parent.allocation[aid]['certpw'] = certpw
427            parent.write_state()
428            parent.state_lock.release()
429
430        # The startcmds for portals and standard nodes (the Master Slave
431        # distinction is going away)
432        gate_cmd = parent.attrs.get('SlaveConnectorStartCmd', '/bin/true')
433        node_cmd = parent.attrs.get('SlaveNodeStartCmd', 'bin/true')
434
435        # Now we have configuration to do for ProtoGENI
436        self.configure_nodes(topo, nodes, user, parent.staging_host,
437                parent.sshd, parent.sshd_config, gate_cmd, node_cmd, 
438                pubkey, secretkey, stagingdir, tmpdir)
439
440        self.start_nodes(user, [ n['hostname'] for n in nodes.values()])
441
442        # Everything has gone OK.
443        return True
444
445class stop_segment(segment_base):
446    def __init__(self, log=None, keyfile=None, debug=False, 
447            ch_url=None, sa_url=None, cm_url=None):
448        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
449                ch_url=cm_url, sa_url=sa_url, cm_url=cm_url)
450
451    def __call__(self, parent, user, stagingdir, slice_cred, certfile, certpw):
452        """
453        Stop a sub experiment by calling swapexp on the federant
454        """
455        host = parent.staging_host
456        rv = False
457        try:
458            # Clean out tar files: we've gone over quota in the past
459            if stagingdir:
460                self.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
461            if slice_cred:
462                self.log.error('Removing Sliver on ProtoGENI')
463                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
464                try:
465                    param = { 
466                            'credential': slice_cred
467                            }
468                    self.pg_call(self.cm_url, 'DeleteSlice', param, ctxt)
469                except self.ProtoGENIError, e:
470                    raise service_error(service_error.federant,
471                        "ProtoGENI: %s" % e)
472            return True
473        except self.ssh_cmd_timeout:
474            rv = False
475        return rv
476
Note: See TracBrowser for help on using the repository browser.