source: fedd/federation/protogeni_access.py @ 062b991

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 062b991 was dd3e38b, checked in by Ted Faber <faber@…>, 15 years ago

Add slice/sliver renewal

  • Property mode set to 100644
File size: 38.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_protogeni_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to fedd.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = config.getboolean("access", "allow_proxy")
64
65        self.domain = config.get("access", "domain")
66        self.certdir = config.get("access","certdir")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.ssh_port = config.get("access","ssh_port") or "22"
71        self.sshd = config.get("access","sshd")
72        self.sshd_config = config.get("access", "sshd_config")
73        self.create_debug = config.getboolean("access", "create_debug")
74        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
75        self.access_type = config.get("access", "type")
76        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
77        self.staging_host = config.get("access", "staging_host") \
78                or "ops.emulab.net"
79
80        self.renewal_interval = config.get("access", "renewal") or (3 * 60 * 60)
81        self.renewal_interval = int(self.renewal_interval)
82
83        self.ch_url = config.get("access", "ch_url")
84        self.sa_url = config.get("access", "sa_url")
85        self.cm_url = config.get("access", "cm_url")
86
87        self.attrs = { }
88        self.access = { }
89        self.restricted = [ ]
90        self.projects = { }
91        self.keys = { }
92        self.types = { }
93        self.allocation = { }
94        self.state = { 
95            'projects': self.projects,
96            'allocation' : self.allocation,
97            'keys' : self.keys,
98            'types': self.types
99        }
100        self.log = logging.getLogger("fedd.access")
101        set_log_level(config, "access", self.log)
102        self.state_lock = Lock()
103        # XXX: Configurable
104        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
105        self.imports = set(('SMB', 'seer', 'userconfig'))
106
107        if auth: self.auth = auth
108        else:
109            self.log.error(\
110                    "[access]: No authorizer initialized, creating local one.")
111            auth = authorizer()
112
113        tb = config.get('access', 'testbed')
114        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
115        else: self.testbed = [ ]
116
117        if config.has_option("access", "accessdb"):
118            self.read_access(config.get("access", "accessdb"))
119
120        self.state_filename = config.get("access", "access_state")
121        print "Calling read_state %s" % self.state_filename
122        self.read_state()
123
124        # Keep cert_file and cert_pwd coming from the same place
125        self.cert_file = config.get("access", "cert_file")
126        if self.cert_file:
127            self.sert_pwd = config.get("access", "cert_pw")
128        else:
129            self.cert_file = config.get("globals", "cert_file")
130            self.sert_pwd = config.get("globals", "cert_pw")
131
132        self.trusted_certs = config.get("access", "trusted_certs") or \
133                config.get("globals", "trusted_certs")
134
135        self.start_segment = proxy_protogeni_segment.start_segment
136        self.stop_segment = proxy_protogeni_segment.stop_segment
137        self.renew_segment = proxy_protogeni_segment.renew_segment
138
139        self.call_SetValue = service_caller('SetValue')
140        self.call_GetValue = service_caller('GetValue')
141
142        self.RenewSlices()
143
144        self.soap_services = {\
145            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
146            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
147            'StartSegment': soap_handler("StartSegment", self.StartSegment),
148            'TerminateSegment': soap_handler("TerminateSegment", 
149                self.TerminateSegment),
150            }
151        self.xmlrpc_services =  {\
152            'RequestAccess': xmlrpc_handler('RequestAccess',
153                self.RequestAccess),
154            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
155                self.ReleaseAccess),
156            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
157            'TerminateSegment': xmlrpc_handler('TerminateSegment',
158                self.TerminateSegment),
159            }
160
161    def read_access(self, config):
162        """
163        Read a configuration file and set internal parameters.
164
165        There are access lines of the
166        form (tb, proj, user) -> user that map the first tuple of
167        names to the user for for access purposes.  Names in the key (left side)
168        can include "<NONE> or <ANY>" to act as wildcards or to require the
169        fields to be empty.  Similarly aproj or auser can be <SAME> or
170        <DYNAMIC> indicating that either the matching key is to be used or a
171        dynamic user or project will be created.  These names can also be
172        federated IDs (fedid's) if prefixed with fedid:.  The user is the
173        ProtoGENI identity certificate.
174        Testbed attributes outside the forms above can be given using the
175        format attribute: name value: value.  The name is a single word and the
176        value continues to the end of the line.  Empty lines and lines startin
177        with a # are ignored.
178
179        Parsing errors result in a self.parse_error exception being raised.
180        """
181        lineno=0
182        name_expr = "["+string.ascii_letters + string.digits + "\/\.\-_]+"
183        fedid_expr = "fedid:[" + string.hexdigits + "]+"
184        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
185
186        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
187                re.IGNORECASE)
188        access_str = '\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ \
189                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*,\s*('\
190                        + name_expr + ')\s*,\s*('+name_expr+')\s*,?\s*(' + \
191                        name_expr+ ')?\)'
192        access_re = re.compile(access_str, re.IGNORECASE)
193
194
195        def parse_name(n):
196            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
197            else: return n
198       
199        def auth_name(n):
200            if isinstance(n, basestring):
201                if n =='<any>' or n =='<none>': return None
202                else: return unicode(n)
203            else:
204                return n
205
206        f = open(config, "r");
207        for line in f:
208            lineno += 1
209            line = line.strip();
210            if len(line) == 0 or line.startswith('#'):
211                continue
212
213            # Extended (attribute: x value: y) attribute line
214            m = attr_re.match(line)
215            if m != None:
216                attr, val = m.group(1,2)
217                self.attrs[attr] = val
218                continue
219
220            # Access line (t, p, u) -> (a, pw) line
221            m = access_re.match(line)
222            if m != None:
223                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
224                auth_key = tuple([ auth_name(x) for x in access_key])
225                cert = auth_name(parse_name(m.group(4)))
226                user_name = auth_name(parse_name(m.group(5)))
227                ssh_key = unicode(m.group(6))
228                if m.group(6): pw = unicode(m.group(7))
229                else: pw = None
230
231                self.access[access_key] = (cert, user_name, ssh_key, pw)
232                self.auth.set_attribute(auth_key, "access")
233                continue
234
235            # Nothing matched to here: unknown line - raise exception
236            f.close()
237            raise self.parse_error("Unknown statement at line %d of %s" % \
238                    (lineno, config))
239        f.close()
240
241# need this ?
242    def get_users(self, obj):
243        """
244        Return a list of the IDs of the users in dict
245        """
246        if obj.has_key('user'):
247            return [ unpack_id(u['userID']) \
248                    for u in obj['user'] if u.has_key('userID') ]
249        else:
250            return None
251# need this ?
252
253    def write_state(self):
254        if self.state_filename:
255            try:
256                f = open(self.state_filename, 'w')
257                pickle.dump(self.state, f)
258            except IOError, e:
259                self.log.error("Can't write file %s: %s" % \
260                        (self.state_filename, e))
261            except pickle.PicklingError, e:
262                self.log.error("Pickling problem: %s" % e)
263            except TypeError, e:
264                self.log.error("Pickling problem (TypeError): %s" % e)
265
266
267    def read_state(self):
268        """
269        Read a new copy of access state.  Old state is overwritten.
270
271        State format is a simple pickling of the state dictionary.
272        """
273        if self.state_filename:
274            try:
275                f = open(self.state_filename, "r")
276                self.state = pickle.load(f)
277                self.log.debug("[read_state]: Read state from %s" % \
278                        self.state_filename)
279            except IOError, e:
280                self.log.warning(("[read_state]: No saved state: " +\
281                        "Can't open %s: %s") % (self.state_filename, e))
282            except EOFError, e:
283                self.log.warning(("[read_state]: " +\
284                        "Empty or damaged state file: %s:") % \
285                        self.state_filename)
286            except pickle.UnpicklingError, e:
287                self.log.warning(("[read_state]: No saved state: " + \
288                        "Unpickling failed: %s") % e)
289
290            # Add the ownership attributes to the authorizer.  Note that the
291            # indices of the allocation dict are strings, but the attributes are
292            # fedids, so there is a conversion.
293            for k in self.state.get('allocation', {}).keys():
294                for o in self.state['allocation'][k].get('owners', []):
295                    self.auth.set_attribute(o, fedid(hexstr=k))
296                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
297
298            if self.allocation != self.state['allocation']:
299                self.allocation = self.state['allocation']
300
301    def permute_wildcards(self, a, p):
302        """Return a copy of a with various fields wildcarded.
303
304        The bits of p control the wildcards.  A set bit is a wildcard
305        replacement with the lowest bit being user then project then testbed.
306        """
307        if p & 1: user = ["<any>"]
308        else: user = a[2]
309        if p & 2: proj = "<any>"
310        else: proj = a[1]
311        if p & 4: tb = "<any>"
312        else: tb = a[0]
313
314        return (tb, proj, user)
315
316
317    def find_access(self, search):
318        """
319        Search the access DB for a match on this tuple.  Return the matching
320        user (protoGENI cert).
321       
322        NB, if the initial tuple fails to match we start inserting wildcards in
323        an order determined by self.project_priority.  Try the list of users in
324        order (when wildcarded, there's only one user in the list).
325        """
326        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
327        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
328
329        for p in perm: 
330            s = self.permute_wildcards(search, p)
331            # s[2] is None on an anonymous, unwildcarded request
332            if s[2] != None:
333                for u in s[2]:
334                    if self.access.has_key((s[0], s[1], u)):
335                        return self.access[(s[0], s[1], u)]
336            else:
337                if self.access.has_key(s):
338                    return self.access[s]
339        return None
340
341    def lookup_access(self, req, fid):
342        """
343        Determine the allowed access for this request.  Return the access and
344        which fields are dynamic.
345
346        The fedid is needed to construct the request
347        """
348        # Search keys
349        tb = None
350        project = None
351        user = None
352        # Return values
353        rp = access_project(None, ())
354        ru = None
355        user_re = re.compile("user:\s(.*)")
356        project_re = re.compile("project:\s(.*)")
357
358        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
359                if user_re.match(x)]
360        project = [ project_re.findall(x)[0] \
361                for x in req.get('credential', []) \
362                    if project_re.match(x)]
363
364        if len(project) == 1: project = project[0]
365        elif len(project) == 0: project = None
366        else: 
367            raise service_error(service_error.req, 
368                    "More than one project credential")
369
370
371        user_fedids = [ u for u in user if isinstance(u, fedid)]
372
373        # Determine how the caller is representing itself.  If its fedid shows
374        # up as a project or a singleton user, let that stand.  If neither the
375        # usernames nor the project name is a fedid, the caller is a testbed.
376        if project and isinstance(project, fedid):
377            if project == fid:
378                # The caller is the project (which is already in the tuple
379                # passed in to the authorizer)
380                owners = user_fedids
381                owners.append(project)
382            else:
383                raise service_error(service_error.req,
384                        "Project asserting different fedid")
385        else:
386            if fid not in user_fedids:
387                tb = fid
388                owners = user_fedids
389                owners.append(fid)
390            else:
391                if len(fedids) > 1:
392                    raise service_error(service_error.req,
393                            "User asserting different fedid")
394                else:
395                    # Which is a singleton
396                    owners = user_fedids
397        # Confirm authorization
398
399        for u in user:
400            self.log.debug("[lookup_access] Checking access for %s" % \
401                    ((tb, project, u),))
402            if self.auth.check_attribute((tb, project, u), 'access'):
403                self.log.debug("[lookup_access] Access granted")
404                break
405            else:
406                self.log.debug("[lookup_access] Access Denied")
407        else:
408            raise service_error(service_error.access, "Access denied")
409
410        # This maps a valid user to the ProtoGENI credentials to use
411        found = self.find_access((tb, project, user))
412       
413        if found == None:
414            raise service_error(service_error.access,
415                    "Access denied - cannot map access")
416        return found, owners
417
418    def get_handler(self, path, fid):
419        self.log.info("Get handler %s %s" % (path, fid))
420        if self.auth.check_attribute(fid, path) and self.userconfdir:
421            return ("%s/%s" % (self.userconfdir, path), "application/binary")
422        else:
423            return (None, None)
424
425    def export_userconf(self, project):
426        dev_null = None
427        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
428                log=self.log)
429        conffilename = "%s/%s" % (self.userconfdir, str(confid))
430        cf = None
431        try:
432            cf = open(conffilename, "w")
433            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
434        except IOError, e:
435            raise service_error(service_error.internal, 
436                    "Cannot create user configuration data")
437
438        try:
439            dev_null = open("/dev/null", "a")
440        except IOError, e:
441            self.log.error("export_userconf: can't open /dev/null: %s" % e)
442
443        cmd = "%s %s" % (self.userconfcmd, project)
444        conf = subprocess.call(cmd.split(" "),
445                stdout=cf, stderr=dev_null, close_fds=True)
446
447        self.auth.set_attribute(confid, "/%s" % str(confid))
448
449        return confid, confcert
450
451
452    def export_services(self, sreq, project, user):
453        exp = [ ]
454        state = { }
455        # XXX: Filthy shortcut here using http: so urlparse will give the right
456        # answers.
457        for s in sreq:
458            sname = s.get('name', '')
459            svis = s.get('visibility', '')
460            if svis == 'export':
461                if sname in self.exports:
462                    outs = s.copy()
463                    if sname == 'SMB':
464                        outs = s.copy()
465                        outs['server'] = "http://fs:139" 
466                        outs['fedAttr'] = [
467                                { 'attribute': 'SMBSHARE', 'value': 'USERS' },
468                                { 'attribute': 'SMBUSER', 'value': user },
469                                { 'attribute': 'SMBPROJ', 'value': project },
470                            ]
471                    elif sname == 'seer':
472                        outs['server'] = "http://control:16606"
473                    elif sname == 'tmcd':
474                        outs['server'] = "http://boss:7777"
475                    elif sname == 'userconfig':
476                        if self.userconfdir and self.userconfcmd \
477                                and self.userconfurl:
478                            cid, cert = self.export_userconf(project)
479                            outs['server'] = "%s/%s" % \
480                                    (self.userconfurl, str(cid))
481                            outs['fedAttr'] = [
482                                    { 'attribute': 'cert', 'value': cert },
483                                ]
484                            state['userconfig'] = unicode(cid)
485                    exp.append(outs)
486        return (exp, state)
487
488    def build_response(self, alloc_id, ap, services):
489        """
490        Create the SOAP response.
491
492        Build the dictionary description of the response and use
493        fedd_utils.pack_soap to create the soap message.  ap is the allocate
494        project message returned from a remote project allocation (even if that
495        allocation was done locally).
496        """
497        # Because alloc_id is already a fedd_services_types.IDType_Holder,
498        # there's no need to repack it
499        msg = { 
500                'allocID': alloc_id,
501                'fedAttr': [
502                    { 'attribute': 'domain', 'value': self.domain } , 
503                    { 'attribute': 'project', 'value': 
504                        ap['project'].get('name', {}).get('localname', "???") },
505                ]
506            }
507        if len(self.attrs) > 0:
508            msg['fedAttr'].extend(
509                [ { 'attribute': x, 'value' : y } \
510                        for x,y in self.attrs.iteritems()])
511
512        if services:
513            msg['service'] = services
514        return msg
515
516    def RequestAccess(self, req, fid):
517        """
518        Handle the access request.  Proxy if not for us.
519
520        Parse out the fields and make the allocations or rejections if for us,
521        otherwise, assuming we're willing to proxy, proxy the request out.
522        """
523
524        # The dance to get into the request body
525        if req.has_key('RequestAccessRequestBody'):
526            req = req['RequestAccessRequestBody']
527        else:
528            raise service_error(service_error.req, "No request!?")
529
530        if req.has_key('destinationTestbed'):
531            dt = unpack_id(req['destinationTestbed'])
532
533        if dt == None or dt in self.testbed:
534            # Request for this fedd
535            found, owners = self.lookup_access(req, fid)
536            # keep track of what's been added
537            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
538            aid = unicode(allocID)
539
540            self.state_lock.acquire()
541            self.allocation[aid] = { }
542            # The protoGENI certificate
543            self.allocation[aid]['credentials'] = found
544            # The list of owner FIDs
545            self.allocation[aid]['owners'] = owners
546            self.write_state()
547            self.state_lock.release()
548            for o in owners:
549                self.auth.set_attribute(o, allocID)
550            self.auth.set_attribute(allocID, allocID)
551
552            try:
553                f = open("%s/%s.pem" % (self.certdir, aid), "w")
554                print >>f, alloc_cert
555                f.close()
556            except IOError, e:
557                raise service_error(service_error.internal, 
558                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
559            return { 'allocID': { 'fedid': allocID } }
560        else:
561            if self.allow_proxy:
562                resp = self.proxy_RequestAccess.call_service(dt, req,
563                            self.cert_file, self.cert_pwd,
564                            self.trusted_certs)
565                if resp.has_key('RequestAccessResponseBody'):
566                    return resp['RequestAccessResponseBody']
567                else:
568                    return None
569            else:
570                raise service_error(service_error.access,
571                        "Access proxying denied")
572
573
574    def ReleaseAccess(self, req, fid):
575        # The dance to get into the request body
576        if req.has_key('ReleaseAccessRequestBody'):
577            req = req['ReleaseAccessRequestBody']
578        else:
579            raise service_error(service_error.req, "No request!?")
580
581        if req.has_key('destinationTestbed'):
582            dt = unpack_id(req['destinationTestbed'])
583        else:
584            dt = None
585
586        if dt == None or dt in self.testbed:
587            # Local request
588            try:
589                if req['allocID'].has_key('localname'):
590                    auth_attr = aid = req['allocID']['localname']
591                elif req['allocID'].has_key('fedid'):
592                    aid = unicode(req['allocID']['fedid'])
593                    auth_attr = req['allocID']['fedid']
594                else:
595                    raise service_error(service_error.req,
596                            "Only localnames and fedids are understood")
597            except KeyError:
598                raise service_error(service_error.req, "Badly formed request")
599
600            self.log.debug("[access] deallocation requested for %s", aid)
601            if not self.auth.check_attribute(fid, auth_attr):
602                self.log.debug("[access] deallocation denied for %s", aid)
603                raise service_error(service_error.access, "Access Denied")
604
605            self.state_lock.acquire()
606            if self.allocation.has_key(aid):
607                self.log.debug("Found allocation for %s" %aid)
608                del self.allocation[aid]
609                self.write_state()
610                self.state_lock.release()
611                # And remove the access cert
612                cf = "%s/%s.pem" % (self.certdir, aid)
613                self.log.debug("Removing %s" % cf)
614                os.remove(cf)
615                return { 'allocID': req['allocID'] } 
616            else:
617                self.state_lock.release()
618                raise service_error(service_error.req, "No such allocation")
619
620        else:
621            if self.allow_proxy:
622                resp = self.proxy_ReleaseAccess.call_service(dt, req,
623                            self.cert_file, self.cert_pwd,
624                            self.trusted_certs)
625                if resp.has_key('ReleaseAccessResponseBody'):
626                    return resp['ReleaseAccessResponseBody']
627                else:
628                    return None
629            else:
630                raise service_error(service_error.access,
631                        "Access proxying denied")
632
633    def import_store_info(self, cf, connInfo):
634        """
635        Pull any import parameters in connInfo in.  We translate them either
636        into known member names or fedAddrs.
637        """
638
639        for c in connInfo:
640            for p in [ p for p in c.get('parameter', []) \
641                    if p.get('type', '') == 'input']:
642                name = p.get('name', None)
643                key = p.get('key', None)
644                store = p.get('store', None)
645
646                if name and key and store :
647                    req = { 'name': key, 'wait': True }
648                    r = self.call_GetValue(store, req, cf)
649                    r = r.get('GetValueResponseBody', None)
650                    if r :
651                        if r.get('name', '') == key:
652                            v = r.get('value', None)
653                            if v is not None:
654                                if name == 'peer':
655                                    c['peer'] = v
656                                else:
657                                    if c.has_key('fedAttr'):
658                                        c['fedAttr'].append({
659                                            'attribute': name, 'value': v})
660                                    else:
661                                        c['fedAttr']= [{
662                                            'attribute': name, 'value': v}]
663                            else:
664                                raise service_error(service_error.internal, 
665                                        'None value exported for %s'  % key)
666                        else:
667                            raise service_error(service_error.internal, 
668                                    'Different name returned for %s: %s' \
669                                            % (key, r.get('name','')))
670                    else:
671                        raise service_error(service_error.internal, 
672                            'Badly formatted response: no GetValueResponseBody')
673                else:
674                    raise service_error(service_error.internal, 
675                        'Bad Services missing info for import %s' % c)
676
677    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
678            tmpdir, master, leid, connInfo, services):
679
680        def conninfo_to_dict(key, info):
681            """
682            Make a cpoy of the connection information about key, and flatten it
683            into a single dict by parsing out any feddAttrs.
684            """
685
686            rv = None
687            for i in info:
688                if key == i.get('portal', "") or \
689                        key in [e.get('element', "") \
690                        for e in i.get('member', [])]:
691                    rv = i.copy()
692                    break
693
694            else:
695                return rv
696
697            if 'fedAttr' in rv:
698                for a in rv['fedAttr']:
699                    attr = a.get('attribute', "")
700                    val = a.get('value', "")
701                    if attr and attr not in rv:
702                        rv[attr] = val
703                del rv['fedAttr']
704            return rv
705
706        # XXX: un hardcode this
707        def client_null(f, s):
708            print >>f, "Service: %s" % s['name']
709
710        def client_smb(f, s):
711            print >>f, "Service: %s" % s['name']
712            smbshare = None
713            smbuser = None
714            smbproj = None
715            for a in s.get('fedAttr', []):
716                if a.get('attribute', '') == 'SMBSHARE':
717                    smbshare = a.get('value', None)
718                elif a.get('attribute', '') == 'SMBUSER':
719                    smbuser = a.get('value', None)
720                elif a.get('attribute', '') == 'SMBPROJ':
721                    smbproj = a.get('value', None)
722
723            if all((smbshare, smbuser, smbproj)):
724                print >>f, "SMBshare: %s" % smbshare
725                print >>f, "ProjectUser: %s" % smbuser
726                print >>f, "ProjectName: %s" % smbproj
727
728        client_service_out = {
729                'SMB': client_smb,
730                'tmcd': client_null,
731                'seer': client_null,
732                'userconfig': client_null,
733            }
734        # XXX: end un hardcode this
735
736
737        seer_out = False
738        client_out = False
739        for e in [ e for e in topo.elements \
740                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
741            myname = e.name[0]
742            type = e.get_attribute('portal_type')
743
744            info = conninfo_to_dict(myname, connInfo)
745
746            if not info:
747                raise service_error(service_error.req,
748                        "No connectivity info for %s" % myname)
749
750            peer = info.get('peer', "")
751            ldomain = self.domain;
752
753            mexp = info.get('masterexperiment',"")
754            mproj, meid = mexp.split("/", 1)
755            mdomain = info.get('masterdomain',"")
756            muser = info.get('masteruser','root')
757            smbshare = info.get('smbshare', 'USERS')
758            ssh_port = info.get('ssh_port', '22')
759
760            active = info.get('active', 'False')
761
762            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
763            tunnelconfig = self.attrs.has_key('TunnelCfg')
764            try:
765                f = open(cfn, "w")
766                if active == 'True':
767                    print >>f, "active: True"
768                    print >>f, "ssh_port: %s" % ssh_port
769                    if type in ('control', 'both'):
770                        for s in [s for s in services \
771                                if s.get('name', "") in self.imports]:
772                            p = urlparse(s.get('server', 'http://localhost'))
773                            print >>f, 'port: remote:%s:%s:%s' % \
774                                    (p.port, p.hostname, p.port)
775
776                if tunnelconfig:
777                    print >>f, "tunnelip: %s" % tunnelconfig
778                # XXX: send this an fedattr
779                #print >>f, "seercontrol: control.%s.%s%s" % \
780                        #(meid.lower(), mproj.lower(), mdomain)
781                print >>f, "peer: %s" % peer.lower()
782                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
783                        pubkey_base
784                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
785                        secretkey_base
786                f.close()
787            except IOError, e:
788                raise service_error(service_error.internal,
789                        "Can't write protal config %s: %s" % (cfn, e))
790           
791            # XXX: This little seer config file needs to go away.
792            if not seer_out:
793                try:
794                    seerfn = "%s/seer.conf" % tmpdir
795                    f = open(seerfn, "w")
796                    if not master:
797                        print >>f, "ControlNode: control.%s.%s%s" % \
798                            (meid.lower(), mproj.lower(), mdomain)
799                    print >>f, "ExperimentID: %s" % mexp
800                    f.close()
801                except IOError, e:
802                    raise service_error(service_error.internal, 
803                            "Can't write seer.conf: %s" %e)
804                seer_out = True
805
806            if not client_out and type in ('control', 'both'):
807                try:
808                    f = open("%s/client.conf" % tmpdir, "w")
809                    print >>f, "ControlGateway: %s%s" % \
810                        (myname.lower(), ldomain.lower())
811                    for s in services:
812                        if s.get('name',"") in self.imports and \
813                                s.get('visibility','') == 'import':
814                            client_service_out[s['name']](f, s)
815                    # Does seer need this?
816                    # print >>f, "ExperimentID: %s/%s" % (mproj, meid)
817                    f.close()
818                except IOError, e:
819                    raise service_error(service_error.internal,
820                            "Cannot write client.conf: %s" %s)
821                client_out = True
822
823
824    def generate_rspec(self, topo, softdir, master, connInfo):
825        t = topo.clone()
826
827        starts = { }
828
829        # The startcmds for master and slave testbeds
830        if master: 
831            gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true')
832            node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true')
833        else: 
834            gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true')
835            node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true')
836
837        # Weed out the things we aren't going to instantiate: Segments, portal
838        # substrates, and portal interfaces.  (The copy in the for loop allows
839        # us to delete from e.elements in side the for loop).  While we're
840        # touching all the elements, we also adjust paths from the original
841        # testbed to local testbed paths and put the federation commands and
842        # startcommands into a dict so we can start them manually later.
843        # ProtoGENI requires setup before the federation commands run, so we
844        # run them by hand after we've seeded configurations.
845        for e in [e for e in t.elements]:
846            if isinstance(e, topdl.Segment):
847                t.elements.remove(e)
848            # Fix software paths
849            for s in getattr(e, 'software', []):
850                s.location = re.sub("^.*/", softdir, s.location)
851            if isinstance(e, topdl.Computer):
852                if e.get_attribute('portal') and gate_cmd:
853                    # Portals never have a user-specified start command
854                    starts[e.name[0]] = gate_cmd
855                elif node_cmd:
856                    if e.get_attribute('startup'):
857                        starts[e.name[0]] = "%s \\$USER '%s'" % \
858                                (node_cmd, e.get_attribute('startup'))
859                        e.remove_attribute('startup')
860                    else:
861                        starts[e.name[0]] = node_cmd
862
863                # Remove portal interfaces
864                e.interface = [i for i in e.interface \
865                        if not i.get_attribute('portal')]
866
867        t.substrates = [ s.clone() for s in t.substrates ]
868        t.incorporate_elements()
869
870        # Customize the ns2 output for local portal commands and images
871        filters = []
872
873        # NB: these are extra commands issued for the node, not the startcmds
874        if master: cmd = self.attrs.get('MasterConnectorCmd', '')
875        else: cmd = self.attrs.get('SlaveConnectorCmd', '')
876
877        if cmd:
878            filters.append(topdl.generate_portal_command_filter(cmd))
879
880        # Convert to rspec and return it
881        exp_rspec = topdl.topology_to_rspec(t, filters)
882
883        return exp_rspec
884
885    def StartSegment(self, req, fid):
886        def get_url(url, cf, destdir, fn=None):
887            po = urlparse(url)
888            if not fn:
889                fn = po.path.rpartition('/')[2]
890            retries = 0
891            ok = False
892            while not ok and retries < 5:
893                try:
894                    conn = httplib.HTTPSConnection(po.hostname, port=po.port, 
895                            cert_file=cf, key_file=cf)
896                    conn.putrequest('GET', po.path)
897                    conn.endheaders()
898                    response = conn.getresponse()
899
900                    lf = open("%s/%s" % (destdir, fn), "w")
901                    buf = response.read(4096)
902                    while buf:
903                        lf.write(buf)
904                        buf = response.read(4096)
905                    lf.close()
906                    ok = True
907                except IOError, e:
908                    print e
909                    raise service_error(service_error.internal,
910                            "Error writing tempfile: %s" %e)
911                except httplib.HTTPException, e:
912                    print e
913                    raise service_error(service_error.internal, 
914                            "Error retrieving data: %s" % e)
915                except SSLError, e:
916                    print "SSL error %s" %e
917                    retries += 1
918
919            if retries > 5:
920                raise service_error(service_error.internal,
921                        "Repeated SSL failures")
922
923
924        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
925
926        err = None  # Any service_error generated after tmpdir is created
927        rv = None   # Return value from segment creation
928
929        try:
930            req = req['StartSegmentRequestBody']
931        except KeyError:
932            raise service_error(service_error.req, "Badly formed request")
933
934        connInfo = req.get('connection', [])
935        services = req.get('service', [])
936        auth_attr = req['allocID']['fedid']
937        aid = "%s" % auth_attr
938        attrs = req.get('fedAttr', [])
939        if not self.auth.check_attribute(fid, auth_attr):
940            raise service_error(service_error.access, "Access denied")
941
942
943        if req.has_key('segmentdescription') and \
944                req['segmentdescription'].has_key('topdldescription'):
945            topo = \
946                topdl.Topology(**req['segmentdescription']['topdldescription'])
947        else:
948            raise service_error(service_error.req, 
949                    "Request missing segmentdescription'")
950
951        master = req.get('master', False)
952
953        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
954        try:
955            tmpdir = tempfile.mkdtemp(prefix="access-")
956            softdir = "%s/software" % tmpdir
957        except IOError:
958            raise service_error(service_error.internal, "Cannot create tmp dir")
959
960        # Try block alllows us to clean up temporary files.
961        try:
962            sw = set()
963            for e in topo.elements:
964                for s in getattr(e, 'software', []):
965                    sw.add(s.location)
966            if len(sw) > 0:
967                os.mkdir(softdir)
968            for s in sw:
969                self.log.debug("Retrieving %s" % s)
970                get_url(s, certfile, softdir)
971
972            for a in attrs:
973                if a['attribute'] in configs:
974                    get_url(a['value'], certfile, tmpdir)
975                if a['attribute'] == 'ssh_pubkey':
976                    pubkey_base = a['value'].rpartition('/')[2]
977                if a['attribute'] == 'ssh_secretkey':
978                    secretkey_base = a['value'].rpartition('/')[2]
979                if a['attribute'] == 'experiment_name':
980                    ename = a['value']
981
982            # If the userconf service was imported, collect the configuration
983            # data.
984            for s in services:
985                if s.get("name", "") == 'userconfig' \
986                        and s.get('visibility',"") == 'import':
987
988                    # Collect ther server and certificate info.
989                    u = s.get('server', None)
990                    for a in s.get('fedAttr', []):
991                        if a.get('attribute',"") == 'cert':
992                            cert = a.get('value', None)
993                            break
994                    else:
995                        cert = None
996
997                    if cert:
998                        # Make a temporary certificate file for get_url.  The
999                        # finally clause removes it whether something goes
1000                        # wrong (including an exception from get_url) or not.
1001                        try:
1002                            tfos, tn = tempfile.mkstemp(suffix=".pem")
1003                            tf = os.fdopen(tfos, 'w')
1004                            print >>tf, cert
1005                            tf.close()
1006                            get_url(u, tn, tmpdir, "userconf")
1007                        except IOError, e:
1008                            raise service_error(service.error.internal, 
1009                                    "Cannot create temp file for " + 
1010                                    "userconfig certificates: %s e")
1011                        finally:
1012                            if tn: os.remove(tn)
1013                    else:
1014                        raise service_error(service_error.req,
1015                                "No certificate for retreiving userconfig")
1016                    break
1017
1018
1019
1020            self.state_lock.acquire()
1021            if self.allocation.has_key(aid):
1022                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1023                self.allocation[aid]['experiment'] = ename
1024                self.allocation[aid]['log'] = [ ]
1025                # Create a logger that logs to the experiment's state object as
1026                # well as to the main log file.
1027                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1028                h = logging.StreamHandler(
1029                        list_log.list_log(self.allocation[aid]['log']))
1030                # XXX: there should be a global one of these rather than
1031                # repeating the code.
1032                h.setFormatter(logging.Formatter(
1033                    "%(asctime)s %(name)s %(message)s",
1034                            '%d %b %y %H:%M:%S'))
1035                alloc_log.addHandler(h)
1036                self.write_state()
1037            else:
1038                self.log.error("No allocation for %s!?" % aid)
1039            self.state_lock.release()
1040
1041            # XXX: we really need to put the import and connection info
1042            # generation off longer.
1043            self.import_store_info(certfile, connInfo)
1044            #self.generate_portal_configs(topo, pubkey_base,
1045                    #secretkey_base, tmpdir, master, ename, connInfo,
1046                    #services)
1047            rspec = self.generate_rspec(topo, "%s/%s/" \
1048                    % (self.staging_dir, ename), master, connInfo)
1049
1050            starter = self.start_segment(keyfile=ssh_key,
1051                    debug=self.create_debug, log=alloc_log,
1052                    ch_url = self.ch_url, sa_url=self.sa_url,
1053                    cm_url=self.cm_url)
1054            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
1055                    master, ename,
1056                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1057                    certfile, topo, connInfo, services)
1058        except service_error, e:
1059            err = e
1060        except e:
1061            err = service_error(service_error.internal, str(e))
1062
1063        # Walk up tmpdir, deleting as we go
1064        if self.cleanup:
1065            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1066            for path, dirs, files in os.walk(tmpdir, topdown=False):
1067                for f in files:
1068                    os.remove(os.path.join(path, f))
1069                for d in dirs:
1070                    os.rmdir(os.path.join(path, d))
1071            os.rmdir(tmpdir)
1072        else:
1073            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1074
1075        if rv:
1076            # Grab the log (this is some anal locking, but better safe than
1077            # sorry)
1078            self.state_lock.acquire()
1079            logv = "".join(self.allocation[aid]['log'])
1080            self.state_lock.release()
1081
1082            return { 'allocID': req['allocID'], 'allocationLog': logv }
1083        elif err:
1084            raise service_error(service_error.federant,
1085                    "Swapin failed: %s" % err)
1086        else:
1087            raise service_error(service_error.federant, "Swapin failed")
1088
1089    def TerminateSegment(self, req, fid):
1090        try:
1091            req = req['TerminateSegmentRequestBody']
1092        except KeyError:
1093            raise service_error(service_error.req, "Badly formed request")
1094
1095        auth_attr = req['allocID']['fedid']
1096        aid = "%s" % auth_attr
1097        attrs = req.get('fedAttr', [])
1098        if not self.auth.check_attribute(fid, auth_attr):
1099            raise service_error(service_error.access, "Access denied")
1100
1101        self.state_lock.acquire()
1102        if self.allocation.has_key(aid):
1103            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1104            slice_cred = self.allocation[aid].get('slice_credential', None)
1105            ename = self.allocation[aid].get('experiment', None)
1106        else:
1107            cf, user, ssh_key, cpw = (None, None, None, None)
1108            slice_cred = None
1109            ename = None
1110        self.state_lock.release()
1111
1112        if ename:
1113            staging = "%s/%s" % ( self.staging_dir, ename)
1114        else:
1115            self.log.warn("Can't find experiment name for %s" % aid)
1116            staging = None
1117
1118        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
1119                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
1120        stopper(self, user, staging, slice_cred, cf, cpw)
1121        return { 'allocID': req['allocID'] }
1122
1123    def RenewSlices(self):
1124        self.log.info("Scanning for slices to renew")
1125        self.state_lock.acquire()
1126        aids = self.allocation.keys()
1127        self.state_lock.release()
1128
1129        for aid in aids:
1130            self.state_lock.acquire()
1131            if self.allocation.has_key(aid):
1132                name = self.allocation[aid].get('slice_name', None)
1133                scred = self.allocation[aid].get('slice_credential', None)
1134                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1135            else:
1136                name = None
1137                scred = None
1138            self.state_lock.release()
1139
1140            # There's a ProtoGENI slice associated with the segment; renew it.
1141            if name and scred:
1142                renewer = self.renew_segment(log=self.log, 
1143                        debug=self.create_debug, keyfile=ssh_key,
1144                        cm_url = self.cm_url, sa_url = self.sa_url,
1145                        ch_url = self.ch_url)
1146                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
1147                if new_scred:
1148                    self.log.info("Slice %s renewed until %s GMT" % \
1149                            (name, time.asctime(time.gmtime(\
1150                                time.time()+self.renewal_interval))))
1151                    self.state_lock.acquire()
1152                    if self.allocation.has_key(aid):
1153                        self.allocation[aid]['slice_credential'] = new_scred
1154                    self.state_lock.release()
1155                else:
1156                    self.log.info("Failed to renew slice %s " % name)
1157
1158        # Let's do this all again soon.  (4 tries before the slices time out)   
1159        t = Timer(self.renewal_interval/4, self.RenewSlices)
1160        t.start()
Note: See TracBrowser for help on using the repository browser.