source: fedd/federation/protogeni_access.py @ 1dcaff4

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 1dcaff4 was 1dcaff4, checked in by Ted Faber <faber@…>, 14 years ago

Improved SSL error handling (more try blocks, BIOError exception)
Separate get_url into util
Work with remote splitter.

  • Property mode set to 100644
File size: 39.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_protogeni_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        def software_list(v):
59            l = [ ]
60            if v:
61                ps = v.split(" ")
62                while len(ps):
63                    loc, file = ps[0:2]
64                    del ps[0:2]
65                    l.append((loc, file))
66            return l
67
68        # Make sure that the configuration is in place
69        if not config: 
70            raise RunTimeError("No config to fedd.access")
71
72        self.project_priority = config.getboolean("access", "project_priority")
73        self.allow_proxy = config.getboolean("access", "allow_proxy")
74
75        self.domain = config.get("access", "domain")
76        self.certdir = config.get("access","certdir")
77        self.userconfdir = config.get("access","userconfdir")
78        self.userconfcmd = config.get("access","userconfcmd")
79        self.userconfurl = config.get("access","userconfurl")
80        self.federation_software = config.get("access", "federation_software")
81        self.portal_software = config.get("access", "portal_software")
82        self.ssh_port = config.get("access","ssh_port") or "22"
83        self.sshd = config.get("access","sshd")
84        self.sshd_config = config.get("access", "sshd_config")
85        self.create_debug = config.getboolean("access", "create_debug")
86        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
87        self.access_type = config.get("access", "type")
88        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
89        self.staging_host = config.get("access", "staging_host") \
90                or "ops.emulab.net"
91   
92        self.federation_software = software_list(self.federation_software)
93        self.portal_software = software_list(self.portal_software)
94
95        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
96        self.renewal_interval = int(self.renewal_interval) * 60
97
98        self.ch_url = config.get("access", "ch_url")
99        self.sa_url = config.get("access", "sa_url")
100        self.cm_url = config.get("access", "cm_url")
101
102        self.attrs = { }
103        self.access = { }
104        self.restricted = [ ]
105        self.projects = { }
106        self.keys = { }
107        self.types = { }
108        self.allocation = { }
109        self.state = { 
110            'projects': self.projects,
111            'allocation' : self.allocation,
112            'keys' : self.keys,
113            'types': self.types
114        }
115        self.log = logging.getLogger("fedd.access")
116        set_log_level(config, "access", self.log)
117        self.state_lock = Lock()
118        # XXX: Configurable
119        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
120        self.imports = set(('SMB', 'seer', 'userconfig'))
121
122        if auth: self.auth = auth
123        else:
124            self.log.error(\
125                    "[access]: No authorizer initialized, creating local one.")
126            auth = authorizer()
127
128        tb = config.get('access', 'testbed')
129        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
130        else: self.testbed = [ ]
131
132        if config.has_option("access", "accessdb"):
133            self.read_access(config.get("access", "accessdb"))
134
135        self.state_filename = config.get("access", "access_state")
136        print "Calling read_state %s" % self.state_filename
137        self.read_state()
138
139        # Keep cert_file and cert_pwd coming from the same place
140        self.cert_file = config.get("access", "cert_file")
141        if self.cert_file:
142            self.sert_pwd = config.get("access", "cert_pw")
143        else:
144            self.cert_file = config.get("globals", "cert_file")
145            self.sert_pwd = config.get("globals", "cert_pw")
146
147        self.trusted_certs = config.get("access", "trusted_certs") or \
148                config.get("globals", "trusted_certs")
149
150        self.start_segment = proxy_protogeni_segment.start_segment
151        self.stop_segment = proxy_protogeni_segment.stop_segment
152        self.renew_segment = proxy_protogeni_segment.renew_segment
153
154        self.call_SetValue = service_caller('SetValue')
155        self.call_GetValue = service_caller('GetValue')
156
157        self.RenewSlices()
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment", 
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176    def read_access(self, config):
177        """
178        Read a configuration file and set internal parameters.
179
180        There are access lines of the
181        form (tb, proj, user) -> user that map the first tuple of
182        names to the user for for access purposes.  Names in the key (left side)
183        can include "<NONE> or <ANY>" to act as wildcards or to require the
184        fields to be empty.  Similarly aproj or auser can be <SAME> or
185        <DYNAMIC> indicating that either the matching key is to be used or a
186        dynamic user or project will be created.  These names can also be
187        federated IDs (fedid's) if prefixed with fedid:.  The user is the
188        ProtoGENI identity certificate.
189        Testbed attributes outside the forms above can be given using the
190        format attribute: name value: value.  The name is a single word and the
191        value continues to the end of the line.  Empty lines and lines startin
192        with a # are ignored.
193
194        Parsing errors result in a self.parse_error exception being raised.
195        """
196        lineno=0
197        name_expr = "["+string.ascii_letters + string.digits + "\/\.\-_]+"
198        fedid_expr = "fedid:[" + string.hexdigits + "]+"
199        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
200
201        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
202                re.IGNORECASE)
203        access_str = '\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ \
204                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*,\s*('\
205                        + name_expr + ')\s*,\s*('+name_expr+')\s*,?\s*(' + \
206                        name_expr+ ')?\)'
207        access_re = re.compile(access_str, re.IGNORECASE)
208
209
210        def parse_name(n):
211            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
212            else: return n
213       
214        def auth_name(n):
215            if isinstance(n, basestring):
216                if n =='<any>' or n =='<none>': return None
217                else: return unicode(n)
218            else:
219                return n
220
221        f = open(config, "r");
222        for line in f:
223            lineno += 1
224            line = line.strip();
225            if len(line) == 0 or line.startswith('#'):
226                continue
227
228            # Extended (attribute: x value: y) attribute line
229            m = attr_re.match(line)
230            if m != None:
231                attr, val = m.group(1,2)
232                self.attrs[attr] = val
233                continue
234
235            # Access line (t, p, u) -> (a, pw) line
236            m = access_re.match(line)
237            if m != None:
238                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
239                auth_key = tuple([ auth_name(x) for x in access_key])
240                cert = auth_name(parse_name(m.group(4)))
241                user_name = auth_name(parse_name(m.group(5)))
242                ssh_key = unicode(m.group(6))
243                if m.group(6): pw = unicode(m.group(7))
244                else: pw = None
245
246                self.access[access_key] = (cert, user_name, ssh_key, pw)
247                self.auth.set_attribute(auth_key, "access")
248                continue
249
250            # Nothing matched to here: unknown line - raise exception
251            f.close()
252            raise self.parse_error("Unknown statement at line %d of %s" % \
253                    (lineno, config))
254        f.close()
255
256    def write_state(self):
257        if self.state_filename:
258            try:
259                f = open(self.state_filename, 'w')
260                pickle.dump(self.state, f)
261            except IOError, e:
262                self.log.error("Can't write file %s: %s" % \
263                        (self.state_filename, e))
264            except pickle.PicklingError, e:
265                self.log.error("Pickling problem: %s" % e)
266            except TypeError, e:
267                self.log.error("Pickling problem (TypeError): %s" % e)
268
269
270    def read_state(self):
271        """
272        Read a new copy of access state.  Old state is overwritten.
273
274        State format is a simple pickling of the state dictionary.
275        """
276        if self.state_filename:
277            try:
278                f = open(self.state_filename, "r")
279                self.state = pickle.load(f)
280                self.log.debug("[read_state]: Read state from %s" % \
281                        self.state_filename)
282            except IOError, e:
283                self.log.warning(("[read_state]: No saved state: " +\
284                        "Can't open %s: %s") % (self.state_filename, e))
285            except EOFError, e:
286                self.log.warning(("[read_state]: " +\
287                        "Empty or damaged state file: %s:") % \
288                        self.state_filename)
289            except pickle.UnpicklingError, e:
290                self.log.warning(("[read_state]: No saved state: " + \
291                        "Unpickling failed: %s") % e)
292
293            # Add the ownership attributes to the authorizer.  Note that the
294            # indices of the allocation dict are strings, but the attributes are
295            # fedids, so there is a conversion.
296            for k in self.state.get('allocation', {}).keys():
297                for o in self.state['allocation'][k].get('owners', []):
298                    self.auth.set_attribute(o, fedid(hexstr=k))
299                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
300
301            if self.allocation != self.state['allocation']:
302                self.allocation = self.state['allocation']
303
304    def permute_wildcards(self, a, p):
305        """Return a copy of a with various fields wildcarded.
306
307        The bits of p control the wildcards.  A set bit is a wildcard
308        replacement with the lowest bit being user then project then testbed.
309        """
310        if p & 1: user = ["<any>"]
311        else: user = a[2]
312        if p & 2: proj = "<any>"
313        else: proj = a[1]
314        if p & 4: tb = "<any>"
315        else: tb = a[0]
316
317        return (tb, proj, user)
318
319
320    def find_access(self, search):
321        """
322        Search the access DB for a match on this tuple.  Return the matching
323        user (protoGENI cert).
324       
325        NB, if the initial tuple fails to match we start inserting wildcards in
326        an order determined by self.project_priority.  Try the list of users in
327        order (when wildcarded, there's only one user in the list).
328        """
329        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
330        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
331
332        for p in perm: 
333            s = self.permute_wildcards(search, p)
334            # s[2] is None on an anonymous, unwildcarded request
335            if s[2] != None:
336                for u in s[2]:
337                    if self.access.has_key((s[0], s[1], u)):
338                        return self.access[(s[0], s[1], u)]
339            else:
340                if self.access.has_key(s):
341                    return self.access[s]
342        return None
343
344    def lookup_access(self, req, fid):
345        """
346        Determine the allowed access for this request.  Return the access and
347        which fields are dynamic.
348
349        The fedid is needed to construct the request
350        """
351        # Search keys
352        tb = None
353        project = None
354        user = None
355        # Return values
356        rp = access_project(None, ())
357        ru = None
358        user_re = re.compile("user:\s(.*)")
359        project_re = re.compile("project:\s(.*)")
360
361        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
362                if user_re.match(x)]
363        project = [ project_re.findall(x)[0] \
364                for x in req.get('credential', []) \
365                    if project_re.match(x)]
366
367        if len(project) == 1: project = project[0]
368        elif len(project) == 0: project = None
369        else: 
370            raise service_error(service_error.req, 
371                    "More than one project credential")
372
373
374        user_fedids = [ u for u in user if isinstance(u, fedid)]
375
376        # Determine how the caller is representing itself.  If its fedid shows
377        # up as a project or a singleton user, let that stand.  If neither the
378        # usernames nor the project name is a fedid, the caller is a testbed.
379        if project and isinstance(project, fedid):
380            if project == fid:
381                # The caller is the project (which is already in the tuple
382                # passed in to the authorizer)
383                owners = user_fedids
384                owners.append(project)
385            else:
386                raise service_error(service_error.req,
387                        "Project asserting different fedid")
388        else:
389            if fid not in user_fedids:
390                tb = fid
391                owners = user_fedids
392                owners.append(fid)
393            else:
394                if len(fedids) > 1:
395                    raise service_error(service_error.req,
396                            "User asserting different fedid")
397                else:
398                    # Which is a singleton
399                    owners = user_fedids
400        # Confirm authorization
401
402        for u in user:
403            self.log.debug("[lookup_access] Checking access for %s" % \
404                    ((tb, project, u),))
405            if self.auth.check_attribute((tb, project, u), 'access'):
406                self.log.debug("[lookup_access] Access granted")
407                break
408            else:
409                self.log.debug("[lookup_access] Access Denied")
410        else:
411            raise service_error(service_error.access, "Access denied")
412
413        # This maps a valid user to the ProtoGENI credentials to use
414        found = self.find_access((tb, project, user))
415       
416        if found == None:
417            raise service_error(service_error.access,
418                    "Access denied - cannot map access")
419        return found, owners
420
421    def get_handler(self, path, fid):
422        self.log.info("Get handler %s %s" % (path, fid))
423        if self.auth.check_attribute(fid, path) and self.userconfdir:
424            return ("%s/%s" % (self.userconfdir, path), "application/binary")
425        else:
426            return (None, None)
427
428    def export_userconf(self, project):
429        dev_null = None
430        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
431                log=self.log)
432        conffilename = "%s/%s" % (self.userconfdir, str(confid))
433        cf = None
434        try:
435            cf = open(conffilename, "w")
436            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
437        except IOError, e:
438            raise service_error(service_error.internal, 
439                    "Cannot create user configuration data")
440
441        try:
442            dev_null = open("/dev/null", "a")
443        except IOError, e:
444            self.log.error("export_userconf: can't open /dev/null: %s" % e)
445
446        cmd = "%s %s" % (self.userconfcmd, project)
447        conf = subprocess.call(cmd.split(" "),
448                stdout=cf, stderr=dev_null, close_fds=True)
449
450        self.auth.set_attribute(confid, "/%s" % str(confid))
451
452        return confid, confcert
453
454
455    def export_services(self, sreq, project, user):
456        exp = [ ]
457        state = { }
458        # XXX: Filthy shortcut here using http: so urlparse will give the right
459        # answers.
460        for s in sreq:
461            sname = s.get('name', '')
462            svis = s.get('visibility', '')
463            if svis == 'export':
464                if sname in self.exports:
465                    outs = s.copy()
466                    if sname == 'SMB':
467                        outs = s.copy()
468                        outs['server'] = "http://fs:139" 
469                        outs['fedAttr'] = [
470                                { 'attribute': 'SMBSHARE', 'value': 'USERS' },
471                                { 'attribute': 'SMBUSER', 'value': user },
472                                { 'attribute': 'SMBPROJ', 'value': project },
473                            ]
474                    elif sname == 'seer':
475                        outs['server'] = "http://control:16606"
476                    elif sname == 'tmcd':
477                        outs['server'] = "http://boss:7777"
478                    elif sname == 'userconfig':
479                        if self.userconfdir and self.userconfcmd \
480                                and self.userconfurl:
481                            cid, cert = self.export_userconf(project)
482                            outs['server'] = "%s/%s" % \
483                                    (self.userconfurl, str(cid))
484                            outs['fedAttr'] = [
485                                    { 'attribute': 'cert', 'value': cert },
486                                ]
487                            state['userconfig'] = unicode(cid)
488                    exp.append(outs)
489        return (exp, state)
490
491    def build_response(self, alloc_id, ap, services):
492        """
493        Create the SOAP response.
494
495        Build the dictionary description of the response and use
496        fedd_utils.pack_soap to create the soap message.  ap is the allocate
497        project message returned from a remote project allocation (even if that
498        allocation was done locally).
499        """
500        # Because alloc_id is already a fedd_services_types.IDType_Holder,
501        # there's no need to repack it
502        msg = { 
503                'allocID': alloc_id,
504                'fedAttr': [
505                    { 'attribute': 'domain', 'value': self.domain } , 
506                    { 'attribute': 'project', 'value': 
507                        ap['project'].get('name', {}).get('localname', "???") },
508                ]
509            }
510        if len(self.attrs) > 0:
511            msg['fedAttr'].extend(
512                [ { 'attribute': x, 'value' : y } \
513                        for x,y in self.attrs.iteritems()])
514
515        if services:
516            msg['service'] = services
517        return msg
518
519    def RequestAccess(self, req, fid):
520        """
521        Handle the access request.  Proxy if not for us.
522
523        Parse out the fields and make the allocations or rejections if for us,
524        otherwise, assuming we're willing to proxy, proxy the request out.
525        """
526
527        # The dance to get into the request body
528        if req.has_key('RequestAccessRequestBody'):
529            req = req['RequestAccessRequestBody']
530        else:
531            raise service_error(service_error.req, "No request!?")
532
533        if req.has_key('destinationTestbed'):
534            dt = unpack_id(req['destinationTestbed'])
535
536        if dt == None or dt in self.testbed:
537            # Request for this fedd
538            found, owners = self.lookup_access(req, fid)
539            # keep track of what's been added
540            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
541            aid = unicode(allocID)
542
543            self.state_lock.acquire()
544            self.allocation[aid] = { }
545            # The protoGENI certificate
546            self.allocation[aid]['credentials'] = found
547            # The list of owner FIDs
548            self.allocation[aid]['owners'] = owners
549            self.write_state()
550            self.state_lock.release()
551            for o in owners:
552                self.auth.set_attribute(o, allocID)
553            self.auth.set_attribute(allocID, allocID)
554
555            try:
556                f = open("%s/%s.pem" % (self.certdir, aid), "w")
557                print >>f, alloc_cert
558                f.close()
559            except IOError, e:
560                raise service_error(service_error.internal, 
561                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
562            return { 'allocID': { 'fedid': allocID } }
563        else:
564            if self.allow_proxy:
565                resp = self.proxy_RequestAccess.call_service(dt, req,
566                            self.cert_file, self.cert_pwd,
567                            self.trusted_certs)
568                if resp.has_key('RequestAccessResponseBody'):
569                    return resp['RequestAccessResponseBody']
570                else:
571                    return None
572            else:
573                raise service_error(service_error.access,
574                        "Access proxying denied")
575
576
577    def ReleaseAccess(self, req, fid):
578        # The dance to get into the request body
579        if req.has_key('ReleaseAccessRequestBody'):
580            req = req['ReleaseAccessRequestBody']
581        else:
582            raise service_error(service_error.req, "No request!?")
583
584        if req.has_key('destinationTestbed'):
585            dt = unpack_id(req['destinationTestbed'])
586        else:
587            dt = None
588
589        if dt == None or dt in self.testbed:
590            # Local request
591            try:
592                if req['allocID'].has_key('localname'):
593                    auth_attr = aid = req['allocID']['localname']
594                elif req['allocID'].has_key('fedid'):
595                    aid = unicode(req['allocID']['fedid'])
596                    auth_attr = req['allocID']['fedid']
597                else:
598                    raise service_error(service_error.req,
599                            "Only localnames and fedids are understood")
600            except KeyError:
601                raise service_error(service_error.req, "Badly formed request")
602
603            self.log.debug("[access] deallocation requested for %s", aid)
604            if not self.auth.check_attribute(fid, auth_attr):
605                self.log.debug("[access] deallocation denied for %s", aid)
606                raise service_error(service_error.access, "Access Denied")
607
608            self.state_lock.acquire()
609            if self.allocation.has_key(aid):
610                self.log.debug("Found allocation for %s" %aid)
611                del self.allocation[aid]
612                self.write_state()
613                self.state_lock.release()
614                # And remove the access cert
615                cf = "%s/%s.pem" % (self.certdir, aid)
616                self.log.debug("Removing %s" % cf)
617                os.remove(cf)
618                return { 'allocID': req['allocID'] } 
619            else:
620                self.state_lock.release()
621                raise service_error(service_error.req, "No such allocation")
622
623        else:
624            if self.allow_proxy:
625                resp = self.proxy_ReleaseAccess.call_service(dt, req,
626                            self.cert_file, self.cert_pwd,
627                            self.trusted_certs)
628                if resp.has_key('ReleaseAccessResponseBody'):
629                    return resp['ReleaseAccessResponseBody']
630                else:
631                    return None
632            else:
633                raise service_error(service_error.access,
634                        "Access proxying denied")
635
636    def import_store_info(self, cf, connInfo):
637        """
638        Pull any import parameters in connInfo in.  We translate them either
639        into known member names or fedAddrs.
640        """
641
642        for c in connInfo:
643            for p in [ p for p in c.get('parameter', []) \
644                    if p.get('type', '') == 'input']:
645                name = p.get('name', None)
646                key = p.get('key', None)
647                store = p.get('store', None)
648
649                if name and key and store :
650                    req = { 'name': key, 'wait': True }
651                    r = self.call_GetValue(store, req, cf)
652                    r = r.get('GetValueResponseBody', None)
653                    if r :
654                        if r.get('name', '') == key:
655                            v = r.get('value', None)
656                            if v is not None:
657                                if name == 'peer':
658                                    c['peer'] = v
659                                else:
660                                    if c.has_key('fedAttr'):
661                                        c['fedAttr'].append({
662                                            'attribute': name, 'value': v})
663                                    else:
664                                        c['fedAttr']= [{
665                                            'attribute': name, 'value': v}]
666                            else:
667                                raise service_error(service_error.internal, 
668                                        'None value exported for %s'  % key)
669                        else:
670                            raise service_error(service_error.internal, 
671                                    'Different name returned for %s: %s' \
672                                            % (key, r.get('name','')))
673                    else:
674                        raise service_error(service_error.internal, 
675                            'Badly formatted response: no GetValueResponseBody')
676                else:
677                    raise service_error(service_error.internal, 
678                        'Bad Services missing info for import %s' % c)
679
680    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
681            tmpdir, master, leid, connInfo, services):
682
683        def conninfo_to_dict(key, info):
684            """
685            Make a cpoy of the connection information about key, and flatten it
686            into a single dict by parsing out any feddAttrs.
687            """
688
689            rv = None
690            for i in info:
691                if key == i.get('portal', "") or \
692                        key in [e.get('element', "") \
693                        for e in i.get('member', [])]:
694                    rv = i.copy()
695                    break
696
697            else:
698                return rv
699
700            if 'fedAttr' in rv:
701                for a in rv['fedAttr']:
702                    attr = a.get('attribute', "")
703                    val = a.get('value', "")
704                    if attr and attr not in rv:
705                        rv[attr] = val
706                del rv['fedAttr']
707            return rv
708
709        # XXX: un hardcode this
710        def client_null(f, s):
711            print >>f, "Service: %s" % s['name']
712
713        def client_smb(f, s):
714            print >>f, "Service: %s" % s['name']
715            smbshare = None
716            smbuser = None
717            smbproj = None
718            for a in s.get('fedAttr', []):
719                if a.get('attribute', '') == 'SMBSHARE':
720                    smbshare = a.get('value', None)
721                elif a.get('attribute', '') == 'SMBUSER':
722                    smbuser = a.get('value', None)
723                elif a.get('attribute', '') == 'SMBPROJ':
724                    smbproj = a.get('value', None)
725
726            if all((smbshare, smbuser, smbproj)):
727                print >>f, "SMBshare: %s" % smbshare
728                print >>f, "ProjectUser: %s" % smbuser
729                print >>f, "ProjectName: %s" % smbproj
730
731        client_service_out = {
732                'SMB': client_smb,
733                'tmcd': client_null,
734                'seer': client_null,
735                'userconfig': client_null,
736            }
737        # XXX: end un hardcode this
738
739
740        seer_out = False
741        client_out = False
742        for e in [ e for e in topo.elements \
743                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
744            myname = e.name[0]
745            type = e.get_attribute('portal_type')
746
747            info = conninfo_to_dict(myname, connInfo)
748
749            if not info:
750                raise service_error(service_error.req,
751                        "No connectivity info for %s" % myname)
752
753            peer = info.get('peer', "")
754            ldomain = self.domain;
755
756            mexp = info.get('masterexperiment',"")
757            mproj, meid = mexp.split("/", 1)
758            mdomain = info.get('masterdomain',"")
759            muser = info.get('masteruser','root')
760            smbshare = info.get('smbshare', 'USERS')
761            ssh_port = info.get('ssh_port', '22')
762
763            active = info.get('active', 'False')
764
765            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
766            tunnelconfig = self.attrs.has_key('TunnelCfg')
767            try:
768                f = open(cfn, "w")
769                if active == 'True':
770                    print >>f, "active: True"
771                    print >>f, "ssh_port: %s" % ssh_port
772                    if type in ('control', 'both'):
773                        for s in [s for s in services \
774                                if s.get('name', "") in self.imports]:
775                            p = urlparse(s.get('server', 'http://localhost'))
776                            print >>f, 'port: remote:%s:%s:%s' % \
777                                    (p.port, p.hostname, p.port)
778
779                if tunnelconfig:
780                    print >>f, "tunnelip: %s" % tunnelconfig
781                # XXX: send this an fedattr
782                #print >>f, "seercontrol: control.%s.%s%s" % \
783                        #(meid.lower(), mproj.lower(), mdomain)
784                print >>f, "peer: %s" % peer.lower()
785                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
786                        pubkey_base
787                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
788                        secretkey_base
789                f.close()
790            except IOError, e:
791                raise service_error(service_error.internal,
792                        "Can't write protal config %s: %s" % (cfn, e))
793           
794            # XXX: This little seer config file needs to go away.
795            if not seer_out:
796                try:
797                    seerfn = "%s/seer.conf" % tmpdir
798                    f = open(seerfn, "w")
799                    if not master:
800                        print >>f, "ControlNode: control.%s.%s%s" % \
801                            (meid.lower(), mproj.lower(), mdomain)
802                    print >>f, "ExperimentID: %s" % mexp
803                    f.close()
804                except IOError, e:
805                    raise service_error(service_error.internal, 
806                            "Can't write seer.conf: %s" %e)
807                seer_out = True
808
809            if not client_out and type in ('control', 'both'):
810                try:
811                    f = open("%s/client.conf" % tmpdir, "w")
812                    print >>f, "ControlGateway: %s%s" % \
813                        (myname.lower(), ldomain.lower())
814                    for s in services:
815                        if s.get('name',"") in self.imports and \
816                                s.get('visibility','') == 'import':
817                            client_service_out[s['name']](f, s)
818                    # Does seer need this?
819                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
820                    f.close()
821                except IOError, e:
822                    raise service_error(service_error.internal,
823                            "Cannot write client.conf: %s" %s)
824                client_out = True
825
826
827    def generate_rspec(self, topo, softdir, master, connInfo):
828        t = topo.clone()
829
830        starts = { }
831
832        # The startcmds for master and slave testbeds
833        if master: 
834            gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true')
835            node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true')
836        else: 
837            gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true')
838            node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true')
839
840        # Weed out the things we aren't going to instantiate: Segments, portal
841        # substrates, and portal interfaces.  (The copy in the for loop allows
842        # us to delete from e.elements in side the for loop).  While we're
843        # touching all the elements, we also adjust paths from the original
844        # testbed to local testbed paths and put the federation commands and
845        # startcommands into a dict so we can start them manually later.
846        # ProtoGENI requires setup before the federation commands run, so we
847        # run them by hand after we've seeded configurations.
848        for e in [e for e in t.elements]:
849            if isinstance(e, topdl.Segment):
850                t.elements.remove(e)
851            # Fix software paths
852            for s in getattr(e, 'software', []):
853                s.location = re.sub("^.*/", softdir, s.location)
854            if isinstance(e, topdl.Computer):
855                if e.get_attribute('portal') and gate_cmd:
856                    # Portals never have a user-specified start command
857                    starts[e.name[0]] = gate_cmd
858                elif node_cmd:
859                    if e.get_attribute('startup'):
860                        starts[e.name[0]] = "%s \\$USER '%s'" % \
861                                (node_cmd, e.get_attribute('startup'))
862                        e.remove_attribute('startup')
863                    else:
864                        starts[e.name[0]] = node_cmd
865
866                # Remove portal interfaces
867                e.interface = [i for i in e.interface \
868                        if not i.get_attribute('portal')]
869
870        t.substrates = [ s.clone() for s in t.substrates ]
871        t.incorporate_elements()
872
873        # Customize the ns2 output for local portal commands and images
874        filters = []
875
876        # NB: these are extra commands issued for the node, not the startcmds
877        if master: cmd = self.attrs.get('MasterConnectorCmd', '')
878        else: cmd = self.attrs.get('SlaveConnectorCmd', '')
879
880        if cmd:
881            filters.append(topdl.generate_portal_command_filter(cmd))
882
883        # Convert to rspec and return it
884        exp_rspec = topdl.topology_to_rspec(t, filters)
885
886        return exp_rspec
887
888    def StartSegment(self, req, fid):
889
890        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
891
892        err = None  # Any service_error generated after tmpdir is created
893        rv = None   # Return value from segment creation
894
895        try:
896            req = req['StartSegmentRequestBody']
897        except KeyError:
898            raise service_error(service_error.req, "Badly formed request")
899
900        connInfo = req.get('connection', [])
901        services = req.get('service', [])
902        auth_attr = req['allocID']['fedid']
903        aid = "%s" % auth_attr
904        attrs = req.get('fedAttr', [])
905        if not self.auth.check_attribute(fid, auth_attr):
906            raise service_error(service_error.access, "Access denied")
907
908
909        if req.has_key('segmentdescription') and \
910                req['segmentdescription'].has_key('topdldescription'):
911            topo = \
912                topdl.Topology(**req['segmentdescription']['topdldescription'])
913        else:
914            raise service_error(service_error.req, 
915                    "Request missing segmentdescription'")
916
917        master = req.get('master', False)
918
919        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
920        try:
921            tmpdir = tempfile.mkdtemp(prefix="access-")
922            softdir = "%s/software" % tmpdir
923        except IOError:
924            raise service_error(service_error.internal, "Cannot create tmp dir")
925
926        # Try block alllows us to clean up temporary files.
927        try:
928            sw = set()
929            for e in topo.elements:
930                for s in getattr(e, 'software', []):
931                    sw.add(s.location)
932            os.mkdir(softdir)
933            for s in sw:
934                self.log.debug("Retrieving %s" % s)
935                try:
936                    get_url(s, certfile, softdir)
937                except:
938                    t, v, st = sys.exc_info()
939                    raise service_error(service_error.internal,
940                            "Error retrieving %s: %s" % (s, v))
941
942            # Copy local portal node software to the tempdir
943            for s in (self.portal_software, self.federation_software):
944                for l, f in s:
945                    base = os.path.basename(f)
946                    copy_file(f, "%s/%s" % (softdir, base))
947
948            # Ick.  Put this python rpm in a place that it will get moved into
949            # the staging area.  It's a hack to install a modern (in a Roman
950            # sense of modern) python on ProtoGENI
951            python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
952            if os.access("./%s" % python_rpm, os.R_OK):
953                copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
954
955            for a in attrs:
956                if a['attribute'] in configs:
957                    try:
958                        self.log.debug("Retrieving %s" % a['value'])
959                        get_url(a['value'], certfile, tmpdir)
960                    except:
961                        t, v, st = sys.exc_info()
962                        raise service_error(service_error.internal,
963                                "Error retrieving %s: %s" % (s, v))
964                if a['attribute'] == 'ssh_pubkey':
965                    pubkey_base = a['value'].rpartition('/')[2]
966                if a['attribute'] == 'ssh_secretkey':
967                    secretkey_base = a['value'].rpartition('/')[2]
968                if a['attribute'] == 'experiment_name':
969                    ename = a['value']
970
971            # If the userconf service was imported, collect the configuration
972            # data.
973            for s in services:
974                if s.get("name", "") == 'userconfig' \
975                        and s.get('visibility',"") == 'import':
976
977                    # Collect ther server and certificate info.
978                    u = s.get('server', None)
979                    for a in s.get('fedAttr', []):
980                        if a.get('attribute',"") == 'cert':
981                            cert = a.get('value', None)
982                            break
983                    else:
984                        cert = None
985
986                    if cert:
987                        # Make a temporary certificate file for get_url.  The
988                        # finally clause removes it whether something goes
989                        # wrong (including an exception from get_url) or not.
990                        try:
991                            tfos, tn = tempfile.mkstemp(suffix=".pem")
992                            tf = os.fdopen(tfos, 'w')
993                            print >>tf, cert
994                            tf.close()
995                            get_url(u, tn, tmpdir, "userconf")
996                        except IOError, e:
997                            raise service_error(service.error.internal, 
998                                    "Cannot create temp file for " + 
999                                    "userconfig certificates: %s e")
1000                        except:
1001                            t, v, st = sys.exc_info()
1002                            raise service_error(service_error.internal,
1003                                    "Error retrieving %s: %s" % (u, v))
1004                        finally:
1005                            if tn: os.remove(tn)
1006                    else:
1007                        raise service_error(service_error.req,
1008                                "No certificate for retreiving userconfig")
1009                    break
1010
1011            self.state_lock.acquire()
1012            if self.allocation.has_key(aid):
1013                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1014                self.allocation[aid]['experiment'] = ename
1015                self.allocation[aid]['log'] = [ ]
1016                # Create a logger that logs to the experiment's state object as
1017                # well as to the main log file.
1018                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1019                h = logging.StreamHandler(
1020                        list_log.list_log(self.allocation[aid]['log']))
1021                # XXX: there should be a global one of these rather than
1022                # repeating the code.
1023                h.setFormatter(logging.Formatter(
1024                    "%(asctime)s %(name)s %(message)s",
1025                            '%d %b %y %H:%M:%S'))
1026                alloc_log.addHandler(h)
1027                self.write_state()
1028            else:
1029                self.log.error("No allocation for %s!?" % aid)
1030            self.state_lock.release()
1031
1032            # XXX: we really need to put the import and connection info
1033            # generation off longer.
1034            self.import_store_info(certfile, connInfo)
1035            #self.generate_portal_configs(topo, pubkey_base,
1036                    #secretkey_base, tmpdir, master, ename, connInfo,
1037                    #services)
1038            rspec = self.generate_rspec(topo, "%s/%s/" \
1039                    % (self.staging_dir, ename), master, connInfo)
1040
1041            starter = self.start_segment(keyfile=ssh_key,
1042                    debug=self.create_debug, log=alloc_log,
1043                    ch_url = self.ch_url, sa_url=self.sa_url,
1044                    cm_url=self.cm_url)
1045            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
1046                    master, ename,
1047                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1048                    certfile, topo, connInfo, services)
1049        except service_error, e:
1050            err = e
1051        except e:
1052            err = service_error(service_error.internal, str(e))
1053
1054        # Walk up tmpdir, deleting as we go
1055        if self.cleanup:
1056            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1057            for path, dirs, files in os.walk(tmpdir, topdown=False):
1058                for f in files:
1059                    os.remove(os.path.join(path, f))
1060                for d in dirs:
1061                    os.rmdir(os.path.join(path, d))
1062            os.rmdir(tmpdir)
1063        else:
1064            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1065
1066        if rv:
1067            # Grab the log (this is some anal locking, but better safe than
1068            # sorry)
1069            self.state_lock.acquire()
1070            logv = "".join(self.allocation[aid]['log'])
1071            self.state_lock.release()
1072
1073            return { 'allocID': req['allocID'], 'allocationLog': logv }
1074        elif err:
1075            raise service_error(service_error.federant,
1076                    "Swapin failed: %s" % err)
1077        else:
1078            raise service_error(service_error.federant, "Swapin failed")
1079
1080    def TerminateSegment(self, req, fid):
1081        try:
1082            req = req['TerminateSegmentRequestBody']
1083        except KeyError:
1084            raise service_error(service_error.req, "Badly formed request")
1085
1086        auth_attr = req['allocID']['fedid']
1087        aid = "%s" % auth_attr
1088        attrs = req.get('fedAttr', [])
1089        if not self.auth.check_attribute(fid, auth_attr):
1090            raise service_error(service_error.access, "Access denied")
1091
1092        self.state_lock.acquire()
1093        if self.allocation.has_key(aid):
1094            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1095            slice_cred = self.allocation[aid].get('slice_credential', None)
1096            ename = self.allocation[aid].get('experiment', None)
1097        else:
1098            cf, user, ssh_key, cpw = (None, None, None, None)
1099            slice_cred = None
1100            ename = None
1101        self.state_lock.release()
1102
1103        if ename:
1104            staging = "%s/%s" % ( self.staging_dir, ename)
1105        else:
1106            self.log.warn("Can't find experiment name for %s" % aid)
1107            staging = None
1108
1109        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
1110                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
1111        stopper(self, user, staging, slice_cred, cf, cpw)
1112        return { 'allocID': req['allocID'] }
1113
1114    def RenewSlices(self):
1115        self.log.info("Scanning for slices to renew")
1116        self.state_lock.acquire()
1117        aids = self.allocation.keys()
1118        self.state_lock.release()
1119
1120        for aid in aids:
1121            self.state_lock.acquire()
1122            if self.allocation.has_key(aid):
1123                name = self.allocation[aid].get('slice_name', None)
1124                scred = self.allocation[aid].get('slice_credential', None)
1125                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1126            else:
1127                name = None
1128                scred = None
1129            self.state_lock.release()
1130
1131            # There's a ProtoGENI slice associated with the segment; renew it.
1132            if name and scred:
1133                renewer = self.renew_segment(log=self.log, 
1134                        debug=self.create_debug, keyfile=ssh_key,
1135                        cm_url = self.cm_url, sa_url = self.sa_url,
1136                        ch_url = self.ch_url)
1137                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
1138                if new_scred:
1139                    self.log.info("Slice %s renewed until %s GMT" % \
1140                            (name, time.asctime(time.gmtime(\
1141                                time.time()+self.renewal_interval))))
1142                    self.state_lock.acquire()
1143                    if self.allocation.has_key(aid):
1144                        self.allocation[aid]['slice_credential'] = new_scred
1145                    self.state_lock.release()
1146                else:
1147                    self.log.info("Failed to renew slice %s " % name)
1148
1149        # Let's do this all again soon.  (4 tries before the slices time out)   
1150        t = Timer(self.renewal_interval/4, self.RenewSlices)
1151        t.start()
Note: See TracBrowser for help on using the repository browser.