source: fedd/federation/protogeni_access.py @ cd06678

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since cd06678 was cd06678, checked in by Ted Faber <faber@…>, 15 years ago

Handle replayed StartSegment? calls. SSL sometimes kills the response and this is needed to get the response to the controller w/o redoing the allocation

  • Property mode set to 100644
File size: 39.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_protogeni_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        def software_list(v):
59            l = [ ]
60            if v:
61                ps = v.split(" ")
62                while len(ps):
63                    loc, file = ps[0:2]
64                    del ps[0:2]
65                    l.append((loc, file))
66            return l
67
68        # Make sure that the configuration is in place
69        if not config: 
70            raise RunTimeError("No config to fedd.access")
71
72        self.project_priority = config.getboolean("access", "project_priority")
73        self.allow_proxy = config.getboolean("access", "allow_proxy")
74
75        self.domain = config.get("access", "domain")
76        self.certdir = config.get("access","certdir")
77        self.userconfdir = config.get("access","userconfdir")
78        self.userconfcmd = config.get("access","userconfcmd")
79        self.userconfurl = config.get("access","userconfurl")
80        self.federation_software = config.get("access", "federation_software")
81        self.portal_software = config.get("access", "portal_software")
82        self.ssh_port = config.get("access","ssh_port") or "22"
83        self.sshd = config.get("access","sshd")
84        self.sshd_config = config.get("access", "sshd_config")
85        self.create_debug = config.getboolean("access", "create_debug")
86        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
87        self.access_type = config.get("access", "type")
88        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
89        self.staging_host = config.get("access", "staging_host") \
90                or "ops.emulab.net"
91   
92        self.federation_software = software_list(self.federation_software)
93        self.portal_software = software_list(self.portal_software)
94
95        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
96        self.renewal_interval = int(self.renewal_interval) * 60
97
98        self.ch_url = config.get("access", "ch_url")
99        self.sa_url = config.get("access", "sa_url")
100        self.cm_url = config.get("access", "cm_url")
101
102        self.attrs = { }
103        self.access = { }
104        self.restricted = [ ]
105        self.projects = { }
106        self.keys = { }
107        self.types = { }
108        self.allocation = { }
109        self.state = { 
110            'projects': self.projects,
111            'allocation' : self.allocation,
112            'keys' : self.keys,
113            'types': self.types
114        }
115        self.log = logging.getLogger("fedd.access")
116        set_log_level(config, "access", self.log)
117        self.state_lock = Lock()
118        # XXX: Configurable
119        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
120        self.imports = set(('SMB', 'seer', 'userconfig'))
121
122        if auth: self.auth = auth
123        else:
124            self.log.error(\
125                    "[access]: No authorizer initialized, creating local one.")
126            auth = authorizer()
127
128        tb = config.get('access', 'testbed')
129        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
130        else: self.testbed = [ ]
131
132        if config.has_option("access", "accessdb"):
133            self.read_access(config.get("access", "accessdb"))
134
135        self.state_filename = config.get("access", "access_state")
136        print "Calling read_state %s" % self.state_filename
137        self.read_state()
138
139        # Keep cert_file and cert_pwd coming from the same place
140        self.cert_file = config.get("access", "cert_file")
141        if self.cert_file:
142            self.sert_pwd = config.get("access", "cert_pw")
143        else:
144            self.cert_file = config.get("globals", "cert_file")
145            self.sert_pwd = config.get("globals", "cert_pw")
146
147        self.trusted_certs = config.get("access", "trusted_certs") or \
148                config.get("globals", "trusted_certs")
149
150        self.start_segment = proxy_protogeni_segment.start_segment
151        self.stop_segment = proxy_protogeni_segment.stop_segment
152        self.renew_segment = proxy_protogeni_segment.renew_segment
153
154        self.call_SetValue = service_caller('SetValue')
155        self.call_GetValue = service_caller('GetValue')
156
157        self.RenewSlices()
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment", 
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176    def read_access(self, config):
177        """
178        Read a configuration file and set internal parameters.
179
180        There are access lines of the
181        form (tb, proj, user) -> user that map the first tuple of
182        names to the user for for access purposes.  Names in the key (left side)
183        can include "<NONE> or <ANY>" to act as wildcards or to require the
184        fields to be empty.  Similarly aproj or auser can be <SAME> or
185        <DYNAMIC> indicating that either the matching key is to be used or a
186        dynamic user or project will be created.  These names can also be
187        federated IDs (fedid's) if prefixed with fedid:.  The user is the
188        ProtoGENI identity certificate.
189        Testbed attributes outside the forms above can be given using the
190        format attribute: name value: value.  The name is a single word and the
191        value continues to the end of the line.  Empty lines and lines startin
192        with a # are ignored.
193
194        Parsing errors result in a self.parse_error exception being raised.
195        """
196        lineno=0
197        name_expr = "["+string.ascii_letters + string.digits + "\/\.\-_]+"
198        fedid_expr = "fedid:[" + string.hexdigits + "]+"
199        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
200
201        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
202                re.IGNORECASE)
203        access_str = '\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ \
204                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*,\s*('\
205                        + name_expr + ')\s*,\s*('+name_expr+')\s*,?\s*(' + \
206                        name_expr+ ')?\)'
207        access_re = re.compile(access_str, re.IGNORECASE)
208
209
210        def parse_name(n):
211            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
212            else: return n
213       
214        def auth_name(n):
215            if isinstance(n, basestring):
216                if n =='<any>' or n =='<none>': return None
217                else: return unicode(n)
218            else:
219                return n
220
221        f = open(config, "r");
222        for line in f:
223            lineno += 1
224            line = line.strip();
225            if len(line) == 0 or line.startswith('#'):
226                continue
227
228            # Extended (attribute: x value: y) attribute line
229            m = attr_re.match(line)
230            if m != None:
231                attr, val = m.group(1,2)
232                self.attrs[attr] = val
233                continue
234
235            # Access line (t, p, u) -> (a, pw) line
236            m = access_re.match(line)
237            if m != None:
238                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
239                auth_key = tuple([ auth_name(x) for x in access_key])
240                cert = auth_name(parse_name(m.group(4)))
241                user_name = auth_name(parse_name(m.group(5)))
242                ssh_key = unicode(m.group(6))
243                if m.group(6): pw = unicode(m.group(7))
244                else: pw = None
245
246                self.access[access_key] = (cert, user_name, ssh_key, pw)
247                self.auth.set_attribute(auth_key, "access")
248                continue
249
250            # Nothing matched to here: unknown line - raise exception
251            f.close()
252            raise self.parse_error("Unknown statement at line %d of %s" % \
253                    (lineno, config))
254        f.close()
255
256    def write_state(self):
257        if self.state_filename:
258            try:
259                f = open(self.state_filename, 'w')
260                pickle.dump(self.state, f)
261            except IOError, e:
262                self.log.error("Can't write file %s: %s" % \
263                        (self.state_filename, e))
264            except pickle.PicklingError, e:
265                self.log.error("Pickling problem: %s" % e)
266            except TypeError, e:
267                self.log.error("Pickling problem (TypeError): %s" % e)
268
269
270    def read_state(self):
271        """
272        Read a new copy of access state.  Old state is overwritten.
273
274        State format is a simple pickling of the state dictionary.
275        """
276        if self.state_filename:
277            try:
278                f = open(self.state_filename, "r")
279                self.state = pickle.load(f)
280                self.log.debug("[read_state]: Read state from %s" % \
281                        self.state_filename)
282            except IOError, e:
283                self.log.warning(("[read_state]: No saved state: " +\
284                        "Can't open %s: %s") % (self.state_filename, e))
285            except EOFError, e:
286                self.log.warning(("[read_state]: " +\
287                        "Empty or damaged state file: %s:") % \
288                        self.state_filename)
289            except pickle.UnpicklingError, e:
290                self.log.warning(("[read_state]: No saved state: " + \
291                        "Unpickling failed: %s") % e)
292
293            # Add the ownership attributes to the authorizer.  Note that the
294            # indices of the allocation dict are strings, but the attributes are
295            # fedids, so there is a conversion.
296            for k in self.state.get('allocation', {}).keys():
297                for o in self.state['allocation'][k].get('owners', []):
298                    self.auth.set_attribute(o, fedid(hexstr=k))
299                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
300
301            if self.allocation != self.state['allocation']:
302                self.allocation = self.state['allocation']
303
304    def permute_wildcards(self, a, p):
305        """Return a copy of a with various fields wildcarded.
306
307        The bits of p control the wildcards.  A set bit is a wildcard
308        replacement with the lowest bit being user then project then testbed.
309        """
310        if p & 1: user = ["<any>"]
311        else: user = a[2]
312        if p & 2: proj = "<any>"
313        else: proj = a[1]
314        if p & 4: tb = "<any>"
315        else: tb = a[0]
316
317        return (tb, proj, user)
318
319
320    def find_access(self, search):
321        """
322        Search the access DB for a match on this tuple.  Return the matching
323        user (protoGENI cert).
324       
325        NB, if the initial tuple fails to match we start inserting wildcards in
326        an order determined by self.project_priority.  Try the list of users in
327        order (when wildcarded, there's only one user in the list).
328        """
329        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
330        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
331
332        for p in perm: 
333            s = self.permute_wildcards(search, p)
334            # s[2] is None on an anonymous, unwildcarded request
335            if s[2] != None:
336                for u in s[2]:
337                    if self.access.has_key((s[0], s[1], u)):
338                        return self.access[(s[0], s[1], u)]
339            else:
340                if self.access.has_key(s):
341                    return self.access[s]
342        return None
343
344    def lookup_access(self, req, fid):
345        """
346        Determine the allowed access for this request.  Return the access and
347        which fields are dynamic.
348
349        The fedid is needed to construct the request
350        """
351        # Search keys
352        tb = None
353        project = None
354        user = None
355        # Return values
356        rp = access_project(None, ())
357        ru = None
358        user_re = re.compile("user:\s(.*)")
359        project_re = re.compile("project:\s(.*)")
360
361        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
362                if user_re.match(x)]
363        project = [ project_re.findall(x)[0] \
364                for x in req.get('credential', []) \
365                    if project_re.match(x)]
366
367        if len(project) == 1: project = project[0]
368        elif len(project) == 0: project = None
369        else: 
370            raise service_error(service_error.req, 
371                    "More than one project credential")
372
373
374        user_fedids = [ u for u in user if isinstance(u, fedid)]
375
376        # Determine how the caller is representing itself.  If its fedid shows
377        # up as a project or a singleton user, let that stand.  If neither the
378        # usernames nor the project name is a fedid, the caller is a testbed.
379        if project and isinstance(project, fedid):
380            if project == fid:
381                # The caller is the project (which is already in the tuple
382                # passed in to the authorizer)
383                owners = user_fedids
384                owners.append(project)
385            else:
386                raise service_error(service_error.req,
387                        "Project asserting different fedid")
388        else:
389            if fid not in user_fedids:
390                tb = fid
391                owners = user_fedids
392                owners.append(fid)
393            else:
394                if len(fedids) > 1:
395                    raise service_error(service_error.req,
396                            "User asserting different fedid")
397                else:
398                    # Which is a singleton
399                    owners = user_fedids
400        # Confirm authorization
401
402        for u in user:
403            self.log.debug("[lookup_access] Checking access for %s" % \
404                    ((tb, project, u),))
405            if self.auth.check_attribute((tb, project, u), 'access'):
406                self.log.debug("[lookup_access] Access granted")
407                break
408            else:
409                self.log.debug("[lookup_access] Access Denied")
410        else:
411            raise service_error(service_error.access, "Access denied")
412
413        # This maps a valid user to the ProtoGENI credentials to use
414        found = self.find_access((tb, project, user))
415       
416        if found == None:
417            raise service_error(service_error.access,
418                    "Access denied - cannot map access")
419        return found, owners
420
421    def get_handler(self, path, fid):
422        self.log.info("Get handler %s %s" % (path, fid))
423        if self.auth.check_attribute(fid, path) and self.userconfdir:
424            return ("%s/%s" % (self.userconfdir, path), "application/binary")
425        else:
426            return (None, None)
427
428    def export_userconf(self, project):
429        dev_null = None
430        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
431                log=self.log)
432        conffilename = "%s/%s" % (self.userconfdir, str(confid))
433        cf = None
434        try:
435            cf = open(conffilename, "w")
436            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
437        except IOError, e:
438            raise service_error(service_error.internal, 
439                    "Cannot create user configuration data")
440
441        try:
442            dev_null = open("/dev/null", "a")
443        except IOError, e:
444            self.log.error("export_userconf: can't open /dev/null: %s" % e)
445
446        cmd = "%s %s" % (self.userconfcmd, project)
447        conf = subprocess.call(cmd.split(" "),
448                stdout=cf, stderr=dev_null, close_fds=True)
449
450        self.auth.set_attribute(confid, "/%s" % str(confid))
451
452        return confid, confcert
453
454
455    def export_services(self, sreq, project, user):
456        exp = [ ]
457        state = { }
458        # XXX: Filthy shortcut here using http: so urlparse will give the right
459        # answers.
460        for s in sreq:
461            sname = s.get('name', '')
462            svis = s.get('visibility', '')
463            if svis == 'export':
464                if sname in self.exports:
465                    outs = s.copy()
466                    if sname == 'SMB':
467                        outs = s.copy()
468                        outs['server'] = "http://fs:139" 
469                        outs['fedAttr'] = [
470                                { 'attribute': 'SMBSHARE', 'value': 'USERS' },
471                                { 'attribute': 'SMBUSER', 'value': user },
472                                { 'attribute': 'SMBPROJ', 'value': project },
473                            ]
474                    elif sname == 'seer':
475                        outs['server'] = "http://control:16606"
476                    elif sname == 'tmcd':
477                        outs['server'] = "http://boss:7777"
478                    elif sname == 'userconfig':
479                        if self.userconfdir and self.userconfcmd \
480                                and self.userconfurl:
481                            cid, cert = self.export_userconf(project)
482                            outs['server'] = "%s/%s" % \
483                                    (self.userconfurl, str(cid))
484                            outs['fedAttr'] = [
485                                    { 'attribute': 'cert', 'value': cert },
486                                ]
487                            state['userconfig'] = unicode(cid)
488                    exp.append(outs)
489        return (exp, state)
490
491    def build_response(self, alloc_id, ap, services):
492        """
493        Create the SOAP response.
494
495        Build the dictionary description of the response and use
496        fedd_utils.pack_soap to create the soap message.  ap is the allocate
497        project message returned from a remote project allocation (even if that
498        allocation was done locally).
499        """
500        # Because alloc_id is already a fedd_services_types.IDType_Holder,
501        # there's no need to repack it
502        msg = { 
503                'allocID': alloc_id,
504                'fedAttr': [
505                    { 'attribute': 'domain', 'value': self.domain } , 
506                    { 'attribute': 'project', 'value': 
507                        ap['project'].get('name', {}).get('localname', "???") },
508                ]
509            }
510        if len(self.attrs) > 0:
511            msg['fedAttr'].extend(
512                [ { 'attribute': x, 'value' : y } \
513                        for x,y in self.attrs.iteritems()])
514
515        if services:
516            msg['service'] = services
517        return msg
518
519    def RequestAccess(self, req, fid):
520        """
521        Handle the access request.  Proxy if not for us.
522
523        Parse out the fields and make the allocations or rejections if for us,
524        otherwise, assuming we're willing to proxy, proxy the request out.
525        """
526
527        # The dance to get into the request body
528        if req.has_key('RequestAccessRequestBody'):
529            req = req['RequestAccessRequestBody']
530        else:
531            raise service_error(service_error.req, "No request!?")
532
533        if req.has_key('destinationTestbed'):
534            dt = unpack_id(req['destinationTestbed'])
535
536        if dt == None or dt in self.testbed:
537            # Request for this fedd
538            found, owners = self.lookup_access(req, fid)
539            # keep track of what's been added
540            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
541            aid = unicode(allocID)
542
543            self.state_lock.acquire()
544            self.allocation[aid] = { }
545            # The protoGENI certificate
546            self.allocation[aid]['credentials'] = found
547            # The list of owner FIDs
548            self.allocation[aid]['owners'] = owners
549            self.write_state()
550            self.state_lock.release()
551            for o in owners:
552                self.auth.set_attribute(o, allocID)
553            self.auth.set_attribute(allocID, allocID)
554
555            try:
556                f = open("%s/%s.pem" % (self.certdir, aid), "w")
557                print >>f, alloc_cert
558                f.close()
559            except IOError, e:
560                raise service_error(service_error.internal, 
561                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
562            return { 'allocID': { 'fedid': allocID } }
563        else:
564            if self.allow_proxy:
565                resp = self.proxy_RequestAccess.call_service(dt, req,
566                            self.cert_file, self.cert_pwd,
567                            self.trusted_certs)
568                if resp.has_key('RequestAccessResponseBody'):
569                    return resp['RequestAccessResponseBody']
570                else:
571                    return None
572            else:
573                raise service_error(service_error.access,
574                        "Access proxying denied")
575
576
577    def ReleaseAccess(self, req, fid):
578        # The dance to get into the request body
579        if req.has_key('ReleaseAccessRequestBody'):
580            req = req['ReleaseAccessRequestBody']
581        else:
582            raise service_error(service_error.req, "No request!?")
583
584        if req.has_key('destinationTestbed'):
585            dt = unpack_id(req['destinationTestbed'])
586        else:
587            dt = None
588
589        if dt == None or dt in self.testbed:
590            # Local request
591            try:
592                if req['allocID'].has_key('localname'):
593                    auth_attr = aid = req['allocID']['localname']
594                elif req['allocID'].has_key('fedid'):
595                    aid = unicode(req['allocID']['fedid'])
596                    auth_attr = req['allocID']['fedid']
597                else:
598                    raise service_error(service_error.req,
599                            "Only localnames and fedids are understood")
600            except KeyError:
601                raise service_error(service_error.req, "Badly formed request")
602
603            self.log.debug("[access] deallocation requested for %s", aid)
604            if not self.auth.check_attribute(fid, auth_attr):
605                self.log.debug("[access] deallocation denied for %s", aid)
606                raise service_error(service_error.access, "Access Denied")
607
608            self.state_lock.acquire()
609            if self.allocation.has_key(aid):
610                self.log.debug("Found allocation for %s" %aid)
611                del self.allocation[aid]
612                self.write_state()
613                self.state_lock.release()
614                # And remove the access cert
615                cf = "%s/%s.pem" % (self.certdir, aid)
616                self.log.debug("Removing %s" % cf)
617                os.remove(cf)
618                return { 'allocID': req['allocID'] } 
619            else:
620                self.state_lock.release()
621                raise service_error(service_error.req, "No such allocation")
622
623        else:
624            if self.allow_proxy:
625                resp = self.proxy_ReleaseAccess.call_service(dt, req,
626                            self.cert_file, self.cert_pwd,
627                            self.trusted_certs)
628                if resp.has_key('ReleaseAccessResponseBody'):
629                    return resp['ReleaseAccessResponseBody']
630                else:
631                    return None
632            else:
633                raise service_error(service_error.access,
634                        "Access proxying denied")
635
636    def import_store_info(self, cf, connInfo):
637        """
638        Pull any import parameters in connInfo in.  We translate them either
639        into known member names or fedAddrs.
640        """
641
642        for c in connInfo:
643            for p in [ p for p in c.get('parameter', []) \
644                    if p.get('type', '') == 'input']:
645                name = p.get('name', None)
646                key = p.get('key', None)
647                store = p.get('store', None)
648
649                if name and key and store :
650                    req = { 'name': key, 'wait': True }
651                    r = self.call_GetValue(store, req, cf)
652                    r = r.get('GetValueResponseBody', None)
653                    if r :
654                        if r.get('name', '') == key:
655                            v = r.get('value', None)
656                            if v is not None:
657                                if name == 'peer':
658                                    c['peer'] = v
659                                else:
660                                    if c.has_key('fedAttr'):
661                                        c['fedAttr'].append({
662                                            'attribute': name, 'value': v})
663                                    else:
664                                        c['fedAttr']= [{
665                                            'attribute': name, 'value': v}]
666                            else:
667                                raise service_error(service_error.internal, 
668                                        'None value exported for %s'  % key)
669                        else:
670                            raise service_error(service_error.internal, 
671                                    'Different name returned for %s: %s' \
672                                            % (key, r.get('name','')))
673                    else:
674                        raise service_error(service_error.internal, 
675                            'Badly formatted response: no GetValueResponseBody')
676                else:
677                    raise service_error(service_error.internal, 
678                        'Bad Services missing info for import %s' % c)
679
680    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
681            tmpdir, master, leid, connInfo, services):
682
683        def conninfo_to_dict(key, info):
684            """
685            Make a cpoy of the connection information about key, and flatten it
686            into a single dict by parsing out any feddAttrs.
687            """
688
689            rv = None
690            for i in info:
691                if key == i.get('portal', "") or \
692                        key in [e.get('element', "") \
693                        for e in i.get('member', [])]:
694                    rv = i.copy()
695                    break
696
697            else:
698                return rv
699
700            if 'fedAttr' in rv:
701                for a in rv['fedAttr']:
702                    attr = a.get('attribute', "")
703                    val = a.get('value', "")
704                    if attr and attr not in rv:
705                        rv[attr] = val
706                del rv['fedAttr']
707            return rv
708
709        # XXX: un hardcode this
710        def client_null(f, s):
711            print >>f, "Service: %s" % s['name']
712
713        def client_smb(f, s):
714            print >>f, "Service: %s" % s['name']
715            smbshare = None
716            smbuser = None
717            smbproj = None
718            for a in s.get('fedAttr', []):
719                if a.get('attribute', '') == 'SMBSHARE':
720                    smbshare = a.get('value', None)
721                elif a.get('attribute', '') == 'SMBUSER':
722                    smbuser = a.get('value', None)
723                elif a.get('attribute', '') == 'SMBPROJ':
724                    smbproj = a.get('value', None)
725
726            if all((smbshare, smbuser, smbproj)):
727                print >>f, "SMBshare: %s" % smbshare
728                print >>f, "ProjectUser: %s" % smbuser
729                print >>f, "ProjectName: %s" % smbproj
730
731        client_service_out = {
732                'SMB': client_smb,
733                'tmcd': client_null,
734                'seer': client_null,
735                'userconfig': client_null,
736            }
737        # XXX: end un hardcode this
738
739
740        seer_out = False
741        client_out = False
742        for e in [ e for e in topo.elements \
743                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
744            myname = e.name[0]
745            type = e.get_attribute('portal_type')
746
747            info = conninfo_to_dict(myname, connInfo)
748
749            if not info:
750                raise service_error(service_error.req,
751                        "No connectivity info for %s" % myname)
752
753            peer = info.get('peer', "")
754            ldomain = self.domain;
755
756            mexp = info.get('masterexperiment',"")
757            mproj, meid = mexp.split("/", 1)
758            mdomain = info.get('masterdomain',"")
759            muser = info.get('masteruser','root')
760            smbshare = info.get('smbshare', 'USERS')
761            ssh_port = info.get('ssh_port', '22')
762
763            active = info.get('active', 'False')
764
765            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
766            tunnelconfig = self.attrs.has_key('TunnelCfg')
767            try:
768                f = open(cfn, "w")
769                if active == 'True':
770                    print >>f, "active: True"
771                    print >>f, "ssh_port: %s" % ssh_port
772                    if type in ('control', 'both'):
773                        for s in [s for s in services \
774                                if s.get('name', "") in self.imports]:
775                            p = urlparse(s.get('server', 'http://localhost'))
776                            print >>f, 'port: remote:%s:%s:%s' % \
777                                    (p.port, p.hostname, p.port)
778
779                if tunnelconfig:
780                    print >>f, "tunnelip: %s" % tunnelconfig
781                # XXX: send this an fedattr
782                #print >>f, "seercontrol: control.%s.%s%s" % \
783                        #(meid.lower(), mproj.lower(), mdomain)
784                print >>f, "peer: %s" % peer.lower()
785                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
786                        pubkey_base
787                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
788                        secretkey_base
789                f.close()
790            except IOError, e:
791                raise service_error(service_error.internal,
792                        "Can't write protal config %s: %s" % (cfn, e))
793           
794            # XXX: This little seer config file needs to go away.
795            if not seer_out:
796                try:
797                    seerfn = "%s/seer.conf" % tmpdir
798                    f = open(seerfn, "w")
799                    if not master:
800                        print >>f, "ControlNode: control.%s.%s%s" % \
801                            (meid.lower(), mproj.lower(), mdomain)
802                    print >>f, "ExperimentID: %s" % mexp
803                    f.close()
804                except IOError, e:
805                    raise service_error(service_error.internal, 
806                            "Can't write seer.conf: %s" %e)
807                seer_out = True
808
809            if not client_out and type in ('control', 'both'):
810                try:
811                    f = open("%s/client.conf" % tmpdir, "w")
812                    print >>f, "ControlGateway: %s%s" % \
813                        (myname.lower(), ldomain.lower())
814                    for s in services:
815                        if s.get('name',"") in self.imports and \
816                                s.get('visibility','') == 'import':
817                            client_service_out[s['name']](f, s)
818                    # Does seer need this?
819                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
820                    f.close()
821                except IOError, e:
822                    raise service_error(service_error.internal,
823                            "Cannot write client.conf: %s" %s)
824                client_out = True
825
826
827    def generate_rspec(self, topo, softdir, master, connInfo):
828        t = topo.clone()
829
830        starts = { }
831
832        # The startcmds for master and slave testbeds
833        if master: 
834            gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true')
835            node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true')
836        else: 
837            gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true')
838            node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true')
839
840        # Weed out the things we aren't going to instantiate: Segments, portal
841        # substrates, and portal interfaces.  (The copy in the for loop allows
842        # us to delete from e.elements in side the for loop).  While we're
843        # touching all the elements, we also adjust paths from the original
844        # testbed to local testbed paths and put the federation commands and
845        # startcommands into a dict so we can start them manually later.
846        # ProtoGENI requires setup before the federation commands run, so we
847        # run them by hand after we've seeded configurations.
848        for e in [e for e in t.elements]:
849            if isinstance(e, topdl.Segment):
850                t.elements.remove(e)
851            # Fix software paths
852            for s in getattr(e, 'software', []):
853                s.location = re.sub("^.*/", softdir, s.location)
854            if isinstance(e, topdl.Computer):
855                if e.get_attribute('portal') and gate_cmd:
856                    # Portals never have a user-specified start command
857                    starts[e.name[0]] = gate_cmd
858                elif node_cmd:
859                    if e.get_attribute('startup'):
860                        starts[e.name[0]] = "%s \\$USER '%s'" % \
861                                (node_cmd, e.get_attribute('startup'))
862                        e.remove_attribute('startup')
863                    else:
864                        starts[e.name[0]] = node_cmd
865
866                # Remove portal interfaces
867                e.interface = [i for i in e.interface \
868                        if not i.get_attribute('portal')]
869
870        t.substrates = [ s.clone() for s in t.substrates ]
871        t.incorporate_elements()
872
873        # Customize the ns2 output for local portal commands and images
874        filters = []
875
876        # NB: these are extra commands issued for the node, not the startcmds
877        if master: cmd = self.attrs.get('MasterConnectorCmd', '')
878        else: cmd = self.attrs.get('SlaveConnectorCmd', '')
879
880        if cmd:
881            filters.append(topdl.generate_portal_command_filter(cmd))
882
883        # Convert to rspec and return it
884        exp_rspec = topdl.topology_to_rspec(t, filters)
885
886        return exp_rspec
887
888    def StartSegment(self, req, fid):
889
890        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
891
892        err = None  # Any service_error generated after tmpdir is created
893        rv = None   # Return value from segment creation
894
895        try:
896            req = req['StartSegmentRequestBody']
897        except KeyError:
898            raise service_error(service_error.req, "Badly formed request")
899
900        connInfo = req.get('connection', [])
901        services = req.get('service', [])
902        auth_attr = req['allocID']['fedid']
903        aid = "%s" % auth_attr
904        attrs = req.get('fedAttr', [])
905        if not self.auth.check_attribute(fid, auth_attr):
906            raise service_error(service_error.access, "Access denied")
907        else:
908            # See if this is a replay of an earlier succeeded StartSegment -
909            # sometimes SSL kills 'em.  If so, replay the response rather than
910            # redoing the allocation.
911            self.state_lock.acquire()
912            retval = self.allocation[aid].get('started', None)
913            self.state_lock.release()
914            if retval:
915                self.log.warning("Duplicate StartSegment for %s: " % aid + \
916                        "replaying response")
917                return retval
918
919
920        if req.has_key('segmentdescription') and \
921                req['segmentdescription'].has_key('topdldescription'):
922            topo = \
923                topdl.Topology(**req['segmentdescription']['topdldescription'])
924        else:
925            raise service_error(service_error.req, 
926                    "Request missing segmentdescription'")
927
928        master = req.get('master', False)
929
930        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
931        try:
932            tmpdir = tempfile.mkdtemp(prefix="access-")
933            softdir = "%s/software" % tmpdir
934        except IOError:
935            raise service_error(service_error.internal, "Cannot create tmp dir")
936
937        # Try block alllows us to clean up temporary files.
938        try:
939            sw = set()
940            for e in topo.elements:
941                for s in getattr(e, 'software', []):
942                    sw.add(s.location)
943            os.mkdir(softdir)
944            for s in sw:
945                self.log.debug("Retrieving %s" % s)
946                try:
947                    get_url(s, certfile, softdir)
948                except:
949                    t, v, st = sys.exc_info()
950                    raise service_error(service_error.internal,
951                            "Error retrieving %s: %s" % (s, v))
952
953            # Copy local portal node software to the tempdir
954            for s in (self.portal_software, self.federation_software):
955                for l, f in s:
956                    base = os.path.basename(f)
957                    copy_file(f, "%s/%s" % (softdir, base))
958
959            # Ick.  Put this python rpm in a place that it will get moved into
960            # the staging area.  It's a hack to install a modern (in a Roman
961            # sense of modern) python on ProtoGENI
962            python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
963            if os.access("./%s" % python_rpm, os.R_OK):
964                copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
965
966            for a in attrs:
967                if a['attribute'] in configs:
968                    try:
969                        self.log.debug("Retrieving %s" % a['value'])
970                        get_url(a['value'], certfile, tmpdir)
971                    except:
972                        t, v, st = sys.exc_info()
973                        raise service_error(service_error.internal,
974                                "Error retrieving %s: %s" % (s, v))
975                if a['attribute'] == 'ssh_pubkey':
976                    pubkey_base = a['value'].rpartition('/')[2]
977                if a['attribute'] == 'ssh_secretkey':
978                    secretkey_base = a['value'].rpartition('/')[2]
979                if a['attribute'] == 'experiment_name':
980                    ename = a['value']
981
982            # If the userconf service was imported, collect the configuration
983            # data.
984            for s in services:
985                if s.get("name", "") == 'userconfig' \
986                        and s.get('visibility',"") == 'import':
987
988                    # Collect ther server and certificate info.
989                    u = s.get('server', None)
990                    for a in s.get('fedAttr', []):
991                        if a.get('attribute',"") == 'cert':
992                            cert = a.get('value', None)
993                            break
994                    else:
995                        cert = None
996
997                    if cert:
998                        # Make a temporary certificate file for get_url.  The
999                        # finally clause removes it whether something goes
1000                        # wrong (including an exception from get_url) or not.
1001                        try:
1002                            tfos, tn = tempfile.mkstemp(suffix=".pem")
1003                            tf = os.fdopen(tfos, 'w')
1004                            print >>tf, cert
1005                            tf.close()
1006                            get_url(u, tn, tmpdir, "userconf")
1007                        except IOError, e:
1008                            raise service_error(service.error.internal, 
1009                                    "Cannot create temp file for " + 
1010                                    "userconfig certificates: %s e")
1011                        except:
1012                            t, v, st = sys.exc_info()
1013                            raise service_error(service_error.internal,
1014                                    "Error retrieving %s: %s" % (u, v))
1015                        finally:
1016                            if tn: os.remove(tn)
1017                    else:
1018                        raise service_error(service_error.req,
1019                                "No certificate for retreiving userconfig")
1020                    break
1021
1022            self.state_lock.acquire()
1023            if self.allocation.has_key(aid):
1024                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1025                self.allocation[aid]['experiment'] = ename
1026                self.allocation[aid]['log'] = [ ]
1027                # Create a logger that logs to the experiment's state object as
1028                # well as to the main log file.
1029                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1030                h = logging.StreamHandler(
1031                        list_log.list_log(self.allocation[aid]['log']))
1032                # XXX: there should be a global one of these rather than
1033                # repeating the code.
1034                h.setFormatter(logging.Formatter(
1035                    "%(asctime)s %(name)s %(message)s",
1036                            '%d %b %y %H:%M:%S'))
1037                alloc_log.addHandler(h)
1038                self.write_state()
1039            else:
1040                self.log.error("No allocation for %s!?" % aid)
1041            self.state_lock.release()
1042
1043            # XXX: we really need to put the import and connection info
1044            # generation off longer.
1045            self.import_store_info(certfile, connInfo)
1046            #self.generate_portal_configs(topo, pubkey_base,
1047                    #secretkey_base, tmpdir, master, ename, connInfo,
1048                    #services)
1049            rspec = self.generate_rspec(topo, "%s/%s/" \
1050                    % (self.staging_dir, ename), master, connInfo)
1051
1052            starter = self.start_segment(keyfile=ssh_key,
1053                    debug=self.create_debug, log=alloc_log,
1054                    ch_url = self.ch_url, sa_url=self.sa_url,
1055                    cm_url=self.cm_url)
1056            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
1057                    master, ename,
1058                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1059                    certfile, topo, connInfo, services)
1060        except service_error, e:
1061            err = e
1062        except e:
1063            err = service_error(service_error.internal, str(e))
1064
1065        # Walk up tmpdir, deleting as we go
1066        if self.cleanup:
1067            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1068            for path, dirs, files in os.walk(tmpdir, topdown=False):
1069                for f in files:
1070                    os.remove(os.path.join(path, f))
1071                for d in dirs:
1072                    os.rmdir(os.path.join(path, d))
1073            os.rmdir(tmpdir)
1074        else:
1075            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1076
1077        if rv:
1078            # Grab the log (this is some anal locking, but better safe than
1079            # sorry)
1080            self.state_lock.acquire()
1081            logv = "".join(self.allocation[aid]['log'])
1082            # It's possible that the StartSegment call gets retried (!).
1083            # if the 'started' key is in the allocation, we'll return it rather
1084            # than redo the setup.
1085            self.allocation[aid]['started'] = { 
1086                    'allocID': req['allocID'],
1087                    'allocationLog': logv,
1088                    }
1089            retval = self.allocation[aid]['started']
1090            self.state_lock.release()
1091
1092            return retval
1093        elif err:
1094            raise service_error(service_error.federant,
1095                    "Swapin failed: %s" % err)
1096        else:
1097            raise service_error(service_error.federant, "Swapin failed")
1098
1099    def TerminateSegment(self, req, fid):
1100        try:
1101            req = req['TerminateSegmentRequestBody']
1102        except KeyError:
1103            raise service_error(service_error.req, "Badly formed request")
1104
1105        auth_attr = req['allocID']['fedid']
1106        aid = "%s" % auth_attr
1107        attrs = req.get('fedAttr', [])
1108        if not self.auth.check_attribute(fid, auth_attr):
1109            raise service_error(service_error.access, "Access denied")
1110
1111        self.state_lock.acquire()
1112        if self.allocation.has_key(aid):
1113            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1114            slice_cred = self.allocation[aid].get('slice_credential', None)
1115            ename = self.allocation[aid].get('experiment', None)
1116        else:
1117            cf, user, ssh_key, cpw = (None, None, None, None)
1118            slice_cred = None
1119            ename = None
1120        self.state_lock.release()
1121
1122        if ename:
1123            staging = "%s/%s" % ( self.staging_dir, ename)
1124        else:
1125            self.log.warn("Can't find experiment name for %s" % aid)
1126            staging = None
1127
1128        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
1129                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
1130        stopper(self, user, staging, slice_cred, cf, cpw)
1131        return { 'allocID': req['allocID'] }
1132
1133    def RenewSlices(self):
1134        self.log.info("Scanning for slices to renew")
1135        self.state_lock.acquire()
1136        aids = self.allocation.keys()
1137        self.state_lock.release()
1138
1139        for aid in aids:
1140            self.state_lock.acquire()
1141            if self.allocation.has_key(aid):
1142                name = self.allocation[aid].get('slice_name', None)
1143                scred = self.allocation[aid].get('slice_credential', None)
1144                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1145            else:
1146                name = None
1147                scred = None
1148            self.state_lock.release()
1149
1150            # There's a ProtoGENI slice associated with the segment; renew it.
1151            if name and scred:
1152                renewer = self.renew_segment(log=self.log, 
1153                        debug=self.create_debug, keyfile=ssh_key,
1154                        cm_url = self.cm_url, sa_url = self.sa_url,
1155                        ch_url = self.ch_url)
1156                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
1157                if new_scred:
1158                    self.log.info("Slice %s renewed until %s GMT" % \
1159                            (name, time.asctime(time.gmtime(\
1160                                time.time()+self.renewal_interval))))
1161                    self.state_lock.acquire()
1162                    if self.allocation.has_key(aid):
1163                        self.allocation[aid]['slice_credential'] = new_scred
1164                    self.state_lock.release()
1165                else:
1166                    self.log.info("Failed to renew slice %s " % name)
1167
1168        # Let's do this all again soon.  (4 tries before the slices time out)   
1169        t = Timer(self.renewal_interval/4, self.RenewSlices)
1170        t.start()
Note: See TracBrowser for help on using the repository browser.