source: fedd/federation/proxy_protogeni_segment.py @ 0b66338

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 0b66338 was 593e901, checked in by Ted Faber <faber@…>, 15 years ago

Checkpoint working federation w/PG (w/o routing yet...)

  • Property mode set to 100644
File size: 19.3 KB
RevLine 
[c119839]1#!/usr/local/bin/python
2
3import sys, os
4import re
5
6import tempfile
7import subprocess
8import logging 
9import time
10import random
11import string
12import signal
13
14import xml.parsers.expat
15from threading import Thread
16
17from proxy_segment import proxy_segment
18from service_error import service_error
19from remote_service import service_caller
20from util import fedd_ssl_context
21
22import topdl
23
24class segment_base(proxy_segment):
25    class ProtoGENIError(Exception): 
26        def __init__(self, op, code, output):
27            Exception.__init__(self, output)
28            self.op = op
29            self.code = code
30            self.output = output
31
32    def __init__(self, log=None, keyfile=None, debug=False, 
33            ch_url=None, sa_url=None, cm_url=None):
34        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
35
36        self.ProtoGENIError = start_segment.ProtoGENIError
37        self.ch_url = ch_url
38        self.sa_url = sa_url
39        self.cm_url = cm_url
40
41        self.call_SetValue = service_caller('SetValue')
42
43        self.debug_fail = ['Resolve']
44        self.debug_response = {
45                'RedeemTicket': ("XML blob1", "XML blob2"),
46                'SliceStatus': { 'status': 'ready' },
47            }
48
49
50    def pg_call(self, url, method, params, context):
[593e901]51        max_retries = 5
52        retries = 0
53
[c119839]54        s = service_caller(method, request_body_name="", strict=False)
55        self.log.debug("Calling %s %s" % (url, method))
56        if not self.debug:
[593e901]57            while retries < max_retries:
58                r = s.call_xmlrpc_service(url, params, context=context)
59                code = r.get('code', -1)
60                if code == 0:
61                    # Success leaves the loop here
62                    return r.get('value', None)
63                elif code == 14 and retries +1 < max_retries:
64                    # Busy resource
65                    retries+= 1
66                    self.log.info("Resource busy, retrying in 30 secs")
67                    time.sleep(30)
68                else:
69                    # NB: out of retries falls through to here
70                    raise self.ProtoGENIError(op=method, 
71                            code=r.get('code', 'unknown'), 
72                            output=r.get('output', 'No output'))
[c119839]73        else:
74            if method in self.debug_fail:
75                raise self.ProtoGENIError(op=method, code='unknown',
76                        output='No output')
77            elif self.debug_response.has_key(method):
78                return self.debug_response[method]
79            else:
80                return "%s XML blob" % method
81
82
83class start_segment(segment_base):
84    def __init__(self, log=None, keyfile=None, debug=False, 
85            ch_url=None, sa_url=None, cm_url=None):
86        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
87                ch_url=cm_url, sa_url=sa_url, cm_url=cm_url)
88
89
90    # Turn the manifest into a dict were each virtual nodename (i.e. the topdl
91    # name) has an entry with the allocated machine in hostname and the
92    # interfaces in 'interfaces'.  I love having XML parser code lying around.
93    def manifest_to_dict(self, manifest):
[593e901]94        if self.debug: 
95            self.log.debug("Returning null manifest dict")
96            return { }
[c119839]97
98        # The class allows us to keep a little state - the dict under
99        # consteruction and the current entry in that dict for the interface
100        # element code.
101        class manifest_parser:
102            def __init__(self):
103                self.d = { }
104                self.current_key=None
105
106            # If the element is a node, create a dict entry for it.  If it's an
107            # interface inside a node, add an entry in the interfaces list with
108            # the virtual name and component id.
109            def start_element(self, name, attrs):
110                if name == 'node':
111                    self.current_key = attrs.get('virtual_id',"")
112                    if self.current_key:
113                        self.d[self.current_key] = {
114                                'hostname': attrs.get('hostname', None),
115                                'interfaces': { }
116                                }
117                elif name == 'interface' and self.current_key:
118                    self.d[self.current_key]['interfaces']\
119                            [attrs.get('virtual_id','')] = \
120                            attrs.get('component_id', None)
121            #  When a node is finished, clear current_key
122            def end_element(self, name):
123                if name == 'node': self.current_key = None
124
125
126        mp = manifest_parser()
127        p = xml.parsers.expat.ParserCreate()
128        # These are bound to the class we just created
129        p.StartElementHandler = mp.start_element
130        p.EndElementHandler = mp.end_element
131
132        p.Parse(manifest)
133        return mp.d
134
135
[593e901]136    def generate_portal_configs(self, parent, topo, pubkey_base, 
137            secretkey_base, tmpdir, master, leid, connInfo, services, nodes):
138
139        def conninfo_to_dict(key, info):
140            """
141            Make a cpoy of the connection information about key, and flatten it
142            into a single dict by parsing out any feddAttrs.
143            """
144
145            rv = None
146            for i in info:
147                if key == i.get('portal', "") or \
148                        key in [e.get('element', "") \
149                        for e in i.get('member', [])]:
150                    rv = i.copy()
151                    break
152
153            else:
154                return rv
155
156            if 'fedAttr' in rv:
157                for a in rv['fedAttr']:
158                    attr = a.get('attribute', "")
159                    val = a.get('value', "")
160                    if attr and attr not in rv:
161                        rv[attr] = val
162                del rv['fedAttr']
163            return rv
164
165        # XXX: un hardcode this
166        def client_null(f, s):
167            print >>f, "Service: %s" % s['name']
168
169        def client_smb(f, s):
170            print >>f, "Service: %s" % s['name']
171            smbshare = None
172            smbuser = None
173            smbproj = None
174            for a in s.get('fedAttr', []):
175                if a.get('attribute', '') == 'SMBSHARE':
176                    smbshare = a.get('value', None)
177                elif a.get('attribute', '') == 'SMBUSER':
178                    smbuser = a.get('value', None)
179                elif a.get('attribute', '') == 'SMBPROJ':
180                    smbproj = a.get('value', None)
181
182            if all((smbshare, smbuser, smbproj)):
183                print >>f, "SMBshare: %s" % smbshare
184                print >>f, "ProjectUser: %s" % smbuser
185                print >>f, "ProjectName: %s" % smbproj
186
187        client_service_out = {
188                'SMB': client_smb,
189                'tmcd': client_null,
190                'seer': client_null,
191                'userconfig': client_null,
192            }
193        # XXX: end un hardcode this
194
195
196        seer_out = False
197        client_out = False
198        for e in [ e for e in topo.elements \
199                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
200            myname = e.name[0]
201            type = e.get_attribute('portal_type')
202
203            info = conninfo_to_dict(myname, connInfo)
204
205            if not info:
206                raise service_error(service_error.req,
207                        "No connectivity info for %s" % myname)
208
209            # Translate to physical name (ProtoGENI doesn't have DNS)
210            physname = nodes.get(myname, {'hostname': myname})['hostname']
211            peer = info.get('peer', "")
212            ldomain = parent.domain;
213
214            mexp = info.get('masterexperiment',"")
215            mproj, meid = mexp.split("/", 1)
216            mdomain = info.get('masterdomain',"")
217            muser = info.get('masteruser','root')
218            smbshare = info.get('smbshare', 'USERS')
219            ssh_port = info.get('ssh_port', '22')
220
221            active = info.get('active', 'False')
222
223            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
224            tunnelconfig = parent.attrs.has_key('TunnelCfg')
225            try:
226                f = open(cfn, "w")
227                if active == 'True':
228                    print >>f, "active: True"
229                    print >>f, "ssh_port: %s" % ssh_port
230                    if type in ('control', 'both'):
231                        for s in [s for s in services \
232                                if s.get('name', "") in parent.imports]:
233                            p = urlparse(s.get('server', 'http://localhost'))
234                            print >>f, 'port: remote:%s:%s:%s' % \
235                                    (p.port, p.hostname, p.port)
236
237                if tunnelconfig:
238                    print >>f, "tunnelip: %s" % tunnelconfig
239                # XXX: send this an fedattr
240                #print >>f, "seercontrol: control.%s.%s%s" % \
241                        #(meid.lower(), mproj.lower(), mdomain)
242                print >>f, "peer: %s" % peer.lower()
243                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
244                        pubkey_base
245                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
246                        secretkey_base
247                f.close()
248            except IOError, e:
249                raise service_error(service_error.internal,
250                        "Can't write protal config %s: %s" % (cfn, e))
251           
252            # XXX: This little seer config file needs to go away.
253            if not seer_out:
254                try:
255                    seerfn = "%s/seer.conf" % tmpdir
256                    f = open(seerfn, "w")
257                    if not master:
258                        print >>f, "ControlNode: control.%s.%s%s" % \
259                            (meid.lower(), mproj.lower(), mdomain)
260                    print >>f, "ExperimentID: %s" % mexp
261                    f.close()
262                except IOError, e:
263                    raise service_error(service_error.internal, 
264                            "Can't write seer.conf: %s" %e)
265                seer_out = True
266
267            if not client_out and type in ('control', 'both'):
268                try:
269                    f = open("%s/client.conf" % tmpdir, "w")
270                    print >>f, "ControlGateway: %s" % physname.lower()
271                    for s in services:
272                        if s.get('name',"") in parent.imports and \
273                                s.get('visibility','') == 'import':
274                            client_service_out[s['name']](f, s)
275                    # Does seer need this?
276                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
277                    f.close()
278                except IOError, e:
279                    raise service_error(service_error.internal,
280                            "Cannot write client.conf: %s" %s)
281                client_out = True
282
283
[c119839]284
285    def export_store_info(self, cf, nodes, ssh_port, connInfo):
286        """
287        For the export requests in the connection info, install the peer names
288        at the experiment controller via SetValue calls.
289        """
290
291        for c in connInfo:
292            for p in [ p for p in c.get('parameter', []) \
293                    if p.get('type', '') == 'output']:
294
295                if p.get('name', '') == 'peer':
296                    k = p.get('key', None)
297                    surl = p.get('store', None)
298                    if surl and k and k.index('/') != -1:
299                        if self.debug:
300                            req = { 'name': k, 'value': 'debug' }
301                            self.call_SetValue(surl, req, cf)
302                        else:
303                            value = nodes.get(k[k.index('/')+1:], 
304                                    {}).get('hostname',"")
305                            if value:
306                                req = { 'name': k, 'value': value }
307                                self.call_SetValue(surl, req, cf)
308                            else:
309                                self.log.error("No hostname for %s" % \
310                                        k[k.index('/'):])
311                    else:
312                        self.log.error("Bad export request: %s" % p)
313                elif p.get('name', '') == 'ssh_port':
314                    k = p.get('key', None)
315                    surl = p.get('store', None)
316                    if surl and k:
[593e901]317                        req = { 'name': k, 'value': ssh_port }
[c119839]318                        self.call_SetValue(surl, req, cf)
319                    else:
320                        self.log.error("Bad export request: %s" % p)
321                else:
322
323                    self.log.error("Unknown export parameter: %s" % \
324                            p.get('name'))
325                    continue
326
327    def configure_nodes(self, topo, nodes, user, host, sshd, sshd_config,
328            gate_cmd, node_cmd, pubkey, secretkey, stagingdir, tmpdir):
329
330        fed_dir = "/usr/local/federation"
331        ssh = "/usr/bin/ssh -n -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' "
332        scp = "/usr/bin/scp -o 'ForwardX11 no' -o 'StrictHostKeyChecking no' "
333        ifconfig = "/sbin/ifconfig"
334        tar = "/bin/tar"
335
336        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
337            vname = e.name[0]
338            node = nodes.get(vname, {})
339            pname = node.get('hostname', None)
340            if pname:
341                script = open("%s/%s.startup" %(tmpdir, pname), "w")
342                # Reset the interfaces to the ones in the topo file
343                for i in [ i for i in e.interface \
344                        if not i.get_attribute('portal')]:
345                    pinf = node['interfaces'].get(i.name, None)
346                    addr = i.get_attribute('ip4_address') 
347                    netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
348                    if pinf and addr:
349                        print >>script, \
350                                "%s %s %s netmask %s"  % \
351                                (ifconfig, pinf, addr, netmask)
352                    else:
353                        self.log.error("Missing interface or address for %s" \
354                                % i.name)
355                   
356                for s in e.software:
357                    # XXX: Just tarfiles for now
358                    if not (s.location and s.install):
359                        continue
360                    s_base = s.location.rpartition('/')[2]
361                    print >>script, "%s %s@%s:%s/%s ." % \
362                            (scp, user, host, stagingdir, s_base)
363                    print >>script, \
364                            "%s -C %s -xzf %s" % (tar, s.install, s_base)
[8e6fe4d]365                for f in ('hosts', pubkey, secretkey, 'client.conf', 
366                        'userconf', 'seer.conf'):
[c119839]367                    print >>script, "%s %s@%s:%s/%s %s/etc" % \
368                            (scp, user, host, stagingdir, f, fed_dir)
369                if sshd:
370                    print >>script, "%s %s@%s:%s %s/bin" % \
371                            (scp, user, host, sshd, fed_dir)
372                if sshd_config:
373                    print >>script, "%s %s@%s:%s %s/etc" % \
374                            (scp, user, host, sshd_config, fed_dir)
[593e901]375                # Look in tmpdir to get the names.  They've all been copied
376                # into the (remote) staging dir
[c119839]377                if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK):
378                    print >>script, "%s %s@%s:%s/%s.gw.conf %s/etc" % \
379                            (scp, user, host, stagingdir, vname, fed_dir)
380
[8e6fe4d]381
[c119839]382                # Start commands
383                if e.get_attribute('portal') and gate_cmd:
384                    # Portals never have a user-specified start command
385                    print >>script, gate_cmd
386                elif node_cmd:
[8e6fe4d]387                    # XXX: debug
388                    print >>script, "sudo perl -I%s/lib %s/bin/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_dir, fed_dir, user)
389                    # XXX: debug
[c119839]390                    if e.get_attribute('startup'):
391                        print >>script, "%s \\$USER '%s'" % \
392                                (node_cmd, e.get_attribute('startup'))
393                    else:
394                        print >>script, node_cmd
395                script.close()
396                if not self.scp_file("%s/%s.startup" % (tmpdir, pname), 
397                        user, pname):
398                    self.log.error("Could not copy script to %s" % pname)
399            else:
400                self.log.error("Unmapped node: %s" % vname)
401
402    def start_nodes(self, user, nodes):
403        threads = [ ]
404        for n in nodes:
405            t = Thread(target=self.ssh_cmd, args=(user, n,
[593e901]406                    "sudo /bin/sh ./%s.startup &" % n))
[c119839]407            t.start()
408            threads.append(t)
409
410        done = [not t.isAlive() for t in threads]
411        while not all(done):
412            self.log.info("Waiting for threads %s" % done)
413            time.sleep(10)
414            done = [not t.isAlive() for t in threads]
415
416
417
418
[593e901]419    def __call__(self, parent, aid, user, rspec, pubkey, secretkey, master,
420            ename, stagingdir, tmpdir, certfile, certpw, export_certfile, topo,
421            connInfo, services, timeout=0):
[c119839]422        """
423        Start a sub-experiment on a federant.
424
425        Get the current state, modify or create as appropriate, ship data
426        and configs and start the experiment.  There are small ordering
427        differences based on the initial state of the sub-experiment.
428        """
429
430        def random_slicename(user):
431            slicename = user
432            for i in range(0,5):
433                slicename += random.choice(string.ascii_letters)
434            return slicename
435
436        host = parent.staging_host
437        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
438        # Local software dir
439        lsoftdir = "%s/software" % tmpdir
440
441        # Open up a temporary file to contain a script for setting up the
442        # filespace for the new experiment.
443        self.log.info("[start_segment]: creating script file")
444        try:
445            sf, scriptname = tempfile.mkstemp()
446            scriptfile = os.fdopen(sf, 'w')
447        except IOError:
448            return False
449
450        scriptbase = os.path.basename(scriptname)
451
452        # Script the filesystem changes
453        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
454        print >>scriptfile, 'mkdir -p %s' % stagingdir
455        print >>scriptfile, "rm -f %s" % scriptbase
456        scriptfile.close()
457
458        # Move the script to the remote machine
459        # XXX: could collide tempfile names on the remote host
460        if self.scp_file(scriptname, user, host, scriptbase):
461            os.remove(scriptname)
462        else:
463            return False
464
465        # Execute the script (and the script's last line deletes it)
466        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
467            return False
468
469        try:
470            gcred = self.pg_call(self.sa_url, 'GetCredential', {}, ctxt)
471        except self.ProtoGENIError, e:
472            raise service_error(service_error.federant,
473                    "ProtoGENI: %s" % e)
474        # Find a slicename not in use
475        slicename = "fabereGpgL"
476        while True:
477            slicename = random_slicename(user)
478            try:
479                param = {
480                        'credential': gcred, 
481                        'hrn': slicename,
482                        'type': 'Slice'
483                        }
484                self.pg_call(self.sa_url, 'Resolve', param, ctxt)
485            except self.ProtoGENIError, e:
486                print e
487                break
488
489        self.log.info("Creating %s" % slicename)
490        f = open("./rspec", "w")
491        print >>f, "%s" % rspec
492        f.close()
493        # Create the slice and allocate resources.  If any of this stuff fails,
494        # the allocations will time out on PG in short order, so we just raise
495        # the service_error.
496        try:
497            param = {
498                    'credential': gcred, 
499                    'hrn': slicename,
500                    'type': 'Slice'
501                    }
502            slice_cred = self.pg_call(self.sa_url, 'Register', param, ctxt)
503            f = open("./slice_cred", "w")
504            print >>f, slice_cred
505            f.close()
506            # Populate the ssh keys (let PG format them)
507            param = {
508                    'credential': gcred, 
509                    }
510            keys =  self.pg_call(self.sa_url, 'GetKeys', param, ctxt)
511            # Grab and redeem a ticket
512            param = {
513                    'credential': slice_cred, 
514                    'rspec': rspec,
515                    }
516            ticket = self.pg_call(self.cm_url, 'GetTicket', param, ctxt)
517            f = open("./ticket", "w")
518            print >>f, ticket
519            f.close()
520            param = { 
521                    'credential': slice_cred, 
522                    'keys': keys,
523                    'ticket': ticket,
524                    }
525            sliver_cred, manifest = self.pg_call(self.cm_url, 
526                    'RedeemTicket', param, ctxt)
527            f = open("./sliver_cred", "w")
528            print >>f, sliver_cred
529            f.close()
530            f = open("./manifest", "w")
531            print >>f, manifest
532            f.close()
533            # start 'em up
534            param = { 
535                    'credential': sliver_cred,
536                    }
537            self.pg_call(self.cm_url, 'StartSliver', param, ctxt)
538        except self.ProtoGENIError, e:
539            raise service_error(service_error.federant,
[593e901]540                    "ProtoGENI: %s %s" % (e.code, e))
[c119839]541
542        # With manifest in hand, we can export the portal node names.
543        nodes = self.manifest_to_dict(manifest)
[593e901]544        print nodes
[c119839]545        self.export_store_info(export_certfile, nodes, parent.ssh_port,
546                connInfo)
[593e901]547        self.generate_portal_configs(parent, topo, pubkey, secretkey, tmpdir, 
548                master, ename, connInfo, services, nodes)
549
550        # Copy software to the staging machine (done after generation to copy
551        # those, too)
552        for d in (tmpdir, lsoftdir):
553            if os.path.isdir(d):
554                for f in os.listdir(d):
555                    if not os.path.isdir("%s/%s" % (d, f)):
556                        if not self.scp_file("%s/%s" % (d, f), 
557                                user, host, "%s/%s" % (stagingdir, f)):
558                            self.log.error("Scp failed")
559                            return False
560
[c119839]561
562        # Now we wait for the nodes to start on PG
563        status = 'notready'
564        try:
565            while status == 'notready':
566                param = { 
567                        'credential': slice_cred
568                        }
569                r = self.pg_call(self.cm_url, 'SliceStatus', param, ctxt)
570                print r
571                status = r.get('status', 'notready')
572                if status == 'notready':
573                    time.sleep(30)
[593e901]574        except self.ProtoGENIError, e:
[c119839]575            raise service_error(service_error.federant,
[593e901]576                    "ProtoGENI: %s %s" % (e.code, e))
[c119839]577
578        if status == 'failed':
579            self.log.error('Sliver failed to start on ProtoGENI')
580            try:
581                param = { 
582                        'credential': slice_cred
583                        }
584                self.pg_call(self.cm_url, 'DeleteSliver', param, ctxt)
585            except self.ProtoGENIError, e:
586                raise service_error(service_error.federant,
587                    "ProtoGENI: %s" % e)
588            return False
589        else:
590            parent.state_lock.acquire()
591            parent.allocation[aid]['slice_credential'] = slice_cred
592            parent.allocation[aid]['sliver_credential'] = sliver_cred
593            parent.allocation[aid]['manifest'] = manifest
594            parent.allocation[aid]['certfile'] = certfile
595            parent.allocation[aid]['certpw'] = certpw
596            parent.write_state()
597            parent.state_lock.release()
598
599        # The startcmds for portals and standard nodes (the Master Slave
600        # distinction is going away)
601        gate_cmd = parent.attrs.get('SlaveConnectorStartCmd', '/bin/true')
602        node_cmd = parent.attrs.get('SlaveNodeStartCmd', 'bin/true')
603
604        # Now we have configuration to do for ProtoGENI
605        self.configure_nodes(topo, nodes, user, parent.staging_host,
606                parent.sshd, parent.sshd_config, gate_cmd, node_cmd, 
607                pubkey, secretkey, stagingdir, tmpdir)
608
609        self.start_nodes(user, [ n['hostname'] for n in nodes.values()])
610
611        # Everything has gone OK.
612        return True
613
614class stop_segment(segment_base):
615    def __init__(self, log=None, keyfile=None, debug=False, 
616            ch_url=None, sa_url=None, cm_url=None):
617        segment_base.__init__(self, log=log, keyfile=keyfile, debug=debug,
618                ch_url=cm_url, sa_url=sa_url, cm_url=cm_url)
619
620    def __call__(self, parent, user, stagingdir, slice_cred, certfile, certpw):
621        """
622        Stop a sub experiment by calling swapexp on the federant
623        """
624        host = parent.staging_host
625        rv = False
626        try:
627            # Clean out tar files: we've gone over quota in the past
628            if stagingdir:
629                self.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
630            if slice_cred:
631                self.log.error('Removing Sliver on ProtoGENI')
632                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
633                try:
634                    param = { 
635                            'credential': slice_cred
636                            }
637                    self.pg_call(self.cm_url, 'DeleteSlice', param, ctxt)
638                except self.ProtoGENIError, e:
639                    raise service_error(service_error.federant,
640                        "ProtoGENI: %s" % e)
641            return True
642        except self.ssh_cmd_timeout:
643            rv = False
644        return rv
645
Note: See TracBrowser for help on using the repository browser.