source: fedd/federation/proxy_protogeni_segment.py @ c119839

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since c119839 was c119839, checked in by Ted Faber <faber@…>, 14 years ago

Initial check in

  • Property mode set to 100644
File size: 14.1 KB
Line 
1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import random
11import string
12import signal
13
14import xml.parsers.expat
15from threading import Thread
16
17from proxy_segment import proxy_segment
18from service_error import service_error
19from remote_service import service_caller
20from util import fedd_ssl_context
21
22import topdl
23
24class segment_base(proxy_segment):
25    class ProtoGENIError(Exception): 
26        def __init__(self, op, code, output):
27            Exception.__init__(self, output)
28            self.op = op
29            self.code = code
30            self.output = output
31
32    def __init__(self, log=None, keyfile=None, debug=False, 
33            ch_url=None, sa_url=None, cm_url=None):
34        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
35
36        self.ProtoGENIError = start_segment.ProtoGENIError
37        self.ch_url = ch_url
38        self.sa_url = sa_url
39        self.cm_url = cm_url
40
41        self.call_SetValue = service_caller('SetValue')
42
43        self.debug_fail = ['Resolve']
44        self.debug_response = {
45                'RedeemTicket': ("XML blob1", "XML blob2"),
46                'SliceStatus': { 'status': 'ready' },
47            }
48
49
50    def pg_call(self, url, method, params, context):
51        s = service_caller(method, request_body_name="", strict=False)
52        self.log.debug("Calling %s %s" % (url, method))
53        if not self.debug:
54            r = s.call_xmlrpc_service(url, params, context=context)
55            if r.get('code', -1) != 0:
56                raise self.ProtoGENIError(op=method, 
57                        code=r.get('code', 'unknown'), 
58                        output=r.get('output', 'No output'))
59            else:
60                return r.get('value', None)
61        else:
62            if method in self.debug_fail:
63                raise self.ProtoGENIError(op=method, code='unknown',
64                        output='No output')
65            elif self.debug_response.has_key(method):
66                return self.debug_response[method]
67            else:
68                return "%s XML blob" % method
69
70
71class start_segment(segment_base):
72    def __init__(self, log=None, keyfile=None, debug=False, 
73            ch_url=None, sa_url=None, cm_url=None):
74        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
75                ch_url=cm_url, sa_url=sa_url, cm_url=cm_url)
76
77
78    # Turn the manifest into a dict were each virtual nodename (i.e. the topdl
79    # name) has an entry with the allocated machine in hostname and the
80    # interfaces in 'interfaces'.  I love having XML parser code lying around.
81    def manifest_to_dict(self, manifest):
82        # XXX
83        # if self.debug: return { }
84
85        # The class allows us to keep a little state - the dict under
86        # consteruction and the current entry in that dict for the interface
87        # element code.
88        class manifest_parser:
89            def __init__(self):
90                self.d = { }
91                self.current_key=None
92
93            # If the element is a node, create a dict entry for it.  If it's an
94            # interface inside a node, add an entry in the interfaces list with
95            # the virtual name and component id.
96            def start_element(self, name, attrs):
97                if name == 'node':
98                    self.current_key = attrs.get('virtual_id',"")
99                    if self.current_key:
100                        self.d[self.current_key] = {
101                                'hostname': attrs.get('hostname', None),
102                                'interfaces': { }
103                                }
104                elif name == 'interface' and self.current_key:
105                    self.d[self.current_key]['interfaces']\
106                            [attrs.get('virtual_id','')] = \
107                            attrs.get('component_id', None)
108            #  When a node is finished, clear current_key
109            def end_element(self, name):
110                if name == 'node': self.current_key = None
111
112
113        mp = manifest_parser()
114        p = xml.parsers.expat.ParserCreate()
115        # These are bound to the class we just created
116        p.StartElementHandler = mp.start_element
117        p.EndElementHandler = mp.end_element
118
119        p.Parse(manifest)
120        return mp.d
121
122
123
124    def export_store_info(self, cf, nodes, ssh_port, connInfo):
125        """
126        For the export requests in the connection info, install the peer names
127        at the experiment controller via SetValue calls.
128        """
129
130        for c in connInfo:
131            for p in [ p for p in c.get('parameter', []) \
132                    if p.get('type', '') == 'output']:
133
134                if p.get('name', '') == 'peer':
135                    k = p.get('key', None)
136                    surl = p.get('store', None)
137                    if surl and k and k.index('/') != -1:
138                        if self.debug:
139                            req = { 'name': k, 'value': 'debug' }
140                            self.call_SetValue(surl, req, cf)
141                        else:
142                            value = nodes.get(k[k.index('/')+1:], 
143                                    {}).get('hostname',"")
144                            if value:
145                                req = { 'name': k, 'value': value }
146                                self.call_SetValue(surl, req, cf)
147                            else:
148                                self.log.error("No hostname for %s" % \
149                                        k[k.index('/'):])
150                    else:
151                        self.log.error("Bad export request: %s" % p)
152                elif p.get('name', '') == 'ssh_port':
153                    k = p.get('key', None)
154                    surl = p.get('store', None)
155                    if surl and k:
156                        req = { 'name': k, 'value': self.ssh_port }
157                        self.call_SetValue(surl, req, cf)
158                    else:
159                        self.log.error("Bad export request: %s" % p)
160                else:
161
162                    self.log.error("Unknown export parameter: %s" % \
163                            p.get('name'))
164                    continue
165
166    def configure_nodes(self, topo, nodes, user, host, sshd, sshd_config,
167            gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir):
168
169        fed_dir = "/usr/local/federation"
170        ssh = "/usr/bin/ssh -n -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' "
171        scp = "/usr/bin/scp -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' "
172        ifconfig = "/sbin/ifconfig"
173        tar = "/bin/tar"
174
175        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
176            vname = e.name[0]
177            node = nodes.get(vname, {})
178            pname = node.get('hostname', None)
179            if pname:
180                script = open("%s/%s.startup" %(tmpdir, pname), "w")
181                # Reset the interfaces to the ones in the topo file
182                for i in [ i for i in e.interface \
183                        if not i.get_attribute('portal')]:
184                    pinf = node['interfaces'].get(i.name, None)
185                    addr = i.get_attribute('ip4_address') 
186                    netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
187                    if pinf and addr:
188                        print >>script, \
189                                "%s %s %s netmask %s"  % \
190                                (ifconfig, pinf, addr, netmask)
191                    else:
192                        self.log.error("Missing interface or address for %s" \
193                                % i.name)
194                   
195                for s in e.software:
196                    # XXX: Just tarfiles for now
197                    if not (s.location and s.install):
198                        continue
199                    s_base = s.location.rpartition('/')[2]
200                    print >>script, "%s %s@%s:%s/%s ." % \
201                            (scp, user, host, stagingdir, s_base)
202                    print >>script, \
203                            "%s -C %s -xzf %s" % (tar, s.install, s_base)
204                for f in ('hosts', pubkey, secretkey):
205                    print >>script, "%s %s@%s:%s/%s %s/etc" % \
206                            (scp, user, host, stagingdir, f, fed_dir)
207                if sshd:
208                    print >>script, "%s %s@%s:%s %s/bin" % \
209                            (scp, user, host, sshd, fed_dir)
210                if sshd_config:
211                    print >>script, "%s %s@%s:%s %s/etc" % \
212                            (scp, user, host, sshd_config, fed_dir)
213                if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK):
214                    print >>script, "%s %s@%s:%s/%s.gw.conf %s/etc" % \
215                            (scp, user, host, stagingdir, vname, fed_dir)
216
217                # Start commands
218                if e.get_attribute('portal') and gate_cmd:
219                    # Portals never have a user-specified start command
220                    print >>script, gate_cmd
221                elif node_cmd:
222                    if e.get_attribute('startup'):
223                        print >>script, "%s \\$USER '%s'" % \
224                                (node_cmd, e.get_attribute('startup'))
225                    else:
226                        print >>script, node_cmd
227                script.close()
228                if not self.scp_file("%s/%s.startup" % (tmpdir, pname), 
229                        user, pname):
230                    self.log.error("Could not copy script to %s" % pname)
231            else:
232                self.log.error("Unmapped node: %s" % vname)
233
234    def start_nodes(self, user, nodes):
235        threads = [ ]
236        for n in nodes:
237            t = Thread(target=self.ssh_cmd, args=(user, n,
238                    "sudo /bin/sh ./%s.startup" % n))
239            t.start()
240            threads.append(t)
241
242        done = [not t.isAlive() for t in threads]
243        while not all(done):
244            self.log.info("Waiting for threads %s" % done)
245            time.sleep(10)
246            done = [not t.isAlive() for t in threads]
247
248
249
250
251    def __call__(self, parent, aid, user, rspec, pubkey, secretkey, 
252            stagingdir, tmpdir, certfile, certpw, export_certfile, topo,
253            connInfo, timeout=0):
254        """
255        Start a sub-experiment on a federant.
256
257        Get the current state, modify or create as appropriate, ship data
258        and configs and start the experiment.  There are small ordering
259        differences based on the initial state of the sub-experiment.
260        """
261
262        def random_slicename(user):
263            slicename = user
264            for i in range(0,5):
265                slicename += random.choice(string.ascii_letters)
266            return slicename
267
268        host = parent.staging_host
269        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
270        # Local software dir
271        lsoftdir = "%s/software" % tmpdir
272
273        # Open up a temporary file to contain a script for setting up the
274        # filespace for the new experiment.
275        self.log.info("[start_segment]: creating script file")
276        try:
277            sf, scriptname = tempfile.mkstemp()
278            scriptfile = os.fdopen(sf, 'w')
279        except IOError:
280            return False
281
282        scriptbase = os.path.basename(scriptname)
283
284        # Script the filesystem changes
285        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
286        print >>scriptfile, 'mkdir -p %s' % stagingdir
287        print >>scriptfile, "rm -f %s" % scriptbase
288        scriptfile.close()
289
290        # Move the script to the remote machine
291        # XXX: could collide tempfile names on the remote host
292        if self.scp_file(scriptname, user, host, scriptbase):
293            os.remove(scriptname)
294        else:
295            return False
296
297        # Execute the script (and the script's last line deletes it)
298        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
299            return False
300
301        # Copy software to the staging machine
302        for d in (tmpdir, lsoftdir):
303            if os.path.isdir(d):
304                for f in os.listdir(d):
305                    if not os.path.isdir("%s/%s" % (d, f)):
306                        if not self.scp_file("%s/%s" % (d, f), 
307                                user, host, "%s/%s" % (stagingdir, f)):
308                            self.log.error("Scp failed")
309                            return False
310
311        try:
312            gcred = self.pg_call(self.sa_url, 'GetCredential', {}, ctxt)
313        except self.ProtoGENIError, e:
314            raise service_error(service_error.federant,
315                    "ProtoGENI: %s" % e)
316        # Find a slicename not in use
317        slicename = "fabereGpgL"
318        while True:
319            slicename = random_slicename(user)
320            try:
321                param = {
322                        'credential': gcred, 
323                        'hrn': slicename,
324                        'type': 'Slice'
325                        }
326                self.pg_call(self.sa_url, 'Resolve', param, ctxt)
327            except self.ProtoGENIError, e:
328                print e
329                break
330
331        self.log.info("Creating %s" % slicename)
332        f = open("./rspec", "w")
333        print >>f, "%s" % rspec
334        f.close()
335        # Create the slice and allocate resources.  If any of this stuff fails,
336        # the allocations will time out on PG in short order, so we just raise
337        # the service_error.
338        try:
339            param = {
340                    'credential': gcred, 
341                    'hrn': slicename,
342                    'type': 'Slice'
343                    }
344            slice_cred = self.pg_call(self.sa_url, 'Register', param, ctxt)
345            f = open("./slice_cred", "w")
346            print >>f, slice_cred
347            f.close()
348            # Populate the ssh keys (let PG format them)
349            param = {
350                    'credential': gcred, 
351                    }
352            keys =  self.pg_call(self.sa_url, 'GetKeys', param, ctxt)
353            # Grab and redeem a ticket
354            param = {
355                    'credential': slice_cred, 
356                    'rspec': rspec,
357                    }
358            ticket = self.pg_call(self.cm_url, 'GetTicket', param, ctxt)
359            f = open("./ticket", "w")
360            print >>f, ticket
361            f.close()
362            param = { 
363                    'credential': slice_cred, 
364                    'keys': keys,
365                    'ticket': ticket,
366                    }
367            sliver_cred, manifest = self.pg_call(self.cm_url, 
368                    'RedeemTicket', param, ctxt)
369            f = open("./sliver_cred", "w")
370            print >>f, sliver_cred
371            f.close()
372            f = open("./manifest", "w")
373            print >>f, manifest
374            f.close()
375            # start 'em up
376            param = { 
377                    'credential': sliver_cred,
378                    }
379            self.pg_call(self.cm_url, 'StartSliver', param, ctxt)
380        except self.ProtoGENIError, e:
381            raise service_error(service_error.federant,
382                    "ProtoGENI: %s" % e)
383
384        # With manifest in hand, we can export the portal node names.
385        nodes = self.manifest_to_dict(manifest)
386        self.export_store_info(export_certfile, nodes, parent.ssh_port,
387                connInfo)
388
389        # Now we wait for the nodes to start on PG
390        status = 'notready'
391        try:
392            while status == 'notready':
393                param = { 
394                        'credential': slice_cred
395                        }
396                r = self.pg_call(self.cm_url, 'SliceStatus', param, ctxt)
397                print r
398                status = r.get('status', 'notready')
399                if status == 'notready':
400                    time.sleep(30)
401        except Self.ProtoGENIError, e:
402            raise service_error(service_error.federant,
403                    "ProtoGENI: %s" % e)
404
405        if status == 'failed':
406            self.log.error('Sliver failed to start on ProtoGENI')
407            try:
408                param = { 
409                        'credential': slice_cred
410                        }
411                self.pg_call(self.cm_url, 'DeleteSliver', param, ctxt)
412            except self.ProtoGENIError, e:
413                raise service_error(service_error.federant,
414                    "ProtoGENI: %s" % e)
415            return False
416        else:
417            parent.state_lock.acquire()
418            parent.allocation[aid]['slice_credential'] = slice_cred
419            parent.allocation[aid]['sliver_credential'] = sliver_cred
420            parent.allocation[aid]['manifest'] = manifest
421            parent.allocation[aid]['certfile'] = certfile
422            parent.allocation[aid]['certpw'] = certpw
423            parent.write_state()
424            parent.state_lock.release()
425
426        # The startcmds for portals and standard nodes (the Master Slave
427        # distinction is going away)
428        gate_cmd = parent.attrs.get('SlaveConnectorStartCmd', '/bin/true')
429        node_cmd = parent.attrs.get('SlaveNodeStartCmd', 'bin/true')
430
431        # Now we have configuration to do for ProtoGENI
432        self.configure_nodes(topo, nodes, user, parent.staging_host,
433                parent.sshd, parent.sshd_config, gate_cmd, node_cmd, 
434                pubkey, secretkey, stagingdir, tmpdir)
435
436        self.start_nodes(user, [ n['hostname'] for n in nodes.values()])
437
438        # Everything has gone OK.
439        return True
440
441class stop_segment(segment_base):
442    def __init__(self, log=None, keyfile=None, debug=False, 
443            ch_url=None, sa_url=None, cm_url=None):
444        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
445                ch_url=cm_url, sa_url=sa_url, cm_url=cm_url)
446
447    def __call__(self, parent, user, stagingdir, slice_cred, certfile, certpw):
448        """
449        Stop a sub experiment by calling swapexp on the federant
450        """
451        host = parent.staging_host
452        rv = False
453        try:
454            # Clean out tar files: we've gone over quota in the past
455            if stagingdir:
456                self.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
457            if slice_cred:
458                self.log.error('Removing Sliver on ProtoGENI')
459                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
460                try:
461                    param = { 
462                            'credential': slice_cred
463                            }
464                    self.pg_call(self.cm_url, 'DeleteSlice', param, ctxt)
465                except self.ProtoGENIError, e:
466                    raise service_error(service_error.federant,
467                        "ProtoGENI: %s" % e)
468            return True
469        except self.ssh_cmd_timeout:
470            rv = False
471        return rv
472
Note: See TracBrowser for help on using the repository browser.