source: fedd/federation/protogeni_access.py @ f8ae7aa

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f8ae7aa was f8ae7aa, checked in by Ted Faber <faber@…>, 14 years ago

Neaten things up a little and support rpms for federation and portal software in protoGENI

  • Property mode set to 100644
File size: 39.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_protogeni_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        def software_list(v):
59            l = [ ]
60            if v:
61                ps = v.split(" ")
62                while len(ps):
63                    loc, file = ps[0:2]
64                    del ps[0:2]
65                    if loc == 'rpm':
66                        loc = None
67                    l.append((loc, file))
68            return l
69
70        # Make sure that the configuration is in place
71        if not config: 
72            raise RunTimeError("No config to fedd.access")
73
74        self.project_priority = config.getboolean("access", "project_priority")
75        self.allow_proxy = config.getboolean("access", "allow_proxy")
76
77        self.domain = config.get("access", "domain")
78        self.certdir = config.get("access","certdir")
79        self.userconfdir = config.get("access","userconfdir")
80        self.userconfcmd = config.get("access","userconfcmd")
81        self.userconfurl = config.get("access","userconfurl")
82        self.federation_software = config.get("access", "federation_software")
83        self.portal_software = config.get("access", "portal_software")
84        self.ssh_port = config.get("access","ssh_port") or "22"
85        self.sshd = config.get("access","sshd")
86        self.sshd_config = config.get("access", "sshd_config")
87        self.create_debug = config.getboolean("access", "create_debug")
88        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
89        self.access_type = config.get("access", "type")
90        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
91        self.staging_host = config.get("access", "staging_host") \
92                or "ops.emulab.net"
93   
94        self.federation_software = software_list(self.federation_software)
95        self.portal_software = software_list(self.portal_software)
96
97        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
98        self.renewal_interval = int(self.renewal_interval) * 60
99
100        self.ch_url = config.get("access", "ch_url")
101        self.sa_url = config.get("access", "sa_url")
102        self.cm_url = config.get("access", "cm_url")
103
104        self.attrs = { }
105        self.access = { }
106        self.restricted = [ ]
107        self.projects = { }
108        self.keys = { }
109        self.types = { }
110        self.allocation = { }
111        self.state = { 
112            'projects': self.projects,
113            'allocation' : self.allocation,
114            'keys' : self.keys,
115            'types': self.types
116        }
117        self.log = logging.getLogger("fedd.access")
118        set_log_level(config, "access", self.log)
119        self.state_lock = Lock()
120        # XXX: Configurable
121        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
122        self.imports = set(('SMB', 'seer', 'userconfig'))
123
124        if auth: self.auth = auth
125        else:
126            self.log.error(\
127                    "[access]: No authorizer initialized, creating local one.")
128            auth = authorizer()
129
130        tb = config.get('access', 'testbed')
131        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
132        else: self.testbed = [ ]
133
134        if config.has_option("access", "accessdb"):
135            self.read_access(config.get("access", "accessdb"))
136
137        self.state_filename = config.get("access", "access_state")
138        print "Calling read_state %s" % self.state_filename
139        self.read_state()
140
141        # Keep cert_file and cert_pwd coming from the same place
142        self.cert_file = config.get("access", "cert_file")
143        if self.cert_file:
144            self.sert_pwd = config.get("access", "cert_pw")
145        else:
146            self.cert_file = config.get("globals", "cert_file")
147            self.sert_pwd = config.get("globals", "cert_pw")
148
149        self.trusted_certs = config.get("access", "trusted_certs") or \
150                config.get("globals", "trusted_certs")
151
152        self.start_segment = proxy_protogeni_segment.start_segment
153        self.stop_segment = proxy_protogeni_segment.stop_segment
154        self.renew_segment = proxy_protogeni_segment.renew_segment
155
156        self.call_SetValue = service_caller('SetValue')
157        self.call_GetValue = service_caller('GetValue')
158
159        self.RenewSlices()
160
161        self.soap_services = {\
162            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
163            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
164            'StartSegment': soap_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': soap_handler("TerminateSegment", 
166                self.TerminateSegment),
167            }
168        self.xmlrpc_services =  {\
169            'RequestAccess': xmlrpc_handler('RequestAccess',
170                self.RequestAccess),
171            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
172                self.ReleaseAccess),
173            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
174            'TerminateSegment': xmlrpc_handler('TerminateSegment',
175                self.TerminateSegment),
176            }
177
178    def read_access(self, config):
179        """
180        Read a configuration file and set internal parameters.
181
182        There are access lines of the
183        form (tb, proj, user) -> user that map the first tuple of
184        names to the user for for access purposes.  Names in the key (left side)
185        can include "<NONE> or <ANY>" to act as wildcards or to require the
186        fields to be empty.  Similarly aproj or auser can be <SAME> or
187        <DYNAMIC> indicating that either the matching key is to be used or a
188        dynamic user or project will be created.  These names can also be
189        federated IDs (fedid's) if prefixed with fedid:.  The user is the
190        ProtoGENI identity certificate.
191        Testbed attributes outside the forms above can be given using the
192        format attribute: name value: value.  The name is a single word and the
193        value continues to the end of the line.  Empty lines and lines startin
194        with a # are ignored.
195
196        Parsing errors result in a self.parse_error exception being raised.
197        """
198        lineno=0
199        name_expr = "["+string.ascii_letters + string.digits + "\/\.\-_]+"
200        fedid_expr = "fedid:[" + string.hexdigits + "]+"
201        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
202
203        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
204                re.IGNORECASE)
205        access_str = '\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ \
206                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*,\s*('\
207                        + name_expr + ')\s*,\s*('+name_expr+')\s*,?\s*(' + \
208                        name_expr+ ')?\)'
209        access_re = re.compile(access_str, re.IGNORECASE)
210
211
212        def parse_name(n):
213            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
214            else: return n
215       
216        def auth_name(n):
217            if isinstance(n, basestring):
218                if n =='<any>' or n =='<none>': return None
219                else: return unicode(n)
220            else:
221                return n
222
223        f = open(config, "r");
224        for line in f:
225            lineno += 1
226            line = line.strip();
227            if len(line) == 0 or line.startswith('#'):
228                continue
229
230            # Extended (attribute: x value: y) attribute line
231            m = attr_re.match(line)
232            if m != None:
233                attr, val = m.group(1,2)
234                self.attrs[attr] = val
235                continue
236
237            # Access line (t, p, u) -> (a, pw) line
238            m = access_re.match(line)
239            if m != None:
240                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
241                auth_key = tuple([ auth_name(x) for x in access_key])
242                cert = auth_name(parse_name(m.group(4)))
243                user_name = auth_name(parse_name(m.group(5)))
244                ssh_key = unicode(m.group(6))
245                if m.group(6): pw = unicode(m.group(7))
246                else: pw = None
247
248                self.access[access_key] = (cert, user_name, ssh_key, pw)
249                self.auth.set_attribute(auth_key, "access")
250                continue
251
252            # Nothing matched to here: unknown line - raise exception
253            f.close()
254            raise self.parse_error("Unknown statement at line %d of %s" % \
255                    (lineno, config))
256        f.close()
257
258    def write_state(self):
259        if self.state_filename:
260            try:
261                f = open(self.state_filename, 'w')
262                pickle.dump(self.state, f)
263            except IOError, e:
264                self.log.error("Can't write file %s: %s" % \
265                        (self.state_filename, e))
266            except pickle.PicklingError, e:
267                self.log.error("Pickling problem: %s" % e)
268            except TypeError, e:
269                self.log.error("Pickling problem (TypeError): %s" % e)
270
271
272    def read_state(self):
273        """
274        Read a new copy of access state.  Old state is overwritten.
275
276        State format is a simple pickling of the state dictionary.
277        """
278        if self.state_filename:
279            try:
280                f = open(self.state_filename, "r")
281                self.state = pickle.load(f)
282                self.log.debug("[read_state]: Read state from %s" % \
283                        self.state_filename)
284            except IOError, e:
285                self.log.warning(("[read_state]: No saved state: " +\
286                        "Can't open %s: %s") % (self.state_filename, e))
287            except EOFError, e:
288                self.log.warning(("[read_state]: " +\
289                        "Empty or damaged state file: %s:") % \
290                        self.state_filename)
291            except pickle.UnpicklingError, e:
292                self.log.warning(("[read_state]: No saved state: " + \
293                        "Unpickling failed: %s") % e)
294
295            # Add the ownership attributes to the authorizer.  Note that the
296            # indices of the allocation dict are strings, but the attributes are
297            # fedids, so there is a conversion.
298            for k in self.state.get('allocation', {}).keys():
299                for o in self.state['allocation'][k].get('owners', []):
300                    self.auth.set_attribute(o, fedid(hexstr=k))
301                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
302
303            if self.allocation != self.state['allocation']:
304                self.allocation = self.state['allocation']
305
306    def permute_wildcards(self, a, p):
307        """Return a copy of a with various fields wildcarded.
308
309        The bits of p control the wildcards.  A set bit is a wildcard
310        replacement with the lowest bit being user then project then testbed.
311        """
312        if p & 1: user = ["<any>"]
313        else: user = a[2]
314        if p & 2: proj = "<any>"
315        else: proj = a[1]
316        if p & 4: tb = "<any>"
317        else: tb = a[0]
318
319        return (tb, proj, user)
320
321
322    def find_access(self, search):
323        """
324        Search the access DB for a match on this tuple.  Return the matching
325        user (protoGENI cert).
326       
327        NB, if the initial tuple fails to match we start inserting wildcards in
328        an order determined by self.project_priority.  Try the list of users in
329        order (when wildcarded, there's only one user in the list).
330        """
331        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
332        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
333
334        for p in perm: 
335            s = self.permute_wildcards(search, p)
336            # s[2] is None on an anonymous, unwildcarded request
337            if s[2] != None:
338                for u in s[2]:
339                    if self.access.has_key((s[0], s[1], u)):
340                        return self.access[(s[0], s[1], u)]
341            else:
342                if self.access.has_key(s):
343                    return self.access[s]
344        return None
345
346    def lookup_access(self, req, fid):
347        """
348        Determine the allowed access for this request.  Return the access and
349        which fields are dynamic.
350
351        The fedid is needed to construct the request
352        """
353        # Search keys
354        tb = None
355        project = None
356        user = None
357        # Return values
358        rp = access_project(None, ())
359        ru = None
360        user_re = re.compile("user:\s(.*)")
361        project_re = re.compile("project:\s(.*)")
362
363        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
364                if user_re.match(x)]
365        project = [ project_re.findall(x)[0] \
366                for x in req.get('credential', []) \
367                    if project_re.match(x)]
368
369        if len(project) == 1: project = project[0]
370        elif len(project) == 0: project = None
371        else: 
372            raise service_error(service_error.req, 
373                    "More than one project credential")
374
375
376        user_fedids = [ u for u in user if isinstance(u, fedid)]
377
378        # Determine how the caller is representing itself.  If its fedid shows
379        # up as a project or a singleton user, let that stand.  If neither the
380        # usernames nor the project name is a fedid, the caller is a testbed.
381        if project and isinstance(project, fedid):
382            if project == fid:
383                # The caller is the project (which is already in the tuple
384                # passed in to the authorizer)
385                owners = user_fedids
386                owners.append(project)
387            else:
388                raise service_error(service_error.req,
389                        "Project asserting different fedid")
390        else:
391            if fid not in user_fedids:
392                tb = fid
393                owners = user_fedids
394                owners.append(fid)
395            else:
396                if len(fedids) > 1:
397                    raise service_error(service_error.req,
398                            "User asserting different fedid")
399                else:
400                    # Which is a singleton
401                    owners = user_fedids
402        # Confirm authorization
403
404        for u in user:
405            self.log.debug("[lookup_access] Checking access for %s" % \
406                    ((tb, project, u),))
407            if self.auth.check_attribute((tb, project, u), 'access'):
408                self.log.debug("[lookup_access] Access granted")
409                break
410            else:
411                self.log.debug("[lookup_access] Access Denied")
412        else:
413            raise service_error(service_error.access, "Access denied")
414
415        # This maps a valid user to the ProtoGENI credentials to use
416        found = self.find_access((tb, project, user))
417       
418        if found == None:
419            raise service_error(service_error.access,
420                    "Access denied - cannot map access")
421        return found, owners
422
423    def get_handler(self, path, fid):
424        self.log.info("Get handler %s %s" % (path, fid))
425        if self.auth.check_attribute(fid, path) and self.userconfdir:
426            return ("%s/%s" % (self.userconfdir, path), "application/binary")
427        else:
428            return (None, None)
429
430    def export_userconf(self, project):
431        dev_null = None
432        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
433                log=self.log)
434        conffilename = "%s/%s" % (self.userconfdir, str(confid))
435        cf = None
436        try:
437            cf = open(conffilename, "w")
438            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
439        except IOError, e:
440            raise service_error(service_error.internal, 
441                    "Cannot create user configuration data")
442
443        try:
444            dev_null = open("/dev/null", "a")
445        except IOError, e:
446            self.log.error("export_userconf: can't open /dev/null: %s" % e)
447
448        cmd = "%s %s" % (self.userconfcmd, project)
449        conf = subprocess.call(cmd.split(" "),
450                stdout=cf, stderr=dev_null, close_fds=True)
451
452        self.auth.set_attribute(confid, "/%s" % str(confid))
453
454        return confid, confcert
455
456
457    def export_services(self, sreq, project, user):
458        exp = [ ]
459        state = { }
460        # XXX: Filthy shortcut here using http: so urlparse will give the right
461        # answers.
462        for s in sreq:
463            sname = s.get('name', '')
464            svis = s.get('visibility', '')
465            if svis == 'export':
466                if sname in self.exports:
467                    outs = s.copy()
468                    if sname == 'SMB':
469                        outs = s.copy()
470                        outs['server'] = "http://fs:139" 
471                        outs['fedAttr'] = [
472                                { 'attribute': 'SMBSHARE', 'value': 'USERS' },
473                                { 'attribute': 'SMBUSER', 'value': user },
474                                { 'attribute': 'SMBPROJ', 'value': project },
475                            ]
476                    elif sname == 'seer':
477                        outs['server'] = "http://control:16606"
478                    elif sname == 'tmcd':
479                        outs['server'] = "http://boss:7777"
480                    elif sname == 'userconfig':
481                        if self.userconfdir and self.userconfcmd \
482                                and self.userconfurl:
483                            cid, cert = self.export_userconf(project)
484                            outs['server'] = "%s/%s" % \
485                                    (self.userconfurl, str(cid))
486                            outs['fedAttr'] = [
487                                    { 'attribute': 'cert', 'value': cert },
488                                ]
489                            state['userconfig'] = unicode(cid)
490                    exp.append(outs)
491        return (exp, state)
492
493    def build_response(self, alloc_id, ap, services):
494        """
495        Create the SOAP response.
496
497        Build the dictionary description of the response and use
498        fedd_utils.pack_soap to create the soap message.  ap is the allocate
499        project message returned from a remote project allocation (even if that
500        allocation was done locally).
501        """
502        # Because alloc_id is already a fedd_services_types.IDType_Holder,
503        # there's no need to repack it
504        msg = { 
505                'allocID': alloc_id,
506                'fedAttr': [
507                    { 'attribute': 'domain', 'value': self.domain } , 
508                    { 'attribute': 'project', 'value': 
509                        ap['project'].get('name', {}).get('localname', "???") },
510                ]
511            }
512        if len(self.attrs) > 0:
513            msg['fedAttr'].extend(
514                [ { 'attribute': x, 'value' : y } \
515                        for x,y in self.attrs.iteritems()])
516
517        if services:
518            msg['service'] = services
519        return msg
520
521    def RequestAccess(self, req, fid):
522        """
523        Handle the access request.  Proxy if not for us.
524
525        Parse out the fields and make the allocations or rejections if for us,
526        otherwise, assuming we're willing to proxy, proxy the request out.
527        """
528
529        # The dance to get into the request body
530        if req.has_key('RequestAccessRequestBody'):
531            req = req['RequestAccessRequestBody']
532        else:
533            raise service_error(service_error.req, "No request!?")
534
535        if req.has_key('destinationTestbed'):
536            dt = unpack_id(req['destinationTestbed'])
537
538        if dt == None or dt in self.testbed:
539            # Request for this fedd
540            found, owners = self.lookup_access(req, fid)
541            # keep track of what's been added
542            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
543            aid = unicode(allocID)
544
545            self.state_lock.acquire()
546            self.allocation[aid] = { }
547            # The protoGENI certificate
548            self.allocation[aid]['credentials'] = found
549            # The list of owner FIDs
550            self.allocation[aid]['owners'] = owners
551            self.write_state()
552            self.state_lock.release()
553            for o in owners:
554                self.auth.set_attribute(o, allocID)
555            self.auth.set_attribute(allocID, allocID)
556
557            try:
558                f = open("%s/%s.pem" % (self.certdir, aid), "w")
559                print >>f, alloc_cert
560                f.close()
561            except IOError, e:
562                raise service_error(service_error.internal, 
563                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
564            return { 'allocID': { 'fedid': allocID } }
565        else:
566            if self.allow_proxy:
567                resp = self.proxy_RequestAccess.call_service(dt, req,
568                            self.cert_file, self.cert_pwd,
569                            self.trusted_certs)
570                if resp.has_key('RequestAccessResponseBody'):
571                    return resp['RequestAccessResponseBody']
572                else:
573                    return None
574            else:
575                raise service_error(service_error.access,
576                        "Access proxying denied")
577
578
579    def ReleaseAccess(self, req, fid):
580        # The dance to get into the request body
581        if req.has_key('ReleaseAccessRequestBody'):
582            req = req['ReleaseAccessRequestBody']
583        else:
584            raise service_error(service_error.req, "No request!?")
585
586        if req.has_key('destinationTestbed'):
587            dt = unpack_id(req['destinationTestbed'])
588        else:
589            dt = None
590
591        if dt == None or dt in self.testbed:
592            # Local request
593            try:
594                if req['allocID'].has_key('localname'):
595                    auth_attr = aid = req['allocID']['localname']
596                elif req['allocID'].has_key('fedid'):
597                    aid = unicode(req['allocID']['fedid'])
598                    auth_attr = req['allocID']['fedid']
599                else:
600                    raise service_error(service_error.req,
601                            "Only localnames and fedids are understood")
602            except KeyError:
603                raise service_error(service_error.req, "Badly formed request")
604
605            self.log.debug("[access] deallocation requested for %s", aid)
606            if not self.auth.check_attribute(fid, auth_attr):
607                self.log.debug("[access] deallocation denied for %s", aid)
608                raise service_error(service_error.access, "Access Denied")
609
610            self.state_lock.acquire()
611            if self.allocation.has_key(aid):
612                self.log.debug("Found allocation for %s" %aid)
613                del self.allocation[aid]
614                self.write_state()
615                self.state_lock.release()
616                # And remove the access cert
617                cf = "%s/%s.pem" % (self.certdir, aid)
618                self.log.debug("Removing %s" % cf)
619                os.remove(cf)
620                return { 'allocID': req['allocID'] } 
621            else:
622                self.state_lock.release()
623                raise service_error(service_error.req, "No such allocation")
624
625        else:
626            if self.allow_proxy:
627                resp = self.proxy_ReleaseAccess.call_service(dt, req,
628                            self.cert_file, self.cert_pwd,
629                            self.trusted_certs)
630                if resp.has_key('ReleaseAccessResponseBody'):
631                    return resp['ReleaseAccessResponseBody']
632                else:
633                    return None
634            else:
635                raise service_error(service_error.access,
636                        "Access proxying denied")
637
638    def import_store_info(self, cf, connInfo):
639        """
640        Pull any import parameters in connInfo in.  We translate them either
641        into known member names or fedAddrs.
642        """
643
644        for c in connInfo:
645            for p in [ p for p in c.get('parameter', []) \
646                    if p.get('type', '') == 'input']:
647                name = p.get('name', None)
648                key = p.get('key', None)
649                store = p.get('store', None)
650
651                if name and key and store :
652                    req = { 'name': key, 'wait': True }
653                    r = self.call_GetValue(store, req, cf)
654                    r = r.get('GetValueResponseBody', None)
655                    if r :
656                        if r.get('name', '') == key:
657                            v = r.get('value', None)
658                            if v is not None:
659                                if name == 'peer':
660                                    c['peer'] = v
661                                else:
662                                    if c.has_key('fedAttr'):
663                                        c['fedAttr'].append({
664                                            'attribute': name, 'value': v})
665                                    else:
666                                        c['fedAttr']= [{
667                                            'attribute': name, 'value': v}]
668                            else:
669                                raise service_error(service_error.internal, 
670                                        'None value exported for %s'  % key)
671                        else:
672                            raise service_error(service_error.internal, 
673                                    'Different name returned for %s: %s' \
674                                            % (key, r.get('name','')))
675                    else:
676                        raise service_error(service_error.internal, 
677                            'Badly formatted response: no GetValueResponseBody')
678                else:
679                    raise service_error(service_error.internal, 
680                        'Bad Services missing info for import %s' % c)
681
682    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
683            tmpdir, master, leid, connInfo, services):
684
685        def conninfo_to_dict(key, info):
686            """
687            Make a cpoy of the connection information about key, and flatten it
688            into a single dict by parsing out any feddAttrs.
689            """
690
691            rv = None
692            for i in info:
693                if key == i.get('portal', "") or \
694                        key in [e.get('element', "") \
695                        for e in i.get('member', [])]:
696                    rv = i.copy()
697                    break
698
699            else:
700                return rv
701
702            if 'fedAttr' in rv:
703                for a in rv['fedAttr']:
704                    attr = a.get('attribute', "")
705                    val = a.get('value', "")
706                    if attr and attr not in rv:
707                        rv[attr] = val
708                del rv['fedAttr']
709            return rv
710
711        # XXX: un hardcode this
712        def client_null(f, s):
713            print >>f, "Service: %s" % s['name']
714
715        def client_smb(f, s):
716            print >>f, "Service: %s" % s['name']
717            smbshare = None
718            smbuser = None
719            smbproj = None
720            for a in s.get('fedAttr', []):
721                if a.get('attribute', '') == 'SMBSHARE':
722                    smbshare = a.get('value', None)
723                elif a.get('attribute', '') == 'SMBUSER':
724                    smbuser = a.get('value', None)
725                elif a.get('attribute', '') == 'SMBPROJ':
726                    smbproj = a.get('value', None)
727
728            if all((smbshare, smbuser, smbproj)):
729                print >>f, "SMBshare: %s" % smbshare
730                print >>f, "ProjectUser: %s" % smbuser
731                print >>f, "ProjectName: %s" % smbproj
732
733        client_service_out = {
734                'SMB': client_smb,
735                'tmcd': client_null,
736                'seer': client_null,
737                'userconfig': client_null,
738            }
739        # XXX: end un hardcode this
740
741
742        seer_out = False
743        client_out = False
744        for e in [ e for e in topo.elements \
745                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
746            myname = e.name[0]
747            type = e.get_attribute('portal_type')
748
749            info = conninfo_to_dict(myname, connInfo)
750
751            if not info:
752                raise service_error(service_error.req,
753                        "No connectivity info for %s" % myname)
754
755            peer = info.get('peer', "")
756            ldomain = self.domain;
757
758            mexp = info.get('masterexperiment',"")
759            mproj, meid = mexp.split("/", 1)
760            mdomain = info.get('masterdomain',"")
761            muser = info.get('masteruser','root')
762            smbshare = info.get('smbshare', 'USERS')
763            ssh_port = info.get('ssh_port', '22')
764
765            active = info.get('active', 'False')
766
767            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
768            tunnelconfig = self.attrs.has_key('TunnelCfg')
769            try:
770                f = open(cfn, "w")
771                if active == 'True':
772                    print >>f, "active: True"
773                    print >>f, "ssh_port: %s" % ssh_port
774                    if type in ('control', 'both'):
775                        for s in [s for s in services \
776                                if s.get('name', "") in self.imports]:
777                            p = urlparse(s.get('server', 'http://localhost'))
778                            print >>f, 'port: remote:%s:%s:%s' % \
779                                    (p.port, p.hostname, p.port)
780
781                if tunnelconfig:
782                    print >>f, "tunnelip: %s" % tunnelconfig
783                # XXX: send this an fedattr
784                #print >>f, "seercontrol: control.%s.%s%s" % \
785                        #(meid.lower(), mproj.lower(), mdomain)
786                print >>f, "peer: %s" % peer.lower()
787                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
788                        pubkey_base
789                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
790                        secretkey_base
791                f.close()
792            except IOError, e:
793                raise service_error(service_error.internal,
794                        "Can't write protal config %s: %s" % (cfn, e))
795           
796            # XXX: This little seer config file needs to go away.
797            if not seer_out:
798                try:
799                    seerfn = "%s/seer.conf" % tmpdir
800                    f = open(seerfn, "w")
801                    if not master:
802                        print >>f, "ControlNode: control.%s.%s%s" % \
803                            (meid.lower(), mproj.lower(), mdomain)
804                    print >>f, "ExperimentID: %s" % mexp
805                    f.close()
806                except IOError, e:
807                    raise service_error(service_error.internal, 
808                            "Can't write seer.conf: %s" %e)
809                seer_out = True
810
811            if not client_out and type in ('control', 'both'):
812                try:
813                    f = open("%s/client.conf" % tmpdir, "w")
814                    print >>f, "ControlGateway: %s%s" % \
815                        (myname.lower(), ldomain.lower())
816                    for s in services:
817                        if s.get('name',"") in self.imports and \
818                                s.get('visibility','') == 'import':
819                            client_service_out[s['name']](f, s)
820                    # Does seer need this?
821                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
822                    f.close()
823                except IOError, e:
824                    raise service_error(service_error.internal,
825                            "Cannot write client.conf: %s" %s)
826                client_out = True
827
828
829    def generate_rspec(self, topo, softdir, master, connInfo):
830        t = topo.clone()
831
832        starts = { }
833
834        # The startcmds for master and slave testbeds
835        if master: 
836            gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true')
837            node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true')
838        else: 
839            gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true')
840            node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true')
841
842        # Weed out the things we aren't going to instantiate: Segments, portal
843        # substrates, and portal interfaces.  (The copy in the for loop allows
844        # us to delete from e.elements in side the for loop).  While we're
845        # touching all the elements, we also adjust paths from the original
846        # testbed to local testbed paths and put the federation commands and
847        # startcommands into a dict so we can start them manually later.
848        # ProtoGENI requires setup before the federation commands run, so we
849        # run them by hand after we've seeded configurations.
850        for e in [e for e in t.elements]:
851            if isinstance(e, topdl.Segment):
852                t.elements.remove(e)
853            # Fix software paths
854            for s in getattr(e, 'software', []):
855                s.location = re.sub("^.*/", softdir, s.location)
856            if isinstance(e, topdl.Computer):
857                if e.get_attribute('portal') and gate_cmd:
858                    # Portals never have a user-specified start command
859                    starts[e.name[0]] = gate_cmd
860                elif node_cmd:
861                    if e.get_attribute('startup'):
862                        starts[e.name[0]] = "%s \\$USER '%s'" % \
863                                (node_cmd, e.get_attribute('startup'))
864                        e.remove_attribute('startup')
865                    else:
866                        starts[e.name[0]] = node_cmd
867
868                # Remove portal interfaces
869                e.interface = [i for i in e.interface \
870                        if not i.get_attribute('portal')]
871
872        t.substrates = [ s.clone() for s in t.substrates ]
873        t.incorporate_elements()
874
875        # Customize the ns2 output for local portal commands and images
876        filters = []
877
878        # NB: these are extra commands issued for the node, not the startcmds
879        if master: cmd = self.attrs.get('MasterConnectorCmd', '')
880        else: cmd = self.attrs.get('SlaveConnectorCmd', '')
881
882        if cmd:
883            filters.append(topdl.generate_portal_command_filter(cmd))
884
885        # Convert to rspec and return it
886        exp_rspec = topdl.topology_to_rspec(t, filters)
887
888        return exp_rspec
889
890    def StartSegment(self, req, fid):
891
892        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
893
894        err = None  # Any service_error generated after tmpdir is created
895        rv = None   # Return value from segment creation
896
897        try:
898            req = req['StartSegmentRequestBody']
899        except KeyError:
900            raise service_error(service_error.req, "Badly formed request")
901
902        connInfo = req.get('connection', [])
903        services = req.get('service', [])
904        auth_attr = req['allocID']['fedid']
905        aid = "%s" % auth_attr
906        attrs = req.get('fedAttr', [])
907        if not self.auth.check_attribute(fid, auth_attr):
908            raise service_error(service_error.access, "Access denied")
909        else:
910            # See if this is a replay of an earlier succeeded StartSegment -
911            # sometimes SSL kills 'em.  If so, replay the response rather than
912            # redoing the allocation.
913            self.state_lock.acquire()
914            retval = self.allocation[aid].get('started', None)
915            self.state_lock.release()
916            if retval:
917                self.log.warning("Duplicate StartSegment for %s: " % aid + \
918                        "replaying response")
919                return retval
920
921
922        if req.has_key('segmentdescription') and \
923                req['segmentdescription'].has_key('topdldescription'):
924            topo = \
925                topdl.Topology(**req['segmentdescription']['topdldescription'])
926        else:
927            raise service_error(service_error.req, 
928                    "Request missing segmentdescription'")
929
930        master = req.get('master', False)
931
932        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
933        try:
934            tmpdir = tempfile.mkdtemp(prefix="access-")
935            softdir = "%s/software" % tmpdir
936        except IOError:
937            raise service_error(service_error.internal, "Cannot create tmp dir")
938
939        # Try block alllows us to clean up temporary files.
940        try:
941            sw = set()
942            for e in topo.elements:
943                for s in getattr(e, 'software', []):
944                    sw.add(s.location)
945            os.mkdir(softdir)
946            for s in sw:
947                self.log.debug("Retrieving %s" % s)
948                try:
949                    get_url(s, certfile, softdir)
950                except:
951                    t, v, st = sys.exc_info()
952                    raise service_error(service_error.internal,
953                            "Error retrieving %s: %s" % (s, v))
954
955            # Copy local portal node software to the tempdir
956            for s in (self.portal_software, self.federation_software):
957                for l, f in s:
958                    base = os.path.basename(f)
959                    copy_file(f, "%s/%s" % (softdir, base))
960
961            # Ick.  Put this python rpm in a place that it will get moved into
962            # the staging area.  It's a hack to install a modern (in a Roman
963            # sense of modern) python on ProtoGENI
964            python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
965            if os.access("./%s" % python_rpm, os.R_OK):
966                copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
967
968            for a in attrs:
969                if a['attribute'] in configs:
970                    try:
971                        self.log.debug("Retrieving %s" % a['value'])
972                        get_url(a['value'], certfile, tmpdir)
973                    except:
974                        t, v, st = sys.exc_info()
975                        raise service_error(service_error.internal,
976                                "Error retrieving %s: %s" % (s, v))
977                if a['attribute'] == 'ssh_pubkey':
978                    pubkey_base = a['value'].rpartition('/')[2]
979                if a['attribute'] == 'ssh_secretkey':
980                    secretkey_base = a['value'].rpartition('/')[2]
981                if a['attribute'] == 'experiment_name':
982                    ename = a['value']
983
984            # If the userconf service was imported, collect the configuration
985            # data.
986            for s in services:
987                if s.get("name", "") == 'userconfig' \
988                        and s.get('visibility',"") == 'import':
989
990                    # Collect ther server and certificate info.
991                    u = s.get('server', None)
992                    for a in s.get('fedAttr', []):
993                        if a.get('attribute',"") == 'cert':
994                            cert = a.get('value', None)
995                            break
996                    else:
997                        cert = None
998
999                    if cert:
1000                        # Make a temporary certificate file for get_url.  The
1001                        # finally clause removes it whether something goes
1002                        # wrong (including an exception from get_url) or not.
1003                        try:
1004                            tfos, tn = tempfile.mkstemp(suffix=".pem")
1005                            tf = os.fdopen(tfos, 'w')
1006                            print >>tf, cert
1007                            tf.close()
1008                            get_url(u, tn, tmpdir, "userconf")
1009                        except IOError, e:
1010                            raise service_error(service.error.internal, 
1011                                    "Cannot create temp file for " + 
1012                                    "userconfig certificates: %s e")
1013                        except:
1014                            t, v, st = sys.exc_info()
1015                            raise service_error(service_error.internal,
1016                                    "Error retrieving %s: %s" % (u, v))
1017                        finally:
1018                            if tn: os.remove(tn)
1019                    else:
1020                        raise service_error(service_error.req,
1021                                "No certificate for retreiving userconfig")
1022                    break
1023
1024            self.state_lock.acquire()
1025            if self.allocation.has_key(aid):
1026                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1027                self.allocation[aid]['experiment'] = ename
1028                self.allocation[aid]['log'] = [ ]
1029                # Create a logger that logs to the experiment's state object as
1030                # well as to the main log file.
1031                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1032                h = logging.StreamHandler(
1033                        list_log.list_log(self.allocation[aid]['log']))
1034                # XXX: there should be a global one of these rather than
1035                # repeating the code.
1036                h.setFormatter(logging.Formatter(
1037                    "%(asctime)s %(name)s %(message)s",
1038                            '%d %b %y %H:%M:%S'))
1039                alloc_log.addHandler(h)
1040                self.write_state()
1041            else:
1042                self.log.error("No allocation for %s!?" % aid)
1043            self.state_lock.release()
1044
1045            # XXX: we really need to put the import and connection info
1046            # generation off longer.
1047            self.import_store_info(certfile, connInfo)
1048            #self.generate_portal_configs(topo, pubkey_base,
1049                    #secretkey_base, tmpdir, master, ename, connInfo,
1050                    #services)
1051            rspec = self.generate_rspec(topo, "%s/%s/" \
1052                    % (self.staging_dir, ename), master, connInfo)
1053
1054            starter = self.start_segment(keyfile=ssh_key,
1055                    debug=self.create_debug, log=alloc_log,
1056                    ch_url = self.ch_url, sa_url=self.sa_url,
1057                    cm_url=self.cm_url)
1058            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
1059                    master, ename,
1060                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1061                    certfile, topo, connInfo, services)
1062        except service_error, e:
1063            err = e
1064        except e:
1065            err = service_error(service_error.internal, str(e))
1066
1067        # Walk up tmpdir, deleting as we go
1068        if self.cleanup:
1069            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1070            for path, dirs, files in os.walk(tmpdir, topdown=False):
1071                for f in files:
1072                    os.remove(os.path.join(path, f))
1073                for d in dirs:
1074                    os.rmdir(os.path.join(path, d))
1075            os.rmdir(tmpdir)
1076        else:
1077            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1078
1079        if rv:
1080            # Grab the log (this is some anal locking, but better safe than
1081            # sorry)
1082            self.state_lock.acquire()
1083            logv = "".join(self.allocation[aid]['log'])
1084            # It's possible that the StartSegment call gets retried (!).
1085            # if the 'started' key is in the allocation, we'll return it rather
1086            # than redo the setup.
1087            self.allocation[aid]['started'] = { 
1088                    'allocID': req['allocID'],
1089                    'allocationLog': logv,
1090                    }
1091            retval = self.allocation[aid]['started']
1092            self.write_state()
1093            self.state_lock.release()
1094
1095            return retval
1096        elif err:
1097            raise service_error(service_error.federant,
1098                    "Swapin failed: %s" % err)
1099        else:
1100            raise service_error(service_error.federant, "Swapin failed")
1101
1102    def TerminateSegment(self, req, fid):
1103        try:
1104            req = req['TerminateSegmentRequestBody']
1105        except KeyError:
1106            raise service_error(service_error.req, "Badly formed request")
1107
1108        auth_attr = req['allocID']['fedid']
1109        aid = "%s" % auth_attr
1110        attrs = req.get('fedAttr', [])
1111        if not self.auth.check_attribute(fid, auth_attr):
1112            raise service_error(service_error.access, "Access denied")
1113
1114        self.state_lock.acquire()
1115        if self.allocation.has_key(aid):
1116            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1117            slice_cred = self.allocation[aid].get('slice_credential', None)
1118            ename = self.allocation[aid].get('experiment', None)
1119        else:
1120            cf, user, ssh_key, cpw = (None, None, None, None)
1121            slice_cred = None
1122            ename = None
1123        self.state_lock.release()
1124
1125        if ename:
1126            staging = "%s/%s" % ( self.staging_dir, ename)
1127        else:
1128            self.log.warn("Can't find experiment name for %s" % aid)
1129            staging = None
1130
1131        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
1132                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
1133        stopper(self, user, staging, slice_cred, cf, cpw)
1134        return { 'allocID': req['allocID'] }
1135
1136    def RenewSlices(self):
1137        self.log.info("Scanning for slices to renew")
1138        self.state_lock.acquire()
1139        aids = self.allocation.keys()
1140        self.state_lock.release()
1141
1142        for aid in aids:
1143            self.state_lock.acquire()
1144            if self.allocation.has_key(aid):
1145                name = self.allocation[aid].get('slice_name', None)
1146                scred = self.allocation[aid].get('slice_credential', None)
1147                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1148            else:
1149                name = None
1150                scred = None
1151            self.state_lock.release()
1152
1153            # There's a ProtoGENI slice associated with the segment; renew it.
1154            if name and scred:
1155                renewer = self.renew_segment(log=self.log, 
1156                        debug=self.create_debug, keyfile=ssh_key,
1157                        cm_url = self.cm_url, sa_url = self.sa_url,
1158                        ch_url = self.ch_url)
1159                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
1160                if new_scred:
1161                    self.log.info("Slice %s renewed until %s GMT" % \
1162                            (name, time.asctime(time.gmtime(\
1163                                time.time()+self.renewal_interval))))
1164                    self.state_lock.acquire()
1165                    if self.allocation.has_key(aid):
1166                        self.allocation[aid]['slice_credential'] = new_scred
1167                    self.state_lock.release()
1168                else:
1169                    self.log.info("Failed to renew slice %s " % name)
1170
1171        # Let's do this all again soon.  (4 tries before the slices time out)   
1172        t = Timer(self.renewal_interval/4, self.RenewSlices)
1173        t.start()
Note: See TracBrowser for help on using the repository browser.