source: fedd/federation/protogeni_access.py @ 5bf359d

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5bf359d was a65a65a, checked in by Ted Faber <faber@…>, 15 years ago

get rid of attrs as well (untested)

  • Property mode set to 100644
File size: 41.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_protogeni_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        def software_list(v):
59            l = [ ]
60            if v:
61                ps = v.split(" ")
62                while len(ps):
63                    loc, file = ps[0:2]
64                    del ps[0:2]
65                    if loc == 'rpm':
66                        loc = None
67                    l.append((loc, file))
68            return l
69
70        # Make sure that the configuration is in place
71        if not config: 
72            raise RunTimeError("No config to fedd.access")
73
74        self.project_priority = config.getboolean("access", "project_priority")
75        self.allow_proxy = config.getboolean("access", "allow_proxy")
76
77        self.domain = config.get("access", "domain")
78        self.certdir = config.get("access","certdir")
79        self.userconfdir = config.get("access","userconfdir")
80        self.userconfcmd = config.get("access","userconfcmd")
81        self.userconfurl = config.get("access","userconfurl")
82        self.federation_software = config.get("access", "federation_software")
83        self.portal_software = config.get("access", "portal_software")
84        self.ssh_port = config.get("access","ssh_port") or "22"
85        self.sshd = config.get("access","sshd")
86        self.sshd_config = config.get("access", "sshd_config")
87        self.create_debug = config.getboolean("access", "create_debug")
88        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
89        self.access_type = config.get("access", "type")
90        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
91        self.staging_host = config.get("access", "staging_host") \
92                or "ops.emulab.net"
93        self.local_seer_software = config.get("access", "local_seer_software")
94        self.local_seer_image = config.get("access", "local_seer_image")
95        self.local_seer_start = config.get("access", "local_seer_start")
96   
97        self.dragon_endpoint = config.get("access", "dragon")
98        self.dragon_vlans = config.get("access", "dragon_vlans")
99        self.deter_internal = config.get("access", "deter_internal")
100
101        self.tunnel_config = config.getboolean("access", "tunnel_config")
102        self.portal_command = config.get("access", "portal_command")
103        self.portal_image = config.get("access", "portal_image")
104        self.portal_type = config.get("access", "portal_type") or "pc"
105        self.portal_startcommand = config.get("access", "portal_startcommand")
106        self.node_startcommand = config.get("access", "node_startcommand")
107
108        self.federation_software = software_list(self.federation_software)
109        self.portal_software = software_list(self.portal_software)
110        self.local_seer_software = software_list(self.local_seer_software)
111
112        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
113        self.renewal_interval = int(self.renewal_interval) * 60
114
115        self.ch_url = config.get("access", "ch_url")
116        self.sa_url = config.get("access", "sa_url")
117        self.cm_url = config.get("access", "cm_url")
118
119        self.access = { }
120        self.restricted = [ ]
121        self.projects = { }
122        self.keys = { }
123        self.types = { }
124        self.allocation = { }
125        self.state = { 
126            'projects': self.projects,
127            'allocation' : self.allocation,
128            'keys' : self.keys,
129            'types': self.types
130        }
131        self.log = logging.getLogger("fedd.access")
132        set_log_level(config, "access", self.log)
133        self.state_lock = Lock()
134        # XXX: Configurable
135        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
136        self.imports = set(('SMB', 'seer', 'userconfig'))
137
138        if auth: self.auth = auth
139        else:
140            self.log.error(\
141                    "[access]: No authorizer initialized, creating local one.")
142            auth = authorizer()
143
144        tb = config.get('access', 'testbed')
145        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
146        else: self.testbed = [ ]
147
148        if config.has_option("access", "accessdb"):
149            self.read_access(config.get("access", "accessdb"))
150
151        self.state_filename = config.get("access", "access_state")
152        print "Calling read_state %s" % self.state_filename
153        self.read_state()
154
155        # Keep cert_file and cert_pwd coming from the same place
156        self.cert_file = config.get("access", "cert_file")
157        if self.cert_file:
158            self.sert_pwd = config.get("access", "cert_pw")
159        else:
160            self.cert_file = config.get("globals", "cert_file")
161            self.sert_pwd = config.get("globals", "cert_pw")
162
163        self.trusted_certs = config.get("access", "trusted_certs") or \
164                config.get("globals", "trusted_certs")
165
166        self.start_segment = proxy_protogeni_segment.start_segment
167        self.stop_segment = proxy_protogeni_segment.stop_segment
168        self.renew_segment = proxy_protogeni_segment.renew_segment
169
170        self.call_SetValue = service_caller('SetValue')
171        self.call_GetValue = service_caller('GetValue')
172
173        self.RenewSlices()
174
175        self.soap_services = {\
176            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
177            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
178            'StartSegment': soap_handler("StartSegment", self.StartSegment),
179            'TerminateSegment': soap_handler("TerminateSegment", 
180                self.TerminateSegment),
181            }
182        self.xmlrpc_services =  {\
183            'RequestAccess': xmlrpc_handler('RequestAccess',
184                self.RequestAccess),
185            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
186                self.ReleaseAccess),
187            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
188            'TerminateSegment': xmlrpc_handler('TerminateSegment',
189                self.TerminateSegment),
190            }
191
192    def read_access(self, config):
193        """
194        Read a configuration file and set internal parameters.
195
196        There are access lines of the
197        form (tb, proj, user) -> user that map the first tuple of
198        names to the user for for access purposes.  Names in the key (left side)
199        can include "<NONE> or <ANY>" to act as wildcards or to require the
200        fields to be empty.  Similarly aproj or auser can be <SAME> or
201        <DYNAMIC> indicating that either the matching key is to be used or a
202        dynamic user or project will be created.  These names can also be
203        federated IDs (fedid's) if prefixed with fedid:.  The user is the
204        ProtoGENI identity certificate.
205        Testbed attributes outside the forms above can be given using the
206        format attribute: name value: value.  The name is a single word and the
207        value continues to the end of the line.  Empty lines and lines startin
208        with a # are ignored.
209
210        Parsing errors result in a self.parse_error exception being raised.
211        """
212        lineno=0
213        name_expr = "["+string.ascii_letters + string.digits + "\/\.\-_]+"
214        fedid_expr = "fedid:[" + string.hexdigits + "]+"
215        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
216
217        access_str = '\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ \
218                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*,\s*('\
219                        + name_expr + ')\s*,\s*('+name_expr+')\s*,?\s*(' + \
220                        name_expr+ ')?\)'
221        access_re = re.compile(access_str, re.IGNORECASE)
222
223
224        def parse_name(n):
225            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
226            else: return n
227       
228        def auth_name(n):
229            if isinstance(n, basestring):
230                if n =='<any>' or n =='<none>': return None
231                else: return unicode(n)
232            else:
233                return n
234
235        f = open(config, "r");
236        for line in f:
237            lineno += 1
238            line = line.strip();
239            if len(line) == 0 or line.startswith('#'):
240                continue
241
242            # Access line (t, p, u) -> (a, pw) line
243            m = access_re.match(line)
244            if m != None:
245                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
246                auth_key = tuple([ auth_name(x) for x in access_key])
247                cert = auth_name(parse_name(m.group(4)))
248                user_name = auth_name(parse_name(m.group(5)))
249                ssh_key = unicode(m.group(6))
250                if m.group(6): pw = unicode(m.group(7))
251                else: pw = None
252
253                self.access[access_key] = (cert, user_name, ssh_key, pw)
254                self.auth.set_attribute(auth_key, "access")
255                continue
256
257            # Nothing matched to here: unknown line - raise exception
258            f.close()
259            raise self.parse_error("Unknown statement at line %d of %s" % \
260                    (lineno, config))
261        f.close()
262
263    def write_state(self):
264        if self.state_filename:
265            try:
266                f = open(self.state_filename, 'w')
267                pickle.dump(self.state, f)
268            except EnvironmentError, e:
269                self.log.error("Can't write file %s: %s" % \
270                        (self.state_filename, e))
271            except pickle.PicklingError, e:
272                self.log.error("Pickling problem: %s" % e)
273            except TypeError, e:
274                self.log.error("Pickling problem (TypeError): %s" % e)
275
276
277    def read_state(self):
278        """
279        Read a new copy of access state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283        if self.state_filename:
284            try:
285                f = open(self.state_filename, "r")
286                self.state = pickle.load(f)
287                self.log.debug("[read_state]: Read state from %s" % \
288                        self.state_filename)
289            except EnvironmentError, e:
290                self.log.warning(("[read_state]: No saved state: " +\
291                        "Can't open %s: %s") % (self.state_filename, e))
292            except EOFError, e:
293                self.log.warning(("[read_state]: " +\
294                        "Empty or damaged state file: %s:") % \
295                        self.state_filename)
296            except pickle.UnpicklingError, e:
297                self.log.warning(("[read_state]: No saved state: " + \
298                        "Unpickling failed: %s") % e)
299
300            # Add the ownership attributes to the authorizer.  Note that the
301            # indices of the allocation dict are strings, but the attributes are
302            # fedids, so there is a conversion.
303            for k in self.state.get('allocation', {}).keys():
304                for o in self.state['allocation'][k].get('owners', []):
305                    self.auth.set_attribute(o, fedid(hexstr=k))
306                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
307
308            if self.allocation != self.state['allocation']:
309                self.allocation = self.state['allocation']
310
311    def permute_wildcards(self, a, p):
312        """Return a copy of a with various fields wildcarded.
313
314        The bits of p control the wildcards.  A set bit is a wildcard
315        replacement with the lowest bit being user then project then testbed.
316        """
317        if p & 1: user = ["<any>"]
318        else: user = a[2]
319        if p & 2: proj = "<any>"
320        else: proj = a[1]
321        if p & 4: tb = "<any>"
322        else: tb = a[0]
323
324        return (tb, proj, user)
325
326
327    def find_access(self, search):
328        """
329        Search the access DB for a match on this tuple.  Return the matching
330        user (protoGENI cert).
331       
332        NB, if the initial tuple fails to match we start inserting wildcards in
333        an order determined by self.project_priority.  Try the list of users in
334        order (when wildcarded, there's only one user in the list).
335        """
336        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
337        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
338
339        for p in perm: 
340            s = self.permute_wildcards(search, p)
341            # s[2] is None on an anonymous, unwildcarded request
342            if s[2] != None:
343                for u in s[2]:
344                    if self.access.has_key((s[0], s[1], u)):
345                        return self.access[(s[0], s[1], u)]
346            else:
347                if self.access.has_key(s):
348                    return self.access[s]
349        return None
350
351    def lookup_access(self, req, fid):
352        """
353        Determine the allowed access for this request.  Return the access and
354        which fields are dynamic.
355
356        The fedid is needed to construct the request
357        """
358        # Search keys
359        tb = None
360        project = None
361        user = None
362        # Return values
363        rp = access_project(None, ())
364        ru = None
365        user_re = re.compile("user:\s(.*)")
366        project_re = re.compile("project:\s(.*)")
367
368        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
369                if user_re.match(x)]
370        project = [ project_re.findall(x)[0] \
371                for x in req.get('credential', []) \
372                    if project_re.match(x)]
373
374        if len(project) == 1: project = project[0]
375        elif len(project) == 0: project = None
376        else: 
377            raise service_error(service_error.req, 
378                    "More than one project credential")
379
380
381        user_fedids = [ u for u in user if isinstance(u, fedid)]
382
383        # Determine how the caller is representing itself.  If its fedid shows
384        # up as a project or a singleton user, let that stand.  If neither the
385        # usernames nor the project name is a fedid, the caller is a testbed.
386        if project and isinstance(project, fedid):
387            if project == fid:
388                # The caller is the project (which is already in the tuple
389                # passed in to the authorizer)
390                owners = user_fedids
391                owners.append(project)
392            else:
393                raise service_error(service_error.req,
394                        "Project asserting different fedid")
395        else:
396            if fid not in user_fedids:
397                tb = fid
398                owners = user_fedids
399                owners.append(fid)
400            else:
401                if len(fedids) > 1:
402                    raise service_error(service_error.req,
403                            "User asserting different fedid")
404                else:
405                    # Which is a singleton
406                    owners = user_fedids
407        # Confirm authorization
408
409        for u in user:
410            self.log.debug("[lookup_access] Checking access for %s" % \
411                    ((tb, project, u),))
412            if self.auth.check_attribute((tb, project, u), 'access'):
413                self.log.debug("[lookup_access] Access granted")
414                break
415            else:
416                self.log.debug("[lookup_access] Access Denied")
417        else:
418            raise service_error(service_error.access, "Access denied")
419
420        # This maps a valid user to the ProtoGENI credentials to use
421        found = self.find_access((tb, project, user))
422       
423        if found == None:
424            raise service_error(service_error.access,
425                    "Access denied - cannot map access")
426        return found, owners
427
428    def get_handler(self, path, fid):
429        self.log.info("Get handler %s %s" % (path, fid))
430        if self.auth.check_attribute(fid, path) and self.userconfdir:
431            return ("%s/%s" % (self.userconfdir, path), "application/binary")
432        else:
433            return (None, None)
434
435    def export_userconf(self, project):
436        dev_null = None
437        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
438                log=self.log)
439        conffilename = "%s/%s" % (self.userconfdir, str(confid))
440        cf = None
441        try:
442            cf = open(conffilename, "w")
443            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
444        except EnvironmentError, e:
445            raise service_error(service_error.internal, 
446                    "Cannot create user configuration data")
447
448        try:
449            dev_null = open("/dev/null", "a")
450        except EnvironmentError, e:
451            self.log.error("export_userconf: can't open /dev/null: %s" % e)
452
453        cmd = "%s %s" % (self.userconfcmd, project)
454        conf = subprocess.call(cmd.split(" "),
455                stdout=cf, stderr=dev_null, close_fds=True)
456
457        self.auth.set_attribute(confid, "/%s" % str(confid))
458
459        return confid, confcert
460
461
462    def export_SMB(self, id, state, project, user):
463        return { 
464                'id': id,
465                'name': 'SMB',
466                'visibility': 'export',
467                'server': 'http://fs:139',
468                'fedAttr': [
469                        { 'attribute': 'SMBSHARE', 'value': 'USERS' },
470                        { 'attribute': 'SMBUSER', 'value': user },
471                        { 'attribute': 'SMBPROJ', 'value': project },
472                    ]
473                }
474
475    def export_seer(self, id, state, project, user):
476        return { 
477                'id': id,
478                'name': 'seer',
479                'visibility': 'export',
480                'server': 'http://control:16606',
481                }
482
483    def export_tmcd(self, id, state, project, user):
484        return { 
485                'id': id,
486                'name': 'seer',
487                'visibility': 'export',
488                'server': 'http://boss:7777',
489                }
490
491    def export_userconfig(self, id, state, project, user):
492        if self.userconfdir and self.userconfcmd \
493                and self.userconfurl:
494            cid, cert = self.export_userconf(project)
495            state['userconfig'] = unicode(cid)
496            return {
497                    'id': id,
498                    'name': 'userconfig',
499                    'visibility': 'export',
500                    'server': "%s/%s" % (self.userconfurl, str(cid)),
501                    'fedAttr': [
502                        { 'attribute': 'cert', 'value': cert },
503                    ]
504                    }
505        else:
506            return None
507
508    def export_services(self, sreq, project, user):
509        exp = [ ]
510        state = { }
511        # XXX: Filthy shortcut here using http: so urlparse will give the right
512        # answers.
513        for s in sreq:
514            sname = s.get('name', '')
515            svis = s.get('visibility', '')
516            if svis == 'export':
517                if sname in self.exports:
518                    id = s.get('id', 'no_id')
519                    if sname == 'SMB':
520                        exp.append(self.export_SMB(id, state, project, user))
521                    elif sname == 'seer':
522                        exp.append(self.export_seer(id, state, project, user))
523                    elif sname == 'tmcd':
524                        exp.append(self.export_tmcd(id, state, project, user))
525                    elif sname == 'userconfig':
526                        exp.append(self.export_userconfig(id, state,
527                            project, user))
528                    elif sname == 'project_export':
529                        exp.append(self.export_SMB(id, state, project, user))
530                        exp.append(self.export_seer(id, state, project, user))
531                        exp.append(self.export_userconfig(id, state,
532                            project, user))
533        return (exp, state)
534
535    def build_response(self, alloc_id, ap, services):
536        """
537        Create the SOAP response.
538
539        Build the dictionary description of the response and use
540        fedd_utils.pack_soap to create the soap message.  ap is the allocate
541        project message returned from a remote project allocation (even if that
542        allocation was done locally).
543        """
544        # Because alloc_id is already a fedd_services_types.IDType_Holder,
545        # there's no need to repack it
546        msg = { 
547                'allocID': alloc_id,
548                'fedAttr': [
549                    { 'attribute': 'domain', 'value': self.domain } , 
550                    { 'attribute': 'project', 'value': 
551                        ap['project'].get('name', {}).get('localname', "???") },
552                ]
553            }
554        if self.dragon_endpoint:
555            msg['fedAttr'].append({'attribute': 'dragon',
556                'value': self.dragon_endpoint})
557        if self.deter_internal:
558            print 'adding internal'
559            msg['fedAttr'].append({'attribute': 'deter_internal',
560                'value': self.deter_internal})
561        else: print "internal: %s" % self.deter_internal
562        #XXX: ??
563        if self.dragon_vlans:
564            msg['fedAttr'].append({'attribute': 'vlans',
565                'value': self.dragon_vlans})
566
567        if services:
568            msg['service'] = services
569        return msg
570
571    def RequestAccess(self, req, fid):
572        """
573        Handle the access request.  Proxy if not for us.
574
575        Parse out the fields and make the allocations or rejections if for us,
576        otherwise, assuming we're willing to proxy, proxy the request out.
577        """
578
579        # The dance to get into the request body
580        if req.has_key('RequestAccessRequestBody'):
581            req = req['RequestAccessRequestBody']
582        else:
583            raise service_error(service_error.req, "No request!?")
584
585        if req.has_key('destinationTestbed'):
586            dt = unpack_id(req['destinationTestbed'])
587
588        if dt == None or dt in self.testbed:
589            # Request for this fedd
590            found, owners = self.lookup_access(req, fid)
591            # keep track of what's been added
592            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
593            aid = unicode(allocID)
594
595            self.state_lock.acquire()
596            self.allocation[aid] = { }
597            # The protoGENI certificate
598            self.allocation[aid]['credentials'] = found
599            # The list of owner FIDs
600            self.allocation[aid]['owners'] = owners
601            self.write_state()
602            self.state_lock.release()
603            for o in owners:
604                self.auth.set_attribute(o, allocID)
605            self.auth.set_attribute(allocID, allocID)
606
607            try:
608                f = open("%s/%s.pem" % (self.certdir, aid), "w")
609                print >>f, alloc_cert
610                f.close()
611            except EnvironmentError, e:
612                raise service_error(service_error.internal, 
613                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
614            return { 'allocID': { 'fedid': allocID } }
615        else:
616            if self.allow_proxy:
617                resp = self.proxy_RequestAccess.call_service(dt, req,
618                            self.cert_file, self.cert_pwd,
619                            self.trusted_certs)
620                if resp.has_key('RequestAccessResponseBody'):
621                    return resp['RequestAccessResponseBody']
622                else:
623                    return None
624            else:
625                raise service_error(service_error.access,
626                        "Access proxying denied")
627
628
629    def ReleaseAccess(self, req, fid):
630        # The dance to get into the request body
631        if req.has_key('ReleaseAccessRequestBody'):
632            req = req['ReleaseAccessRequestBody']
633        else:
634            raise service_error(service_error.req, "No request!?")
635
636        if req.has_key('destinationTestbed'):
637            dt = unpack_id(req['destinationTestbed'])
638        else:
639            dt = None
640
641        if dt == None or dt in self.testbed:
642            # Local request
643            try:
644                if req['allocID'].has_key('localname'):
645                    auth_attr = aid = req['allocID']['localname']
646                elif req['allocID'].has_key('fedid'):
647                    aid = unicode(req['allocID']['fedid'])
648                    auth_attr = req['allocID']['fedid']
649                else:
650                    raise service_error(service_error.req,
651                            "Only localnames and fedids are understood")
652            except KeyError:
653                raise service_error(service_error.req, "Badly formed request")
654
655            self.log.debug("[access] deallocation requested for %s", aid)
656            if not self.auth.check_attribute(fid, auth_attr):
657                self.log.debug("[access] deallocation denied for %s", aid)
658                raise service_error(service_error.access, "Access Denied")
659
660            self.state_lock.acquire()
661            if self.allocation.has_key(aid):
662                self.log.debug("Found allocation for %s" %aid)
663                del self.allocation[aid]
664                self.write_state()
665                self.state_lock.release()
666                # And remove the access cert
667                cf = "%s/%s.pem" % (self.certdir, aid)
668                self.log.debug("Removing %s" % cf)
669                os.remove(cf)
670                return { 'allocID': req['allocID'] } 
671            else:
672                self.state_lock.release()
673                raise service_error(service_error.req, "No such allocation")
674
675        else:
676            if self.allow_proxy:
677                resp = self.proxy_ReleaseAccess.call_service(dt, req,
678                            self.cert_file, self.cert_pwd,
679                            self.trusted_certs)
680                if resp.has_key('ReleaseAccessResponseBody'):
681                    return resp['ReleaseAccessResponseBody']
682                else:
683                    return None
684            else:
685                raise service_error(service_error.access,
686                        "Access proxying denied")
687
688    def import_store_info(self, cf, connInfo):
689        """
690        Pull any import parameters in connInfo in.  We translate them either
691        into known member names or fedAddrs.
692        """
693
694        for c in connInfo:
695            for p in [ p for p in c.get('parameter', []) \
696                    if p.get('type', '') == 'input']:
697                name = p.get('name', None)
698                key = p.get('key', None)
699                store = p.get('store', None)
700
701                if name and key and store :
702                    req = { 'name': key, 'wait': True }
703                    r = self.call_GetValue(store, req, cf)
704                    r = r.get('GetValueResponseBody', None)
705                    if r :
706                        if r.get('name', '') == key:
707                            v = r.get('value', None)
708                            if v is not None:
709                                if name == 'peer':
710                                    c['peer'] = v
711                                else:
712                                    if c.has_key('fedAttr'):
713                                        c['fedAttr'].append({
714                                            'attribute': name, 'value': v})
715                                    else:
716                                        c['fedAttr']= [{
717                                            'attribute': name, 'value': v}]
718                            else:
719                                raise service_error(service_error.internal, 
720                                        'None value exported for %s'  % key)
721                        else:
722                            raise service_error(service_error.internal, 
723                                    'Different name returned for %s: %s' \
724                                            % (key, r.get('name','')))
725                    else:
726                        raise service_error(service_error.internal, 
727                            'Badly formatted response: no GetValueResponseBody')
728                else:
729                    raise service_error(service_error.internal, 
730                        'Bad Services missing info for import %s' % c)
731
732    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
733            tmpdir, master, leid, connInfo, services):
734
735        def conninfo_to_dict(key, info):
736            """
737            Make a cpoy of the connection information about key, and flatten it
738            into a single dict by parsing out any feddAttrs.
739            """
740
741            rv = None
742            for i in info:
743                if key == i.get('portal', "") or \
744                        key in [e.get('element', "") \
745                        for e in i.get('member', [])]:
746                    rv = i.copy()
747                    break
748
749            else:
750                return rv
751
752            if 'fedAttr' in rv:
753                for a in rv['fedAttr']:
754                    attr = a.get('attribute', "")
755                    val = a.get('value', "")
756                    if attr and attr not in rv:
757                        rv[attr] = val
758                del rv['fedAttr']
759            return rv
760
761        # XXX: un hardcode this
762        def client_null(f, s):
763            print >>f, "Service: %s" % s['name']
764
765        def client_smb(f, s):
766            print >>f, "Service: %s" % s['name']
767            smbshare = None
768            smbuser = None
769            smbproj = None
770            for a in s.get('fedAttr', []):
771                if a.get('attribute', '') == 'SMBSHARE':
772                    smbshare = a.get('value', None)
773                elif a.get('attribute', '') == 'SMBUSER':
774                    smbuser = a.get('value', None)
775                elif a.get('attribute', '') == 'SMBPROJ':
776                    smbproj = a.get('value', None)
777
778            if all((smbshare, smbuser, smbproj)):
779                print >>f, "SMBshare: %s" % smbshare
780                print >>f, "ProjectUser: %s" % smbuser
781                print >>f, "ProjectName: %s" % smbproj
782
783        client_service_out = {
784                'SMB': client_smb,
785                'tmcd': client_null,
786                'seer': client_null,
787                'userconfig': client_null,
788            }
789        # XXX: end un hardcode this
790
791
792        seer_out = False
793        client_out = False
794        for e in [ e for e in topo.elements \
795                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
796            myname = e.name[0]
797            type = e.get_attribute('portal_type')
798
799            info = conninfo_to_dict(myname, connInfo)
800
801            if not info:
802                raise service_error(service_error.req,
803                        "No connectivity info for %s" % myname)
804
805            peer = info.get('peer', "")
806            ldomain = self.domain;
807
808            mexp = info.get('masterexperiment',"")
809            mproj, meid = mexp.split("/", 1)
810            mdomain = info.get('masterdomain',"")
811            muser = info.get('masteruser','root')
812            smbshare = info.get('smbshare', 'USERS')
813            ssh_port = info.get('ssh_port', '22')
814
815            active = info.get('active', 'False')
816
817            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
818            tunnelconfig = self.tunnel_config
819            try:
820                f = open(cfn, "w")
821                if active == 'True':
822                    print >>f, "active: True"
823                    print >>f, "ssh_port: %s" % ssh_port
824                    if type in ('control', 'both'):
825                        for s in [s for s in services \
826                                if s.get('name', "") in self.imports]:
827                            p = urlparse(s.get('server', 'http://localhost'))
828                            print >>f, 'port: remote:%s:%s:%s' % \
829                                    (p.port, p.hostname, p.port)
830
831                if tunnelconfig:
832                    print >>f, "tunnelip: %s" % tunnelconfig
833                # XXX: send this an fedattr
834                #print >>f, "seercontrol: control.%s.%s%s" % \
835                        #(meid.lower(), mproj.lower(), mdomain)
836                print >>f, "peer: %s" % peer.lower()
837                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
838                        pubkey_base
839                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
840                        secretkey_base
841                f.close()
842            except EnvironmentError, e:
843                raise service_error(service_error.internal,
844                        "Can't write protal config %s: %s" % (cfn, e))
845           
846            # XXX: This little seer config file needs to go away.
847            if not seer_out:
848                try:
849                    seerfn = "%s/seer.conf" % tmpdir
850                    f = open(seerfn, "w")
851                    if not master:
852                        print >>f, "ControlNode: control.%s.%s%s" % \
853                            (meid.lower(), mproj.lower(), mdomain)
854                    print >>f, "ExperimentID: %s" % mexp
855                    f.close()
856                except EnvironmentError, e:
857                    raise service_error(service_error.internal, 
858                            "Can't write seer.conf: %s" %e)
859                seer_out = True
860
861            if not client_out and type in ('control', 'both'):
862                try:
863                    f = open("%s/client.conf" % tmpdir, "w")
864                    print >>f, "ControlGateway: %s%s" % \
865                        (myname.lower(), ldomain.lower())
866                    for s in services:
867                        if s.get('name',"") in self.imports and \
868                                s.get('visibility','') == 'import':
869                            client_service_out[s['name']](f, s)
870                    # Does seer need this?
871                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
872                    f.close()
873                except EnvironmentError, e:
874                    raise service_error(service_error.internal,
875                            "Cannot write client.conf: %s" %s)
876                client_out = True
877
878
879    def generate_rspec(self, topo, softdir, master, connInfo):
880        t = topo.clone()
881
882        starts = { }
883        # Weed out the things we aren't going to instantiate: Segments, portal
884        # substrates, and portal interfaces.  (The copy in the for loop allows
885        # us to delete from e.elements in side the for loop).  While we're
886        # touching all the elements, we also adjust paths from the original
887        # testbed to local testbed paths and put the federation commands and
888        # startcommands into a dict so we can start them manually later.
889        # ProtoGENI requires setup before the federation commands run, so we
890        # run them by hand after we've seeded configurations.
891        for e in [e for e in t.elements]:
892            if isinstance(e, topdl.Segment):
893                t.elements.remove(e)
894            # Fix software paths
895            for s in getattr(e, 'software', []):
896                s.location = re.sub("^.*/", softdir, s.location)
897            if isinstance(e, topdl.Computer):
898                if e.get_attribute('portal') and self.portal_startcommand:
899                    # Portals never have a user-specified start command
900                    starts[e.name] = self.portal_startcommand
901                elif self.node_startcommand:
902                    if e.get_attribute('startup'):
903                        starts[e.name] = "%s \\$USER '%s'" % \
904                                (self.node_startcommand, 
905                                        e.get_attribute('startup'))
906                        e.remove_attribute('startup')
907                    else:
908                        starts[e.name] = self.node_startcommand
909
910                # Remove portal interfaces
911                e.interface = [i for i in e.interface \
912                        if not i.get_attribute('portal')]
913
914        t.substrates = [ s.clone() for s in t.substrates ]
915        t.incorporate_elements()
916
917        # Customize the ns2 output for local portal commands and images
918        filters = []
919
920        # NB: these are extra commands issued for the node, not the startcmds
921        if self.portal_command:
922            filters.append(topdl.generate_portal_command_filter(
923                self.portal_command))
924
925        # Convert to rspec and return it
926        exp_rspec = topdl.topology_to_rspec(t, filters)
927
928        return exp_rspec
929
930    def StartSegment(self, req, fid):
931
932        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
933
934        err = None  # Any service_error generated after tmpdir is created
935        rv = None   # Return value from segment creation
936
937        try:
938            req = req['StartSegmentRequestBody']
939        except KeyError:
940            raise service_error(service_error.req, "Badly formed request")
941
942        connInfo = req.get('connection', [])
943        services = req.get('service', [])
944        auth_attr = req['allocID']['fedid']
945        aid = "%s" % auth_attr
946        attrs = req.get('fedAttr', [])
947        if not self.auth.check_attribute(fid, auth_attr):
948            raise service_error(service_error.access, "Access denied")
949        else:
950            # See if this is a replay of an earlier succeeded StartSegment -
951            # sometimes SSL kills 'em.  If so, replay the response rather than
952            # redoing the allocation.
953            self.state_lock.acquire()
954            retval = self.allocation[aid].get('started', None)
955            self.state_lock.release()
956            if retval:
957                self.log.warning("Duplicate StartSegment for %s: " % aid + \
958                        "replaying response")
959                return retval
960
961
962        if req.has_key('segmentdescription') and \
963                req['segmentdescription'].has_key('topdldescription'):
964            topo = \
965                topdl.Topology(**req['segmentdescription']['topdldescription'])
966        else:
967            raise service_error(service_error.req, 
968                    "Request missing segmentdescription'")
969
970        master = req.get('master', False)
971
972        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
973        try:
974            tmpdir = tempfile.mkdtemp(prefix="access-")
975            softdir = "%s/software" % tmpdir
976        except EnvironmentError:
977            raise service_error(service_error.internal, "Cannot create tmp dir")
978
979        # Try block alllows us to clean up temporary files.
980        try:
981            sw = set()
982            for e in topo.elements:
983                for s in getattr(e, 'software', []):
984                    sw.add(s.location)
985            os.mkdir(softdir)
986            for s in sw:
987                self.log.debug("Retrieving %s" % s)
988                try:
989                    get_url(s, certfile, softdir)
990                except:
991                    t, v, st = sys.exc_info()
992                    raise service_error(service_error.internal,
993                            "Error retrieving %s: %s" % (s, v))
994
995            # Copy local portal node software to the tempdir
996            for s in (self.portal_software, self.federation_software):
997                for l, f in s:
998                    base = os.path.basename(f)
999                    copy_file(f, "%s/%s" % (softdir, base))
1000
1001            # Ick.  Put this python rpm in a place that it will get moved into
1002            # the staging area.  It's a hack to install a modern (in a Roman
1003            # sense of modern) python on ProtoGENI
1004            python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
1005            if os.access("./%s" % python_rpm, os.R_OK):
1006                copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
1007
1008            for a in attrs:
1009                if a['attribute'] in configs:
1010                    try:
1011                        self.log.debug("Retrieving %s" % a['value'])
1012                        get_url(a['value'], certfile, tmpdir)
1013                    except:
1014                        t, v, st = sys.exc_info()
1015                        raise service_error(service_error.internal,
1016                                "Error retrieving %s: %s" % (s, v))
1017                if a['attribute'] == 'ssh_pubkey':
1018                    pubkey_base = a['value'].rpartition('/')[2]
1019                if a['attribute'] == 'ssh_secretkey':
1020                    secretkey_base = a['value'].rpartition('/')[2]
1021                if a['attribute'] == 'experiment_name':
1022                    ename = a['value']
1023
1024            # If the userconf service was imported, collect the configuration
1025            # data.
1026            for s in services:
1027                if s.get("name", "") == 'userconfig' \
1028                        and s.get('visibility',"") == 'import':
1029
1030                    # Collect ther server and certificate info.
1031                    u = s.get('server', None)
1032                    for a in s.get('fedAttr', []):
1033                        if a.get('attribute',"") == 'cert':
1034                            cert = a.get('value', None)
1035                            break
1036                    else:
1037                        cert = None
1038
1039                    if cert:
1040                        # Make a temporary certificate file for get_url.  The
1041                        # finally clause removes it whether something goes
1042                        # wrong (including an exception from get_url) or not.
1043                        try:
1044                            tfos, tn = tempfile.mkstemp(suffix=".pem")
1045                            tf = os.fdopen(tfos, 'w')
1046                            print >>tf, cert
1047                            tf.close()
1048                            get_url(u, tn, tmpdir, "userconf")
1049                        except EnvironmentError, e:
1050                            raise service_error(service.error.internal, 
1051                                    "Cannot create temp file for " + 
1052                                    "userconfig certificates: %s e")
1053                        except:
1054                            t, v, st = sys.exc_info()
1055                            raise service_error(service_error.internal,
1056                                    "Error retrieving %s: %s" % (u, v))
1057                        finally:
1058                            if tn: os.remove(tn)
1059                    else:
1060                        raise service_error(service_error.req,
1061                                "No certificate for retreiving userconfig")
1062                    break
1063
1064            self.state_lock.acquire()
1065            if self.allocation.has_key(aid):
1066                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1067                self.allocation[aid]['experiment'] = ename
1068                self.allocation[aid]['log'] = [ ]
1069                # Create a logger that logs to the experiment's state object as
1070                # well as to the main log file.
1071                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1072                h = logging.StreamHandler(
1073                        list_log.list_log(self.allocation[aid]['log']))
1074                # XXX: there should be a global one of these rather than
1075                # repeating the code.
1076                h.setFormatter(logging.Formatter(
1077                    "%(asctime)s %(name)s %(message)s",
1078                            '%d %b %y %H:%M:%S'))
1079                alloc_log.addHandler(h)
1080                self.write_state()
1081            else:
1082                self.log.error("No allocation for %s!?" % aid)
1083            self.state_lock.release()
1084
1085            # XXX: we really need to put the import and connection info
1086            # generation off longer.
1087            self.import_store_info(certfile, connInfo)
1088            #self.generate_portal_configs(topo, pubkey_base,
1089                    #secretkey_base, tmpdir, master, ename, connInfo,
1090                    #services)
1091            rspec = self.generate_rspec(topo, "%s/%s/" \
1092                    % (self.staging_dir, ename), master, connInfo)
1093
1094            starter = self.start_segment(keyfile=ssh_key,
1095                    debug=self.create_debug, log=alloc_log,
1096                    ch_url = self.ch_url, sa_url=self.sa_url,
1097                    cm_url=self.cm_url)
1098            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
1099                    master, ename,
1100                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1101                    certfile, topo, connInfo, services)
1102            # Copy the assigned names into the return topology
1103            rvtopo = topo.clone()
1104            for e in rvtopo.elements:
1105                if isinstance(e, topdl.Computer) and e.get_attribute('testbed'):
1106                    myname = e.get_attribute('testbed')
1107                    break
1108            else: myname = None
1109
1110            embedding = [ ]
1111            for n in starter.node:
1112                embedding.append({ 
1113                    'toponame': n,
1114                    'physname': ["%s%s" %  (starter.node[n], self.domain)],
1115                    })
1116
1117        except service_error, e:
1118            err = e
1119        except e:
1120            err = service_error(service_error.internal, str(e))
1121
1122        # Walk up tmpdir, deleting as we go
1123        if self.cleanup:
1124            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1125            for path, dirs, files in os.walk(tmpdir, topdown=False):
1126                for f in files:
1127                    os.remove(os.path.join(path, f))
1128                for d in dirs:
1129                    os.rmdir(os.path.join(path, d))
1130            os.rmdir(tmpdir)
1131        else:
1132            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1133
1134        if rv:
1135            # Grab the log (this is some anal locking, but better safe than
1136            # sorry)
1137            self.state_lock.acquire()
1138            logv = "".join(self.allocation[aid]['log'])
1139            # It's possible that the StartSegment call gets retried (!).
1140            # if the 'started' key is in the allocation, we'll return it rather
1141            # than redo the setup.
1142            self.allocation[aid]['started'] = { 
1143                    'allocID': req['allocID'],
1144                    'allocationLog': logv,
1145                    'segmentdescription': { 
1146                        'topdldescription': rvtopo.to_dict() },
1147                    'embedding': embedding,
1148                    }
1149            self.write_state()
1150            self.state_lock.release()
1151
1152            return retval
1153        elif err:
1154            raise service_error(service_error.federant,
1155                    "Swapin failed: %s" % err)
1156        else:
1157            raise service_error(service_error.federant, "Swapin failed")
1158
1159    def TerminateSegment(self, req, fid):
1160        try:
1161            req = req['TerminateSegmentRequestBody']
1162        except KeyError:
1163            raise service_error(service_error.req, "Badly formed request")
1164
1165        auth_attr = req['allocID']['fedid']
1166        aid = "%s" % auth_attr
1167        attrs = req.get('fedAttr', [])
1168        if not self.auth.check_attribute(fid, auth_attr):
1169            raise service_error(service_error.access, "Access denied")
1170
1171        self.state_lock.acquire()
1172        if self.allocation.has_key(aid):
1173            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1174            slice_cred = self.allocation[aid].get('slice_credential', None)
1175            ename = self.allocation[aid].get('experiment', None)
1176        else:
1177            cf, user, ssh_key, cpw = (None, None, None, None)
1178            slice_cred = None
1179            ename = None
1180        self.state_lock.release()
1181
1182        if ename:
1183            staging = "%s/%s" % ( self.staging_dir, ename)
1184        else:
1185            self.log.warn("Can't find experiment name for %s" % aid)
1186            staging = None
1187
1188        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
1189                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
1190        stopper(self, user, staging, slice_cred, cf, cpw)
1191        return { 'allocID': req['allocID'] }
1192
1193    def RenewSlices(self):
1194        self.log.info("Scanning for slices to renew")
1195        self.state_lock.acquire()
1196        aids = self.allocation.keys()
1197        self.state_lock.release()
1198
1199        for aid in aids:
1200            self.state_lock.acquire()
1201            if self.allocation.has_key(aid):
1202                name = self.allocation[aid].get('slice_name', None)
1203                scred = self.allocation[aid].get('slice_credential', None)
1204                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1205            else:
1206                name = None
1207                scred = None
1208            self.state_lock.release()
1209
1210            # There's a ProtoGENI slice associated with the segment; renew it.
1211            if name and scred:
1212                renewer = self.renew_segment(log=self.log, 
1213                        debug=self.create_debug, keyfile=ssh_key,
1214                        cm_url = self.cm_url, sa_url = self.sa_url,
1215                        ch_url = self.ch_url)
1216                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
1217                if new_scred:
1218                    self.log.info("Slice %s renewed until %s GMT" % \
1219                            (name, time.asctime(time.gmtime(\
1220                                time.time()+self.renewal_interval))))
1221                    self.state_lock.acquire()
1222                    if self.allocation.has_key(aid):
1223                        self.allocation[aid]['slice_credential'] = new_scred
1224                    self.state_lock.release()
1225                else:
1226                    self.log.info("Failed to renew slice %s " % name)
1227
1228        # Let's do this all again soon.  (4 tries before the slices time out)   
1229        t = Timer(self.renewal_interval/4, self.RenewSlices)
1230        t.start()
Note: See TracBrowser for help on using the repository browser.