source: fedd/federation/protogeni_access.py @ f038da1

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f038da1 was f038da1, checked in by Ted Faber <faber@…>, 14 years ago

These were workarounds for the problems pickling topologies. That problem's allieviated, so these can go back.

  • Property mode set to 100644
File size: 41.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_protogeni_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        def software_list(v):
59            l = [ ]
60            if v:
61                ps = v.split(" ")
62                while len(ps):
63                    loc, file = ps[0:2]
64                    del ps[0:2]
65                    if loc == 'rpm':
66                        loc = None
67                    l.append((loc, file))
68            return l
69
70        # Make sure that the configuration is in place
71        if not config: 
72            raise RunTimeError("No config to fedd.access")
73
74        self.project_priority = config.getboolean("access", "project_priority")
75        self.allow_proxy = config.getboolean("access", "allow_proxy")
76
77        self.domain = config.get("access", "domain")
78        self.certdir = config.get("access","certdir")
79        self.userconfdir = config.get("access","userconfdir")
80        self.userconfcmd = config.get("access","userconfcmd")
81        self.userconfurl = config.get("access","userconfurl")
82        self.federation_software = config.get("access", "federation_software")
83        self.portal_software = config.get("access", "portal_software")
84        self.ssh_port = config.get("access","ssh_port") or "22"
85        self.sshd = config.get("access","sshd")
86        self.sshd_config = config.get("access", "sshd_config")
87        self.create_debug = config.getboolean("access", "create_debug")
88        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
89        self.access_type = config.get("access", "type")
90        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
91        self.staging_host = config.get("access", "staging_host") \
92                or "ops.emulab.net"
93   
94        self.federation_software = software_list(self.federation_software)
95        self.portal_software = software_list(self.portal_software)
96
97        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
98        self.renewal_interval = int(self.renewal_interval) * 60
99
100        self.ch_url = config.get("access", "ch_url")
101        self.sa_url = config.get("access", "sa_url")
102        self.cm_url = config.get("access", "cm_url")
103
104        self.attrs = { }
105        self.access = { }
106        self.restricted = [ ]
107        self.projects = { }
108        self.keys = { }
109        self.types = { }
110        self.allocation = { }
111        self.state = { 
112            'projects': self.projects,
113            'allocation' : self.allocation,
114            'keys' : self.keys,
115            'types': self.types
116        }
117        self.log = logging.getLogger("fedd.access")
118        set_log_level(config, "access", self.log)
119        self.state_lock = Lock()
120        # XXX: Configurable
121        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
122        self.imports = set(('SMB', 'seer', 'userconfig'))
123
124        if auth: self.auth = auth
125        else:
126            self.log.error(\
127                    "[access]: No authorizer initialized, creating local one.")
128            auth = authorizer()
129
130        tb = config.get('access', 'testbed')
131        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
132        else: self.testbed = [ ]
133
134        if config.has_option("access", "accessdb"):
135            self.read_access(config.get("access", "accessdb"))
136
137        self.state_filename = config.get("access", "access_state")
138        print "Calling read_state %s" % self.state_filename
139        self.read_state()
140
141        # Keep cert_file and cert_pwd coming from the same place
142        self.cert_file = config.get("access", "cert_file")
143        if self.cert_file:
144            self.sert_pwd = config.get("access", "cert_pw")
145        else:
146            self.cert_file = config.get("globals", "cert_file")
147            self.sert_pwd = config.get("globals", "cert_pw")
148
149        self.trusted_certs = config.get("access", "trusted_certs") or \
150                config.get("globals", "trusted_certs")
151
152        self.start_segment = proxy_protogeni_segment.start_segment
153        self.stop_segment = proxy_protogeni_segment.stop_segment
154        self.renew_segment = proxy_protogeni_segment.renew_segment
155
156        self.call_SetValue = service_caller('SetValue')
157        self.call_GetValue = service_caller('GetValue')
158
159        self.RenewSlices()
160
161        self.soap_services = {\
162            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
163            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
164            'StartSegment': soap_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': soap_handler("TerminateSegment", 
166                self.TerminateSegment),
167            }
168        self.xmlrpc_services =  {\
169            'RequestAccess': xmlrpc_handler('RequestAccess',
170                self.RequestAccess),
171            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
172                self.ReleaseAccess),
173            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
174            'TerminateSegment': xmlrpc_handler('TerminateSegment',
175                self.TerminateSegment),
176            }
177
178    def read_access(self, config):
179        """
180        Read a configuration file and set internal parameters.
181
182        There are access lines of the
183        form (tb, proj, user) -> user that map the first tuple of
184        names to the user for for access purposes.  Names in the key (left side)
185        can include "<NONE> or <ANY>" to act as wildcards or to require the
186        fields to be empty.  Similarly aproj or auser can be <SAME> or
187        <DYNAMIC> indicating that either the matching key is to be used or a
188        dynamic user or project will be created.  These names can also be
189        federated IDs (fedid's) if prefixed with fedid:.  The user is the
190        ProtoGENI identity certificate.
191        Testbed attributes outside the forms above can be given using the
192        format attribute: name value: value.  The name is a single word and the
193        value continues to the end of the line.  Empty lines and lines startin
194        with a # are ignored.
195
196        Parsing errors result in a self.parse_error exception being raised.
197        """
198        lineno=0
199        name_expr = "["+string.ascii_letters + string.digits + "\/\.\-_]+"
200        fedid_expr = "fedid:[" + string.hexdigits + "]+"
201        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
202
203        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
204                re.IGNORECASE)
205        access_str = '\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ \
206                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*,\s*('\
207                        + name_expr + ')\s*,\s*('+name_expr+')\s*,?\s*(' + \
208                        name_expr+ ')?\)'
209        access_re = re.compile(access_str, re.IGNORECASE)
210
211
212        def parse_name(n):
213            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
214            else: return n
215       
216        def auth_name(n):
217            if isinstance(n, basestring):
218                if n =='<any>' or n =='<none>': return None
219                else: return unicode(n)
220            else:
221                return n
222
223        f = open(config, "r");
224        for line in f:
225            lineno += 1
226            line = line.strip();
227            if len(line) == 0 or line.startswith('#'):
228                continue
229
230            # Extended (attribute: x value: y) attribute line
231            m = attr_re.match(line)
232            if m != None:
233                attr, val = m.group(1,2)
234                self.attrs[attr] = val
235                continue
236
237            # Access line (t, p, u) -> (a, pw) line
238            m = access_re.match(line)
239            if m != None:
240                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
241                auth_key = tuple([ auth_name(x) for x in access_key])
242                cert = auth_name(parse_name(m.group(4)))
243                user_name = auth_name(parse_name(m.group(5)))
244                ssh_key = unicode(m.group(6))
245                if m.group(6): pw = unicode(m.group(7))
246                else: pw = None
247
248                self.access[access_key] = (cert, user_name, ssh_key, pw)
249                self.auth.set_attribute(auth_key, "access")
250                continue
251
252            # Nothing matched to here: unknown line - raise exception
253            f.close()
254            raise self.parse_error("Unknown statement at line %d of %s" % \
255                    (lineno, config))
256        f.close()
257
258    def write_state(self):
259        if self.state_filename:
260            try:
261                f = open(self.state_filename, 'w')
262                pickle.dump(self.state, f)
263            except EnvironmentError, e:
264                self.log.error("Can't write file %s: %s" % \
265                        (self.state_filename, e))
266            except pickle.PicklingError, e:
267                self.log.error("Pickling problem: %s" % e)
268            except TypeError, e:
269                self.log.error("Pickling problem (TypeError): %s" % e)
270
271
272    def read_state(self):
273        """
274        Read a new copy of access state.  Old state is overwritten.
275
276        State format is a simple pickling of the state dictionary.
277        """
278        if self.state_filename:
279            try:
280                f = open(self.state_filename, "r")
281                self.state = pickle.load(f)
282                self.log.debug("[read_state]: Read state from %s" % \
283                        self.state_filename)
284            except EnvironmentError, e:
285                self.log.warning(("[read_state]: No saved state: " +\
286                        "Can't open %s: %s") % (self.state_filename, e))
287            except EOFError, e:
288                self.log.warning(("[read_state]: " +\
289                        "Empty or damaged state file: %s:") % \
290                        self.state_filename)
291            except pickle.UnpicklingError, e:
292                self.log.warning(("[read_state]: No saved state: " + \
293                        "Unpickling failed: %s") % e)
294
295            # Add the ownership attributes to the authorizer.  Note that the
296            # indices of the allocation dict are strings, but the attributes are
297            # fedids, so there is a conversion.
298            for k in self.state.get('allocation', {}).keys():
299                for o in self.state['allocation'][k].get('owners', []):
300                    self.auth.set_attribute(o, fedid(hexstr=k))
301                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
302
303            if self.allocation != self.state['allocation']:
304                self.allocation = self.state['allocation']
305
306    def permute_wildcards(self, a, p):
307        """Return a copy of a with various fields wildcarded.
308
309        The bits of p control the wildcards.  A set bit is a wildcard
310        replacement with the lowest bit being user then project then testbed.
311        """
312        if p & 1: user = ["<any>"]
313        else: user = a[2]
314        if p & 2: proj = "<any>"
315        else: proj = a[1]
316        if p & 4: tb = "<any>"
317        else: tb = a[0]
318
319        return (tb, proj, user)
320
321
322    def find_access(self, search):
323        """
324        Search the access DB for a match on this tuple.  Return the matching
325        user (protoGENI cert).
326       
327        NB, if the initial tuple fails to match we start inserting wildcards in
328        an order determined by self.project_priority.  Try the list of users in
329        order (when wildcarded, there's only one user in the list).
330        """
331        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
332        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
333
334        for p in perm: 
335            s = self.permute_wildcards(search, p)
336            # s[2] is None on an anonymous, unwildcarded request
337            if s[2] != None:
338                for u in s[2]:
339                    if self.access.has_key((s[0], s[1], u)):
340                        return self.access[(s[0], s[1], u)]
341            else:
342                if self.access.has_key(s):
343                    return self.access[s]
344        return None
345
346    def lookup_access(self, req, fid):
347        """
348        Determine the allowed access for this request.  Return the access and
349        which fields are dynamic.
350
351        The fedid is needed to construct the request
352        """
353        # Search keys
354        tb = None
355        project = None
356        user = None
357        # Return values
358        rp = access_project(None, ())
359        ru = None
360        user_re = re.compile("user:\s(.*)")
361        project_re = re.compile("project:\s(.*)")
362
363        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
364                if user_re.match(x)]
365        project = [ project_re.findall(x)[0] \
366                for x in req.get('credential', []) \
367                    if project_re.match(x)]
368
369        if len(project) == 1: project = project[0]
370        elif len(project) == 0: project = None
371        else: 
372            raise service_error(service_error.req, 
373                    "More than one project credential")
374
375
376        user_fedids = [ u for u in user if isinstance(u, fedid)]
377
378        # Determine how the caller is representing itself.  If its fedid shows
379        # up as a project or a singleton user, let that stand.  If neither the
380        # usernames nor the project name is a fedid, the caller is a testbed.
381        if project and isinstance(project, fedid):
382            if project == fid:
383                # The caller is the project (which is already in the tuple
384                # passed in to the authorizer)
385                owners = user_fedids
386                owners.append(project)
387            else:
388                raise service_error(service_error.req,
389                        "Project asserting different fedid")
390        else:
391            if fid not in user_fedids:
392                tb = fid
393                owners = user_fedids
394                owners.append(fid)
395            else:
396                if len(fedids) > 1:
397                    raise service_error(service_error.req,
398                            "User asserting different fedid")
399                else:
400                    # Which is a singleton
401                    owners = user_fedids
402        # Confirm authorization
403
404        for u in user:
405            self.log.debug("[lookup_access] Checking access for %s" % \
406                    ((tb, project, u),))
407            if self.auth.check_attribute((tb, project, u), 'access'):
408                self.log.debug("[lookup_access] Access granted")
409                break
410            else:
411                self.log.debug("[lookup_access] Access Denied")
412        else:
413            raise service_error(service_error.access, "Access denied")
414
415        # This maps a valid user to the ProtoGENI credentials to use
416        found = self.find_access((tb, project, user))
417       
418        if found == None:
419            raise service_error(service_error.access,
420                    "Access denied - cannot map access")
421        return found, owners
422
423    def get_handler(self, path, fid):
424        self.log.info("Get handler %s %s" % (path, fid))
425        if self.auth.check_attribute(fid, path) and self.userconfdir:
426            return ("%s/%s" % (self.userconfdir, path), "application/binary")
427        else:
428            return (None, None)
429
430    def export_userconf(self, project):
431        dev_null = None
432        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
433                log=self.log)
434        conffilename = "%s/%s" % (self.userconfdir, str(confid))
435        cf = None
436        try:
437            cf = open(conffilename, "w")
438            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
439        except EnvironmentError, e:
440            raise service_error(service_error.internal, 
441                    "Cannot create user configuration data")
442
443        try:
444            dev_null = open("/dev/null", "a")
445        except EnvironmentError, e:
446            self.log.error("export_userconf: can't open /dev/null: %s" % e)
447
448        cmd = "%s %s" % (self.userconfcmd, project)
449        conf = subprocess.call(cmd.split(" "),
450                stdout=cf, stderr=dev_null, close_fds=True)
451
452        self.auth.set_attribute(confid, "/%s" % str(confid))
453
454        return confid, confcert
455
456
457    def export_SMB(self, id, state, project, user):
458        return { 
459                'id': id,
460                'name': 'SMB',
461                'visibility': 'export',
462                'server': 'http://fs:139',
463                'fedAttr': [
464                        { 'attribute': 'SMBSHARE', 'value': 'USERS' },
465                        { 'attribute': 'SMBUSER', 'value': user },
466                        { 'attribute': 'SMBPROJ', 'value': project },
467                    ]
468                }
469
470    def export_seer(self, id, state, project, user):
471        return { 
472                'id': id,
473                'name': 'seer',
474                'visibility': 'export',
475                'server': 'http://control:16606',
476                }
477
478    def export_tmcd(self, id, state, project, user):
479        return { 
480                'id': id,
481                'name': 'seer',
482                'visibility': 'export',
483                'server': 'http://boss:7777',
484                }
485
486    def export_userconfig(self, id, state, project, user):
487        if self.userconfdir and self.userconfcmd \
488                and self.userconfurl:
489            cid, cert = self.export_userconf(project)
490            state['userconfig'] = unicode(cid)
491            return {
492                    'id': id,
493                    'name': 'userconfig',
494                    'visibility': 'export',
495                    'server': "%s/%s" % (self.userconfurl, str(cid)),
496                    'fedAttr': [
497                        { 'attribute': 'cert', 'value': cert },
498                    ]
499                    }
500        else:
501            return None
502
503    def export_services(self, sreq, project, user):
504        exp = [ ]
505        state = { }
506        # XXX: Filthy shortcut here using http: so urlparse will give the right
507        # answers.
508        for s in sreq:
509            sname = s.get('name', '')
510            svis = s.get('visibility', '')
511            if svis == 'export':
512                if sname in self.exports:
513                    id = s.get('id', 'no_id')
514                    if sname == 'SMB':
515                        exp.append(self.export_SMB(id, state, project, user))
516                    elif sname == 'seer':
517                        exp.append(self.export_seer(id, state, project, user))
518                    elif sname == 'tmcd':
519                        exp.append(self.export_tmcd(id, state, project, user))
520                    elif sname == 'userconfig':
521                        exp.append(self.export_userconfig(id, state,
522                            project, user))
523                    elif sname == 'project_export':
524                        exp.append(self.export_SMB(id, state, project, user))
525                        exp.append(self.export_seer(id, state, project, user))
526                        exp.append(self.export_userconfig(id, state,
527                            project, user))
528        return (exp, state)
529
530    def build_response(self, alloc_id, ap, services):
531        """
532        Create the SOAP response.
533
534        Build the dictionary description of the response and use
535        fedd_utils.pack_soap to create the soap message.  ap is the allocate
536        project message returned from a remote project allocation (even if that
537        allocation was done locally).
538        """
539        # Because alloc_id is already a fedd_services_types.IDType_Holder,
540        # there's no need to repack it
541        msg = { 
542                'allocID': alloc_id,
543                'fedAttr': [
544                    { 'attribute': 'domain', 'value': self.domain } , 
545                    { 'attribute': 'project', 'value': 
546                        ap['project'].get('name', {}).get('localname', "???") },
547                ]
548            }
549        if len(self.attrs) > 0:
550            msg['fedAttr'].extend(
551                [ { 'attribute': x, 'value' : y } \
552                        for x,y in self.attrs.iteritems()])
553
554        if services:
555            msg['service'] = services
556        return msg
557
558    def RequestAccess(self, req, fid):
559        """
560        Handle the access request.  Proxy if not for us.
561
562        Parse out the fields and make the allocations or rejections if for us,
563        otherwise, assuming we're willing to proxy, proxy the request out.
564        """
565
566        # The dance to get into the request body
567        if req.has_key('RequestAccessRequestBody'):
568            req = req['RequestAccessRequestBody']
569        else:
570            raise service_error(service_error.req, "No request!?")
571
572        if req.has_key('destinationTestbed'):
573            dt = unpack_id(req['destinationTestbed'])
574
575        if dt == None or dt in self.testbed:
576            # Request for this fedd
577            found, owners = self.lookup_access(req, fid)
578            # keep track of what's been added
579            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
580            aid = unicode(allocID)
581
582            self.state_lock.acquire()
583            self.allocation[aid] = { }
584            # The protoGENI certificate
585            self.allocation[aid]['credentials'] = found
586            # The list of owner FIDs
587            self.allocation[aid]['owners'] = owners
588            self.write_state()
589            self.state_lock.release()
590            for o in owners:
591                self.auth.set_attribute(o, allocID)
592            self.auth.set_attribute(allocID, allocID)
593
594            try:
595                f = open("%s/%s.pem" % (self.certdir, aid), "w")
596                print >>f, alloc_cert
597                f.close()
598            except EnvironmentError, e:
599                raise service_error(service_error.internal, 
600                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
601            return { 'allocID': { 'fedid': allocID } }
602        else:
603            if self.allow_proxy:
604                resp = self.proxy_RequestAccess.call_service(dt, req,
605                            self.cert_file, self.cert_pwd,
606                            self.trusted_certs)
607                if resp.has_key('RequestAccessResponseBody'):
608                    return resp['RequestAccessResponseBody']
609                else:
610                    return None
611            else:
612                raise service_error(service_error.access,
613                        "Access proxying denied")
614
615
616    def ReleaseAccess(self, req, fid):
617        # The dance to get into the request body
618        if req.has_key('ReleaseAccessRequestBody'):
619            req = req['ReleaseAccessRequestBody']
620        else:
621            raise service_error(service_error.req, "No request!?")
622
623        if req.has_key('destinationTestbed'):
624            dt = unpack_id(req['destinationTestbed'])
625        else:
626            dt = None
627
628        if dt == None or dt in self.testbed:
629            # Local request
630            try:
631                if req['allocID'].has_key('localname'):
632                    auth_attr = aid = req['allocID']['localname']
633                elif req['allocID'].has_key('fedid'):
634                    aid = unicode(req['allocID']['fedid'])
635                    auth_attr = req['allocID']['fedid']
636                else:
637                    raise service_error(service_error.req,
638                            "Only localnames and fedids are understood")
639            except KeyError:
640                raise service_error(service_error.req, "Badly formed request")
641
642            self.log.debug("[access] deallocation requested for %s", aid)
643            if not self.auth.check_attribute(fid, auth_attr):
644                self.log.debug("[access] deallocation denied for %s", aid)
645                raise service_error(service_error.access, "Access Denied")
646
647            self.state_lock.acquire()
648            if self.allocation.has_key(aid):
649                self.log.debug("Found allocation for %s" %aid)
650                del self.allocation[aid]
651                self.write_state()
652                self.state_lock.release()
653                # And remove the access cert
654                cf = "%s/%s.pem" % (self.certdir, aid)
655                self.log.debug("Removing %s" % cf)
656                os.remove(cf)
657                return { 'allocID': req['allocID'] } 
658            else:
659                self.state_lock.release()
660                raise service_error(service_error.req, "No such allocation")
661
662        else:
663            if self.allow_proxy:
664                resp = self.proxy_ReleaseAccess.call_service(dt, req,
665                            self.cert_file, self.cert_pwd,
666                            self.trusted_certs)
667                if resp.has_key('ReleaseAccessResponseBody'):
668                    return resp['ReleaseAccessResponseBody']
669                else:
670                    return None
671            else:
672                raise service_error(service_error.access,
673                        "Access proxying denied")
674
675    def import_store_info(self, cf, connInfo):
676        """
677        Pull any import parameters in connInfo in.  We translate them either
678        into known member names or fedAddrs.
679        """
680
681        for c in connInfo:
682            for p in [ p for p in c.get('parameter', []) \
683                    if p.get('type', '') == 'input']:
684                name = p.get('name', None)
685                key = p.get('key', None)
686                store = p.get('store', None)
687
688                if name and key and store :
689                    req = { 'name': key, 'wait': True }
690                    r = self.call_GetValue(store, req, cf)
691                    r = r.get('GetValueResponseBody', None)
692                    if r :
693                        if r.get('name', '') == key:
694                            v = r.get('value', None)
695                            if v is not None:
696                                if name == 'peer':
697                                    c['peer'] = v
698                                else:
699                                    if c.has_key('fedAttr'):
700                                        c['fedAttr'].append({
701                                            'attribute': name, 'value': v})
702                                    else:
703                                        c['fedAttr']= [{
704                                            'attribute': name, 'value': v}]
705                            else:
706                                raise service_error(service_error.internal, 
707                                        'None value exported for %s'  % key)
708                        else:
709                            raise service_error(service_error.internal, 
710                                    'Different name returned for %s: %s' \
711                                            % (key, r.get('name','')))
712                    else:
713                        raise service_error(service_error.internal, 
714                            'Badly formatted response: no GetValueResponseBody')
715                else:
716                    raise service_error(service_error.internal, 
717                        'Bad Services missing info for import %s' % c)
718
719    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
720            tmpdir, master, leid, connInfo, services):
721
722        def conninfo_to_dict(key, info):
723            """
724            Make a cpoy of the connection information about key, and flatten it
725            into a single dict by parsing out any feddAttrs.
726            """
727
728            rv = None
729            for i in info:
730                if key == i.get('portal', "") or \
731                        key in [e.get('element', "") \
732                        for e in i.get('member', [])]:
733                    rv = i.copy()
734                    break
735
736            else:
737                return rv
738
739            if 'fedAttr' in rv:
740                for a in rv['fedAttr']:
741                    attr = a.get('attribute', "")
742                    val = a.get('value', "")
743                    if attr and attr not in rv:
744                        rv[attr] = val
745                del rv['fedAttr']
746            return rv
747
748        # XXX: un hardcode this
749        def client_null(f, s):
750            print >>f, "Service: %s" % s['name']
751
752        def client_smb(f, s):
753            print >>f, "Service: %s" % s['name']
754            smbshare = None
755            smbuser = None
756            smbproj = None
757            for a in s.get('fedAttr', []):
758                if a.get('attribute', '') == 'SMBSHARE':
759                    smbshare = a.get('value', None)
760                elif a.get('attribute', '') == 'SMBUSER':
761                    smbuser = a.get('value', None)
762                elif a.get('attribute', '') == 'SMBPROJ':
763                    smbproj = a.get('value', None)
764
765            if all((smbshare, smbuser, smbproj)):
766                print >>f, "SMBshare: %s" % smbshare
767                print >>f, "ProjectUser: %s" % smbuser
768                print >>f, "ProjectName: %s" % smbproj
769
770        client_service_out = {
771                'SMB': client_smb,
772                'tmcd': client_null,
773                'seer': client_null,
774                'userconfig': client_null,
775            }
776        # XXX: end un hardcode this
777
778
779        seer_out = False
780        client_out = False
781        for e in [ e for e in topo.elements \
782                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
783            myname = e.name[0]
784            type = e.get_attribute('portal_type')
785
786            info = conninfo_to_dict(myname, connInfo)
787
788            if not info:
789                raise service_error(service_error.req,
790                        "No connectivity info for %s" % myname)
791
792            peer = info.get('peer', "")
793            ldomain = self.domain;
794
795            mexp = info.get('masterexperiment',"")
796            mproj, meid = mexp.split("/", 1)
797            mdomain = info.get('masterdomain',"")
798            muser = info.get('masteruser','root')
799            smbshare = info.get('smbshare', 'USERS')
800            ssh_port = info.get('ssh_port', '22')
801
802            active = info.get('active', 'False')
803
804            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
805            tunnelconfig = self.attrs.has_key('TunnelCfg')
806            try:
807                f = open(cfn, "w")
808                if active == 'True':
809                    print >>f, "active: True"
810                    print >>f, "ssh_port: %s" % ssh_port
811                    if type in ('control', 'both'):
812                        for s in [s for s in services \
813                                if s.get('name', "") in self.imports]:
814                            p = urlparse(s.get('server', 'http://localhost'))
815                            print >>f, 'port: remote:%s:%s:%s' % \
816                                    (p.port, p.hostname, p.port)
817
818                if tunnelconfig:
819                    print >>f, "tunnelip: %s" % tunnelconfig
820                # XXX: send this an fedattr
821                #print >>f, "seercontrol: control.%s.%s%s" % \
822                        #(meid.lower(), mproj.lower(), mdomain)
823                print >>f, "peer: %s" % peer.lower()
824                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
825                        pubkey_base
826                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
827                        secretkey_base
828                f.close()
829            except EnvironmentError, e:
830                raise service_error(service_error.internal,
831                        "Can't write protal config %s: %s" % (cfn, e))
832           
833            # XXX: This little seer config file needs to go away.
834            if not seer_out:
835                try:
836                    seerfn = "%s/seer.conf" % tmpdir
837                    f = open(seerfn, "w")
838                    if not master:
839                        print >>f, "ControlNode: control.%s.%s%s" % \
840                            (meid.lower(), mproj.lower(), mdomain)
841                    print >>f, "ExperimentID: %s" % mexp
842                    f.close()
843                except EnvironmentError, e:
844                    raise service_error(service_error.internal, 
845                            "Can't write seer.conf: %s" %e)
846                seer_out = True
847
848            if not client_out and type in ('control', 'both'):
849                try:
850                    f = open("%s/client.conf" % tmpdir, "w")
851                    print >>f, "ControlGateway: %s%s" % \
852                        (myname.lower(), ldomain.lower())
853                    for s in services:
854                        if s.get('name',"") in self.imports and \
855                                s.get('visibility','') == 'import':
856                            client_service_out[s['name']](f, s)
857                    # Does seer need this?
858                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
859                    f.close()
860                except EnvironmentError, e:
861                    raise service_error(service_error.internal,
862                            "Cannot write client.conf: %s" %s)
863                client_out = True
864
865
866    def generate_rspec(self, topo, softdir, master, connInfo):
867        t = topo.clone()
868
869        starts = { }
870
871        # The startcmds for master and slave testbeds
872        if master: 
873            gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true')
874            node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true')
875        else: 
876            gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true')
877            node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true')
878
879        # Weed out the things we aren't going to instantiate: Segments, portal
880        # substrates, and portal interfaces.  (The copy in the for loop allows
881        # us to delete from e.elements in side the for loop).  While we're
882        # touching all the elements, we also adjust paths from the original
883        # testbed to local testbed paths and put the federation commands and
884        # startcommands into a dict so we can start them manually later.
885        # ProtoGENI requires setup before the federation commands run, so we
886        # run them by hand after we've seeded configurations.
887        for e in [e for e in t.elements]:
888            if isinstance(e, topdl.Segment):
889                t.elements.remove(e)
890            # Fix software paths
891            for s in getattr(e, 'software', []):
892                s.location = re.sub("^.*/", softdir, s.location)
893            if isinstance(e, topdl.Computer):
894                if e.get_attribute('portal') and gate_cmd:
895                    # Portals never have a user-specified start command
896                    starts[e.name[0]] = gate_cmd
897                elif node_cmd:
898                    if e.get_attribute('startup'):
899                        starts[e.name[0]] = "%s \\$USER '%s'" % \
900                                (node_cmd, e.get_attribute('startup'))
901                        e.remove_attribute('startup')
902                    else:
903                        starts[e.name[0]] = node_cmd
904
905                # Remove portal interfaces
906                e.interface = [i for i in e.interface \
907                        if not i.get_attribute('portal')]
908
909        t.substrates = [ s.clone() for s in t.substrates ]
910        t.incorporate_elements()
911
912        # Customize the ns2 output for local portal commands and images
913        filters = []
914
915        # NB: these are extra commands issued for the node, not the startcmds
916        if master: cmd = self.attrs.get('MasterConnectorCmd', '')
917        else: cmd = self.attrs.get('SlaveConnectorCmd', '')
918
919        if cmd:
920            filters.append(topdl.generate_portal_command_filter(cmd))
921
922        # Convert to rspec and return it
923        exp_rspec = topdl.topology_to_rspec(t, filters)
924
925        return exp_rspec
926
927    def StartSegment(self, req, fid):
928
929        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
930
931        err = None  # Any service_error generated after tmpdir is created
932        rv = None   # Return value from segment creation
933
934        try:
935            req = req['StartSegmentRequestBody']
936        except KeyError:
937            raise service_error(service_error.req, "Badly formed request")
938
939        connInfo = req.get('connection', [])
940        services = req.get('service', [])
941        auth_attr = req['allocID']['fedid']
942        aid = "%s" % auth_attr
943        attrs = req.get('fedAttr', [])
944        if not self.auth.check_attribute(fid, auth_attr):
945            raise service_error(service_error.access, "Access denied")
946        else:
947            # See if this is a replay of an earlier succeeded StartSegment -
948            # sometimes SSL kills 'em.  If so, replay the response rather than
949            # redoing the allocation.
950            self.state_lock.acquire()
951            retval = self.allocation[aid].get('started', None)
952            self.state_lock.release()
953            if retval:
954                self.log.warning("Duplicate StartSegment for %s: " % aid + \
955                        "replaying response")
956                return retval
957
958
959        if req.has_key('segmentdescription') and \
960                req['segmentdescription'].has_key('topdldescription'):
961            topo = \
962                topdl.Topology(**req['segmentdescription']['topdldescription'])
963        else:
964            raise service_error(service_error.req, 
965                    "Request missing segmentdescription'")
966
967        master = req.get('master', False)
968
969        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
970        try:
971            tmpdir = tempfile.mkdtemp(prefix="access-")
972            softdir = "%s/software" % tmpdir
973        except EnvironmentError:
974            raise service_error(service_error.internal, "Cannot create tmp dir")
975
976        # Try block alllows us to clean up temporary files.
977        try:
978            sw = set()
979            for e in topo.elements:
980                for s in getattr(e, 'software', []):
981                    sw.add(s.location)
982            os.mkdir(softdir)
983            for s in sw:
984                self.log.debug("Retrieving %s" % s)
985                try:
986                    get_url(s, certfile, softdir)
987                except:
988                    t, v, st = sys.exc_info()
989                    raise service_error(service_error.internal,
990                            "Error retrieving %s: %s" % (s, v))
991
992            # Copy local portal node software to the tempdir
993            for s in (self.portal_software, self.federation_software):
994                for l, f in s:
995                    base = os.path.basename(f)
996                    copy_file(f, "%s/%s" % (softdir, base))
997
998            # Ick.  Put this python rpm in a place that it will get moved into
999            # the staging area.  It's a hack to install a modern (in a Roman
1000            # sense of modern) python on ProtoGENI
1001            python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
1002            if os.access("./%s" % python_rpm, os.R_OK):
1003                copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
1004
1005            for a in attrs:
1006                if a['attribute'] in configs:
1007                    try:
1008                        self.log.debug("Retrieving %s" % a['value'])
1009                        get_url(a['value'], certfile, tmpdir)
1010                    except:
1011                        t, v, st = sys.exc_info()
1012                        raise service_error(service_error.internal,
1013                                "Error retrieving %s: %s" % (s, v))
1014                if a['attribute'] == 'ssh_pubkey':
1015                    pubkey_base = a['value'].rpartition('/')[2]
1016                if a['attribute'] == 'ssh_secretkey':
1017                    secretkey_base = a['value'].rpartition('/')[2]
1018                if a['attribute'] == 'experiment_name':
1019                    ename = a['value']
1020
1021            # If the userconf service was imported, collect the configuration
1022            # data.
1023            for s in services:
1024                if s.get("name", "") == 'userconfig' \
1025                        and s.get('visibility',"") == 'import':
1026
1027                    # Collect ther server and certificate info.
1028                    u = s.get('server', None)
1029                    for a in s.get('fedAttr', []):
1030                        if a.get('attribute',"") == 'cert':
1031                            cert = a.get('value', None)
1032                            break
1033                    else:
1034                        cert = None
1035
1036                    if cert:
1037                        # Make a temporary certificate file for get_url.  The
1038                        # finally clause removes it whether something goes
1039                        # wrong (including an exception from get_url) or not.
1040                        try:
1041                            tfos, tn = tempfile.mkstemp(suffix=".pem")
1042                            tf = os.fdopen(tfos, 'w')
1043                            print >>tf, cert
1044                            tf.close()
1045                            get_url(u, tn, tmpdir, "userconf")
1046                        except EnvironmentError, e:
1047                            raise service_error(service.error.internal, 
1048                                    "Cannot create temp file for " + 
1049                                    "userconfig certificates: %s e")
1050                        except:
1051                            t, v, st = sys.exc_info()
1052                            raise service_error(service_error.internal,
1053                                    "Error retrieving %s: %s" % (u, v))
1054                        finally:
1055                            if tn: os.remove(tn)
1056                    else:
1057                        raise service_error(service_error.req,
1058                                "No certificate for retreiving userconfig")
1059                    break
1060
1061            self.state_lock.acquire()
1062            if self.allocation.has_key(aid):
1063                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1064                self.allocation[aid]['experiment'] = ename
1065                self.allocation[aid]['log'] = [ ]
1066                # Create a logger that logs to the experiment's state object as
1067                # well as to the main log file.
1068                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1069                h = logging.StreamHandler(
1070                        list_log.list_log(self.allocation[aid]['log']))
1071                # XXX: there should be a global one of these rather than
1072                # repeating the code.
1073                h.setFormatter(logging.Formatter(
1074                    "%(asctime)s %(name)s %(message)s",
1075                            '%d %b %y %H:%M:%S'))
1076                alloc_log.addHandler(h)
1077                self.write_state()
1078            else:
1079                self.log.error("No allocation for %s!?" % aid)
1080            self.state_lock.release()
1081
1082            # XXX: we really need to put the import and connection info
1083            # generation off longer.
1084            self.import_store_info(certfile, connInfo)
1085            #self.generate_portal_configs(topo, pubkey_base,
1086                    #secretkey_base, tmpdir, master, ename, connInfo,
1087                    #services)
1088            rspec = self.generate_rspec(topo, "%s/%s/" \
1089                    % (self.staging_dir, ename), master, connInfo)
1090
1091            starter = self.start_segment(keyfile=ssh_key,
1092                    debug=self.create_debug, log=alloc_log,
1093                    ch_url = self.ch_url, sa_url=self.sa_url,
1094                    cm_url=self.cm_url)
1095            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
1096                    master, ename,
1097                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1098                    certfile, topo, connInfo, services)
1099            # Copy the assigned names into the return topology
1100            rvtopo = topo.clone()
1101            for e in [ e for e in rvtopo.elements \
1102                   if isinstance(e, topdl.Computer)]:
1103                for n in e.name:
1104                    if n in starter.node:
1105                        e.set_attribute('hostname', starter.node[n])
1106        except service_error, e:
1107            err = e
1108        except e:
1109            err = service_error(service_error.internal, str(e))
1110
1111        # Walk up tmpdir, deleting as we go
1112        if self.cleanup:
1113            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1114            for path, dirs, files in os.walk(tmpdir, topdown=False):
1115                for f in files:
1116                    os.remove(os.path.join(path, f))
1117                for d in dirs:
1118                    os.rmdir(os.path.join(path, d))
1119            os.rmdir(tmpdir)
1120        else:
1121            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1122
1123        if rv:
1124            # Grab the log (this is some anal locking, but better safe than
1125            # sorry)
1126            self.state_lock.acquire()
1127            logv = "".join(self.allocation[aid]['log'])
1128            # It's possible that the StartSegment call gets retried (!).
1129            # if the 'started' key is in the allocation, we'll return it rather
1130            # than redo the setup.
1131            self.allocation[aid]['started'] = { 
1132                    'allocID': req['allocID'],
1133                    'allocationLog': logv,
1134                    'segmentdescription': { 
1135                        'topdldescription': rvtopo.to_dict() }
1136                    }
1137            self.write_state()
1138            self.state_lock.release()
1139
1140            return retval
1141        elif err:
1142            raise service_error(service_error.federant,
1143                    "Swapin failed: %s" % err)
1144        else:
1145            raise service_error(service_error.federant, "Swapin failed")
1146
1147    def TerminateSegment(self, req, fid):
1148        try:
1149            req = req['TerminateSegmentRequestBody']
1150        except KeyError:
1151            raise service_error(service_error.req, "Badly formed request")
1152
1153        auth_attr = req['allocID']['fedid']
1154        aid = "%s" % auth_attr
1155        attrs = req.get('fedAttr', [])
1156        if not self.auth.check_attribute(fid, auth_attr):
1157            raise service_error(service_error.access, "Access denied")
1158
1159        self.state_lock.acquire()
1160        if self.allocation.has_key(aid):
1161            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1162            slice_cred = self.allocation[aid].get('slice_credential', None)
1163            ename = self.allocation[aid].get('experiment', None)
1164        else:
1165            cf, user, ssh_key, cpw = (None, None, None, None)
1166            slice_cred = None
1167            ename = None
1168        self.state_lock.release()
1169
1170        if ename:
1171            staging = "%s/%s" % ( self.staging_dir, ename)
1172        else:
1173            self.log.warn("Can't find experiment name for %s" % aid)
1174            staging = None
1175
1176        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
1177                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
1178        stopper(self, user, staging, slice_cred, cf, cpw)
1179        return { 'allocID': req['allocID'] }
1180
1181    def RenewSlices(self):
1182        self.log.info("Scanning for slices to renew")
1183        self.state_lock.acquire()
1184        aids = self.allocation.keys()
1185        self.state_lock.release()
1186
1187        for aid in aids:
1188            self.state_lock.acquire()
1189            if self.allocation.has_key(aid):
1190                name = self.allocation[aid].get('slice_name', None)
1191                scred = self.allocation[aid].get('slice_credential', None)
1192                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1193            else:
1194                name = None
1195                scred = None
1196            self.state_lock.release()
1197
1198            # There's a ProtoGENI slice associated with the segment; renew it.
1199            if name and scred:
1200                renewer = self.renew_segment(log=self.log, 
1201                        debug=self.create_debug, keyfile=ssh_key,
1202                        cm_url = self.cm_url, sa_url = self.sa_url,
1203                        ch_url = self.ch_url)
1204                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
1205                if new_scred:
1206                    self.log.info("Slice %s renewed until %s GMT" % \
1207                            (name, time.asctime(time.gmtime(\
1208                                time.time()+self.renewal_interval))))
1209                    self.state_lock.acquire()
1210                    if self.allocation.has_key(aid):
1211                        self.allocation[aid]['slice_credential'] = new_scred
1212                    self.state_lock.release()
1213                else:
1214                    self.log.info("Failed to renew slice %s " % name)
1215
1216        # Let's do this all again soon.  (4 tries before the slices time out)   
1217        t = Timer(self.renewal_interval/4, self.RenewSlices)
1218        t.start()
Note: See TracBrowser for help on using the repository browser.