source: fedd/federation/access.py @ 6c57fe9

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 6c57fe9 was 6c57fe9, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint

  • Property mode set to 100644
File size: 48.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9
10from threading import *
11
12from util import *
13from allocate_project import allocate_project_local, allocate_project_remote
14from access_project import access_project
15from fedid import fedid, generate_fedid
16from authorizer import authorizer
17from service_error import service_error
18from remote_service import xmlrpc_handler, soap_handler, service_caller
19
20import topdl
21import httplib
22import tempfile
23from urlparse import urlparse
24
25
26# Make log messages disappear if noone configures a fedd logger
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.access")
31fl.addHandler(nullHandler())
32
33class access:
34    """
35    The implementation of access control based on mapping users to projects.
36
37    Users can be mapped to existing projects or have projects created
38    dynamically.  This implements both direct requests and proxies.
39    """
40
41    class parse_error(RuntimeError): pass
42
43
44    proxy_RequestAccess= service_caller('RequestAccess')
45    proxy_ReleaseAccess= service_caller('ReleaseAccess')
46
47    def __init__(self, config=None, auth=None):
48        """
49        Initializer.  Pulls parameters out of the ConfigParser's access section.
50        """
51
52        # Make sure that the configuration is in place
53        if not config: 
54            raise RunTimeError("No config to fedd.access")
55
56        self.project_priority = config.getboolean("access", "project_priority")
57        self.allow_proxy = config.getboolean("access", "allow_proxy")
58
59        self.boss = config.get("access", "boss")
60        self.ops = config.get("access", "ops")
61        self.domain = config.get("access", "domain")
62        self.fileserver = config.get("access", "fileserver")
63        self.eventserver = config.get("access", "eventserver")
64        self.certdir = config.get("access","certdir")
65
66        self.attrs = { }
67        self.access = { }
68        self.restricted = [ ]
69        self.projects = { }
70        self.keys = { }
71        self.types = { }
72        self.allocation = { }
73        self.state = { 
74            'projects': self.projects,
75            'allocation' : self.allocation,
76            'keys' : self.keys,
77            'types': self.types
78        }
79        self.log = logging.getLogger("fedd.access")
80        set_log_level(config, "access", self.log)
81        self.state_lock = Lock()
82
83        if auth: self.auth = auth
84        else:
85            self.log.error(\
86                    "[access]: No authorizer initialized, creating local one.")
87            auth = authorizer()
88
89        tb = config.get('access', 'testbed')
90        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
91        else: self.testbed = [ ]
92
93        if config.has_option("access", "accessdb"):
94            self.read_access(config.get("access", "accessdb"))
95
96        self.state_filename = config.get("access", "access_state")
97        self.read_state()
98
99        # Keep cert_file and cert_pwd coming from the same place
100        self.cert_file = config.get("access", "cert_file")
101        if self.cert_file:
102            self.sert_pwd = config.get("access", "cert_pw")
103        else:
104            self.cert_file = config.get("globals", "cert_file")
105            self.sert_pwd = config.get("globals", "cert_pw")
106
107        self.trusted_certs = config.get("access", "trusted_certs") or \
108                config.get("globals", "trusted_certs")
109
110        self.soap_services = {\
111            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
112            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
113            'StartSegment': soap_handler("StartSegment", self.StartSegment),
114            }
115        self.xmlrpc_services =  {\
116            'RequestAccess': xmlrpc_handler('RequestAccess',
117                self.RequestAccess),
118            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
119                self.ReleaseAccess),
120            'StartSegment': xmlrpc_handler('StartSegment',
121                self.StartSegment),
122            }
123
124
125        if not config.has_option("allocate", "uri"):
126            self.allocate_project = \
127                allocate_project_local(config, auth)
128        else:
129            self.allocate_project = \
130                allocate_project_remote(config, auth)
131
132        # If the project allocator exports services, put them in this object's
133        # maps so that classes that instantiate this can call the services.
134        self.soap_services.update(self.allocate_project.soap_services)
135        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
136
137
138    def read_access(self, config):
139        """
140        Read a configuration file and set internal parameters.
141
142        The format is more complex than one might hope.  The basic format is
143        attribute value pairs separated by colons(:) on a signle line.  The
144        attributes in bool_attrs, emulab_attrs and id_attrs can all be set
145        directly using the name: value syntax.  E.g.
146        boss: hostname
147        sets self.boss to hostname.  In addition, there are access lines of the
148        form (tb, proj, user) -> (aproj, auser) that map the first tuple of
149        names to the second for access purposes.  Names in the key (left side)
150        can include "<NONE> or <ANY>" to act as wildcards or to require the
151        fields to be empty.  Similarly aproj or auser can be <SAME> or
152        <DYNAMIC> indicating that either the matching key is to be used or a
153        dynamic user or project will be created.  These names can also be
154        federated IDs (fedid's) if prefixed with fedid:.  Finally, the aproj
155        can be followed with a colon-separated list of node types to which that
156        project has access (or will have access if dynamic).
157        Testbed attributes outside the forms above can be given using the
158        format attribute: name value: value.  The name is a single word and the
159        value continues to the end of the line.  Empty lines and lines startin
160        with a # are ignored.
161
162        Parsing errors result in a self.parse_error exception being raised.
163        """
164        lineno=0
165        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
166        fedid_expr = "fedid:[" + string.hexdigits + "]+"
167        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
168        access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \
169                "<SAME>" + "(?::" + name_expr + ")*|" + \
170                fedid_expr + "(?::" + name_expr + ")*|" + \
171                name_expr + "(?::" + name_expr + ")*)"
172        access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")"
173
174        restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE)
175        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
176                re.IGNORECASE)
177        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
178                key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + 
179                access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE)
180
181        def parse_name(n):
182            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
183            else: return n
184       
185        def auth_name(n):
186            if isinstance(n, basestring):
187                if n =='<any>' or n =='<none>': return None
188                else: return unicode(n)
189            else:
190                return n
191
192        f = open(config, "r");
193        for line in f:
194            lineno += 1
195            line = line.strip();
196            if len(line) == 0 or line.startswith('#'):
197                continue
198
199            # Extended (attribute: x value: y) attribute line
200            m = attr_re.match(line)
201            if m != None:
202                attr, val = m.group(1,2)
203                self.attrs[attr] = val
204                continue
205
206            # Restricted entry
207            m = restricted_re.match(line)
208            if m != None:
209                val = m.group(1)
210                self.restricted.append(val)
211                continue
212
213            # Access line (t, p, u) -> (ap, cu, su) line
214            m = access_re.match(line)
215            if m != None:
216                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
217                auth_key = tuple([ auth_name(x) for x in access_key])
218                aps = m.group(4).split(":");
219                if aps[0] == 'fedid:':
220                    del aps[0]
221                    aps[0] = fedid(hexstr=aps[0])
222
223                cu = parse_name(m.group(5))
224                su = parse_name(m.group(6))
225
226                access_val = (access_project(aps[0], aps[1:]),
227                        parse_name(m.group(5)), parse_name(m.group(6)))
228
229                self.access[access_key] = access_val
230                self.auth.set_attribute(auth_key, "access")
231                continue
232
233            # Nothing matched to here: unknown line - raise exception
234            f.close()
235            raise self.parse_error("Unknown statement at line %d of %s" % \
236                    (lineno, config))
237        f.close()
238
239    def get_users(self, obj):
240        """
241        Return a list of the IDs of the users in dict
242        """
243        if obj.has_key('user'):
244            return [ unpack_id(u['userID']) \
245                    for u in obj['user'] if u.has_key('userID') ]
246        else:
247            return None
248
249    def write_state(self):
250        if self.state_filename:
251            try:
252                f = open(self.state_filename, 'w')
253                pickle.dump(self.state, f)
254            except IOError, e:
255                self.log.error("Can't write file %s: %s" % \
256                        (self.state_filename, e))
257            except pickle.PicklingError, e:
258                self.log.error("Pickling problem: %s" % e)
259            except TypeError, e:
260                self.log.error("Pickling problem (TypeError): %s" % e)
261
262
263    def read_state(self):
264        """
265        Read a new copy of access state.  Old state is overwritten.
266
267        State format is a simple pickling of the state dictionary.
268        """
269        if self.state_filename:
270            try:
271                f = open(self.state_filename, "r")
272                self.state = pickle.load(f)
273
274                self.allocation = self.state['allocation']
275                self.projects = self.state['projects']
276                self.keys = self.state['keys']
277                self.types = self.state['types']
278
279                self.log.debug("[read_state]: Read state from %s" % \
280                        self.state_filename)
281            except IOError, e:
282                self.log.warning(("[read_state]: No saved state: " +\
283                        "Can't open %s: %s") % (self.state_filename, e))
284            except EOFError, e:
285                self.log.warning(("[read_state]: " +\
286                        "Empty or damaged state file: %s:") % \
287                        self.state_filename)
288            except pickle.UnpicklingError, e:
289                self.log.warning(("[read_state]: No saved state: " + \
290                        "Unpickling failed: %s") % e)
291
292            # Add the ownership attributes to the authorizer.  Note that the
293            # indices of the allocation dict are strings, but the attributes are
294            # fedids, so there is a conversion.
295            for k in self.allocation.keys():
296                for o in self.allocation[k].get('owners', []):
297                    self.auth.set_attribute(o, fedid(hexstr=k))
298
299
300    def permute_wildcards(self, a, p):
301        """Return a copy of a with various fields wildcarded.
302
303        The bits of p control the wildcards.  A set bit is a wildcard
304        replacement with the lowest bit being user then project then testbed.
305        """
306        if p & 1: user = ["<any>"]
307        else: user = a[2]
308        if p & 2: proj = "<any>"
309        else: proj = a[1]
310        if p & 4: tb = "<any>"
311        else: tb = a[0]
312
313        return (tb, proj, user)
314
315    def find_access(self, search):
316        """
317        Search the access DB for a match on this tuple.  Return the matching
318        access tuple and the user that matched.
319       
320        NB, if the initial tuple fails to match we start inserting wildcards in
321        an order determined by self.project_priority.  Try the list of users in
322        order (when wildcarded, there's only one user in the list).
323        """
324        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
325        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
326
327        for p in perm: 
328            s = self.permute_wildcards(search, p)
329            # s[2] is None on an anonymous, unwildcarded request
330            if s[2] != None:
331                for u in s[2]:
332                    if self.access.has_key((s[0], s[1], u)):
333                        return (self.access[(s[0], s[1], u)], u)
334            else:
335                if self.access.has_key(s):
336                    return (self.access[s], None)
337        return None, None
338
339    def lookup_access(self, req, fid):
340        """
341        Determine the allowed access for this request.  Return the access and
342        which fields are dynamic.
343
344        The fedid is needed to construct the request
345        """
346        # Search keys
347        tb = None
348        project = None
349        user = None
350        # Return values
351        rp = access_project(None, ())
352        ru = None
353
354        if req.has_key('project'):
355            p = req['project']
356            if p.has_key('name'):
357                project = unpack_id(p['name'])
358            user = self.get_users(p)
359        else:
360            user = self.get_users(req)
361
362        user_fedids = [ u for u in user if isinstance(u, fedid)]
363        # Determine how the caller is representing itself.  If its fedid shows
364        # up as a project or a singleton user, let that stand.  If neither the
365        # usernames nor the project name is a fedid, the caller is a testbed.
366        if project and isinstance(project, fedid):
367            if project == fid:
368                # The caller is the project (which is already in the tuple
369                # passed in to the authorizer)
370                owners = user_fedids
371                owners.append(project)
372            else:
373                raise service_error(service_error.req,
374                        "Project asserting different fedid")
375        else:
376            if fid not in user_fedids:
377                tb = fid
378                owners = user_fedids
379                owners.append(fid)
380            else:
381                if len(fedids) > 1:
382                    raise service_error(service_error.req,
383                            "User asserting different fedid")
384                else:
385                    # Which is a singleton
386                    owners = user_fedids
387        # Confirm authorization
388
389        for u in user:
390            self.log.debug("[lookup_access] Checking access for %s" % \
391                    ((tb, project, u),))
392            if self.auth.check_attribute((tb, project, u), 'access'):
393                self.log.debug("[lookup_access] Access granted")
394                break
395            else:
396                self.log.debug("[lookup_access] Access Denied")
397        else:
398            raise service_error(service_error.access, "Access denied")
399
400        # This maps a valid user to the Emulab projects and users to use
401        found, user_match = self.find_access((tb, project, user))
402       
403        if found == None:
404            raise service_error(service_error.access,
405                    "Access denied - cannot map access")
406
407        # resolve <dynamic> and <same> in found
408        dyn_proj = False
409        dyn_create_user = False
410        dyn_service_user = False
411
412        if found[0].name == "<same>":
413            if project != None:
414                rp.name = project
415            else : 
416                raise service_error(\
417                        service_error.server_config,
418                        "Project matched <same> when no project given")
419        elif found[0].name == "<dynamic>":
420            rp.name = None
421            dyn_proj = True
422        else:
423            rp.name = found[0].name
424        rp.node_types = found[0].node_types;
425
426        if found[1] == "<same>":
427            if user_match == "<any>":
428                if user != None: rcu = user[0]
429                else: raise service_error(\
430                        service_error.server_config,
431                        "Matched <same> on anonymous request")
432            else:
433                rcu = user_match
434        elif found[1] == "<dynamic>":
435            rcu = None
436            dyn_create_user = True
437        else:
438            rcu = found[1]
439       
440        if found[2] == "<same>":
441            if user_match == "<any>":
442                if user != None: rsu = user[0]
443                else: raise service_error(\
444                        service_error.server_config,
445                        "Matched <same> on anonymous request")
446            else:
447                rsu = user_match
448        elif found[2] == "<dynamic>":
449            rsu = None
450            dyn_service_user = True
451        else:
452            rsu = found[2]
453
454        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
455                owners
456
457    def build_response(self, alloc_id, ap):
458        """
459        Create the SOAP response.
460
461        Build the dictionary description of the response and use
462        fedd_utils.pack_soap to create the soap message.  ap is the allocate
463        project message returned from a remote project allocation (even if that
464        allocation was done locally).
465        """
466        # Because alloc_id is already a fedd_services_types.IDType_Holder,
467        # there's no need to repack it
468        msg = { 
469                'allocID': alloc_id,
470                'emulab': { 
471                    'domain': self.domain,
472                    'boss': self.boss,
473                    'ops': self.ops,
474                    'fileServer': self.fileserver,
475                    'eventServer': self.eventserver,
476                    'project': ap['project']
477                },
478            }
479        if len(self.attrs) > 0:
480            msg['emulab']['fedAttr'] = \
481                [ { 'attribute': x, 'value' : y } \
482                        for x,y in self.attrs.iteritems()]
483        return msg
484
485    def RequestAccess(self, req, fid):
486        """
487        Handle the access request.  Proxy if not for us.
488
489        Parse out the fields and make the allocations or rejections if for us,
490        otherwise, assuming we're willing to proxy, proxy the request out.
491        """
492
493        def gateway_hardware(h):
494            if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE')
495            else: return h
496
497        # The dance to get into the request body
498        if req.has_key('RequestAccessRequestBody'):
499            req = req['RequestAccessRequestBody']
500        else:
501            raise service_error(service_error.req, "No request!?")
502
503        if req.has_key('destinationTestbed'):
504            dt = unpack_id(req['destinationTestbed'])
505
506        if dt == None or dt in self.testbed:
507            # Request for this fedd
508            found, dyn, owners = self.lookup_access(req, fid)
509            restricted = None
510            ap = None
511
512            # If this is a request to export a project and the access project
513            # is not the project to export, access denied.
514            if req.has_key('exportProject'):
515                ep = unpack_id(req['exportProject'])
516                if ep != found[0].name:
517                    raise service_error(service_error.access,
518                            "Cannot export %s" % ep)
519
520            # Check for access to restricted nodes
521            if req.has_key('resources') and req['resources'].has_key('node'):
522                resources = req['resources']
523                restricted = [ gateway_hardware(t) for n in resources['node'] \
524                                if n.has_key('hardware') \
525                                    for t in n['hardware'] \
526                                        if gateway_hardware(t) \
527                                            in self.restricted ]
528                inaccessible = [ t for t in restricted \
529                                    if t not in found[0].node_types]
530                if len(inaccessible) > 0:
531                    raise service_error(service_error.access,
532                            "Access denied (nodetypes %s)" % \
533                            str(', ').join(inaccessible))
534            # These collect the keys for the two roles into single sets, one
535            # for creation and one for service.  The sets are a simple way to
536            # eliminate duplicates
537            create_ssh = set([ x['sshPubkey'] \
538                    for x in req['createAccess'] \
539                        if x.has_key('sshPubkey')])
540
541            service_ssh = set([ x['sshPubkey'] \
542                    for x in req['serviceAccess'] \
543                        if x.has_key('sshPubkey')])
544
545            if len(create_ssh) > 0 and len(service_ssh) >0: 
546                if dyn[1]: 
547                    # Compose the dynamic project request
548                    # (only dynamic, dynamic currently allowed)
549                    preq = { 'AllocateProjectRequestBody': \
550                                { 'project' : {\
551                                    'user': [ \
552                                    { \
553                                        'access': [ { 'sshPubkey': s } \
554                                            for s in service_ssh ], 
555                                         'role': "serviceAccess",\
556                                    }, \
557                                    { \
558                                        'access': [ { 'sshPubkey': s } \
559                                            for s in create_ssh ], 
560                                         'role': "experimentCreation",\
561                                    }, \
562                                    ], \
563                                    }\
564                                }\
565                            }
566                    if restricted != None and len(restricted) > 0:
567                        preq['AllocateProjectRequestBody']['resources'] = \
568                             {'node': [ { 'hardware' :  [ h ] } \
569                                    for h in restricted ] } 
570                    ap = self.allocate_project.dynamic_project(preq)
571                else:
572                    preq = {'StaticProjectRequestBody' : \
573                            { 'project': \
574                                { 'name' : { 'localname' : found[0].name },\
575                                  'user' : [ \
576                                    {\
577                                        'userID': { 'localname' : found[1] }, \
578                                        'access': [ { 'sshPubkey': s } 
579                                            for s in create_ssh ],
580                                        'role': 'experimentCreation'\
581                                    },\
582                                    {\
583                                        'userID': { 'localname' : found[2] }, \
584                                        'access': [ { 'sshPubkey': s } 
585                                            for s in service_ssh ],
586                                        'role': 'serviceAccess'\
587                                    },\
588                                ]}\
589                            }\
590                    }
591                    if restricted != None and len(restricted) > 0:
592                        preq['StaticProjectRequestBody']['resources'] = \
593                            {'node': [ { 'hardware' :  [ h ] } \
594                                    for h in restricted ] } 
595                    ap = self.allocate_project.static_project(preq)
596            else:
597                raise service_error(service_error.req, 
598                        "SSH access parameters required")
599            # keep track of what's been added
600            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
601            aid = unicode(allocID)
602
603            self.state_lock.acquire()
604            self.allocation[aid] = { }
605            try:
606                pname = ap['project']['name']['localname']
607            except KeyError:
608                pname = None
609
610            if dyn[1]:
611                if not pname:
612                    self.state_lock.release()
613                    raise service_error(service_error.internal,
614                            "Misformed allocation response?")
615                if self.projects.has_key(pname): self.projects[pname] += 1
616                else: self.projects[pname] = 1
617                self.allocation[aid]['project'] = pname
618
619            if ap.has_key('resources'):
620                if not pname:
621                    self.state_lock.release()
622                    raise service_error(service_error.internal,
623                            "Misformed allocation response?")
624                self.allocation[aid]['types'] = set()
625                nodes = ap['resources'].get('node', [])
626                for n in nodes:
627                    for h in n.get('hardware', []):
628                        if self.types.has_key((pname, h)):
629                            self.types[(pname, h)] += 1
630                        else:
631                            self.types[(pname, h)] = 1
632                        self.allocation[aid]['types'].add((pname,h))
633
634
635            self.allocation[aid]['keys'] = [ ]
636
637            try:
638                for u in ap['project']['user']:
639                    uname = u['userID']['localname']
640                    for k in [ k['sshPubkey'] for k in u['access'] \
641                            if k.has_key('sshPubkey') ]:
642                        kv = "%s:%s" % (uname, k)
643                        if self.keys.has_key(kv): self.keys[kv] += 1
644                        else: self.keys[kv] = 1
645                        self.allocation[aid]['keys'].append((uname, k))
646            except KeyError:
647                self.state_lock.release()
648                raise service_error(service_error.internal,
649                        "Misformed allocation response?")
650
651
652            self.allocation[aid]['owners'] = owners
653            self.write_state()
654            self.state_lock.release()
655            for o in owners:
656                self.auth.set_attribute(o, allocID)
657            try:
658                f = open("%s/%s.pem" % (self.certdir, aid), "w")
659                print >>f, alloc_cert
660                f.close()
661            except IOError, e:
662                raise service_error(service_error.internal, 
663                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
664            resp = self.build_response({ 'fedid': allocID } , ap)
665            return resp
666        else:
667            if self.allow_proxy:
668                resp = self.proxy_RequestAccess.call_service(dt, req,
669                            self.cert_file, self.cert_pwd,
670                            self.trusted_certs)
671                if resp.has_key('RequestAccessResponseBody'):
672                    return resp['RequestAccessResponseBody']
673                else:
674                    return None
675            else:
676                raise service_error(service_error.access,
677                        "Access proxying denied")
678
679    def ReleaseAccess(self, req, fid):
680        # The dance to get into the request body
681        if req.has_key('ReleaseAccessRequestBody'):
682            req = req['ReleaseAccessRequestBody']
683        else:
684            raise service_error(service_error.req, "No request!?")
685
686        if req.has_key('destinationTestbed'):
687            dt = unpack_id(req['destinationTestbed'])
688        else:
689            dt = None
690
691        if dt == None or dt in self.testbed:
692            # Local request
693            try:
694                if req['allocID'].has_key('localname'):
695                    auth_attr = aid = req['allocID']['localname']
696                elif req['allocID'].has_key('fedid'):
697                    aid = unicode(req['allocID']['fedid'])
698                    auth_attr = req['allocID']['fedid']
699                else:
700                    raise service_error(service_error.req,
701                            "Only localnames and fedids are understood")
702            except KeyError:
703                raise service_error(service_error.req, "Badly formed request")
704
705            self.log.debug("[access] deallocation requested for %s", aid)
706            if not self.auth.check_attribute(fid, auth_attr):
707                self.log.debug("[access] deallocation denied for %s", aid)
708                raise service_error(service_error.access, "Access Denied")
709
710            # If we know this allocation, reduce the reference counts and
711            # remove the local allocations.  Otherwise report an error.  If
712            # there is an allocation to delete, del_users will be a dictonary
713            # of sets where the key is the user that owns the keys in the set.
714            # We use a set to avoid duplicates.  del_project is just the name
715            # of any dynamic project to delete.  We're somewhat lazy about
716            # deleting authorization attributes.  Having access to something
717            # that doesn't exist isn't harmful.
718            del_users = { }
719            del_project = None
720            del_types = set()
721
722            if self.allocation.has_key(aid):
723                self.log.debug("Found allocation for %s" %aid)
724                self.state_lock.acquire()
725                for k in self.allocation[aid]['keys']:
726                    kk = "%s:%s" % k
727                    self.keys[kk] -= 1
728                    if self.keys[kk] == 0:
729                        if not del_users.has_key(k[0]):
730                            del_users[k[0]] = set()
731                        del_users[k[0]].add(k[1])
732                        del self.keys[kk]
733
734                if self.allocation[aid].has_key('project'):
735                    pname = self.allocation[aid]['project']
736                    self.projects[pname] -= 1
737                    if self.projects[pname] == 0:
738                        del_project = pname
739                        del self.projects[pname]
740
741                if self.allocation[aid].has_key('types'):
742                    for t in self.allocation[aid]['types']:
743                        self.types[t] -= 1
744                        if self.types[t] == 0:
745                            if not del_project: del_project = t[0]
746                            del_types.add(t[1])
747                            del self.types[t]
748
749                del self.allocation[aid]
750                self.write_state()
751                self.state_lock.release()
752                # If we actually have resources to deallocate, prepare the call.
753                if del_project or del_users:
754                    msg = { 'project': { }}
755                    if del_project:
756                        msg['project']['name']= {'localname': del_project}
757                    users = [ ]
758                    for u in del_users.keys():
759                        users.append({ 'userID': { 'localname': u },\
760                            'access' :  \
761                                    [ {'sshPubkey' : s } for s in del_users[u]]\
762                        })
763                    if users: 
764                        msg['project']['user'] = users
765                    if len(del_types) > 0:
766                        msg['resources'] = { 'node': \
767                                [ {'hardware': [ h ] } for h in del_types ]\
768                            }
769                    if self.allocate_project.release_project:
770                        msg = { 'ReleaseProjectRequestBody' : msg}
771                        self.allocate_project.release_project(msg)
772                return { 'allocID': req['allocID'] } 
773            else:
774                raise service_error(service_error.req, "No such allocation")
775
776        else:
777            if self.allow_proxy:
778                resp = self.proxy_ReleaseAccess.call_service(dt, req,
779                            self.cert_file, self.cert_pwd,
780                            self.trusted_certs)
781                if resp.has_key('ReleaseAccessResponseBody'):
782                    return resp['ReleaseAccessResponseBody']
783                else:
784                    return None
785            else:
786                raise service_error(service_error.access,
787                        "Access proxying denied")
788
789
790
791    class emulab_segment:
792        class ssh_cmd_timeout(RuntimeError): pass
793
794        def __init__(self, log=None, keyfile=None, debug=False):
795            self.log = log or logging.getLogger(\
796                    'fedd.experiment_control.emulab_segment')
797            self.ssh_privkey_file = keyfile
798            self.debug = debug
799            self.ssh_exec="/usr/bin/ssh"
800            self.scp_exec = "/usr/bin/scp"
801            self.ssh_cmd_timeout = emulab_segment.ssh_cmd_timeout
802
803        def scp_file(self, file, user, host, dest=""):
804            """
805            scp a file to the remote host.  If debug is set the action is only
806            logged.
807            """
808
809            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
810                    '-o', 'StrictHostKeyChecking yes', '-i', 
811                    self.ssh_privkey_file, file, 
812                    "%s@%s:%s" % (user, host, dest)]
813            rv = 0
814
815            try:
816                dnull = open("/dev/null", "w")
817            except IOError:
818                self.log.debug("[ssh_file]: failed to open " + \
819                        "/dev/null for redirect")
820                dnull = Null
821
822            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
823            if not self.debug:
824                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
825                        close_fds=True)
826
827            return rv == 0
828
829        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
830            """
831            Run a remote command on host as user.  If debug is set, the action
832            is only logged.  Commands are run without stdin, to avoid stray
833            SIGTTINs.
834            """
835            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
836                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
837                    (self.ssh_exec, self.ssh_privkey_file, 
838                            user, host, cmd)
839
840            try:
841                dnull = open("/dev/null", "w")
842            except IOError:
843                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
844                        "for redirect")
845                dnull = Null
846
847            self.log.debug("[ssh_cmd]: %s" % sh_str)
848            if not self.debug:
849                if dnull:
850                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
851                            close_fds=True)
852                else:
853                    sub = Popen(sh_str, shell=True,
854                            close_fds=True)
855                if timeout:
856                    i = 0
857                    rv = sub.poll()
858                    while i < timeout:
859                        if rv is not None: break
860                        else:
861                            time.sleep(1)
862                            rv = sub.poll()
863                            i += 1
864                    else:
865                        self.log.debug("Process exceeded runtime: %s" % sh_str)
866                        os.kill(sub.pid, signal.SIGKILL)
867                        raise self.ssh_cmd_timeout();
868                    return rv == 0
869                else:
870                    return sub.wait() == 0
871            else:
872                if timeout == 0:
873                    self.log.debug("debug timeout raised on %s " % sh_str)
874                    raise self.ssh_cmd_timeout()
875                else:
876                    return True
877
878    class start_segment(emulab_segment):
879        def __init__(self, log=None, keyfile=None, debug=False):
880            experiment_control_local.emulab_segment.__init__(self,
881                    log=log, keyfile=keyfile, debug=debug)
882
883        def create_config_tree(self, src_dir, dest_dir, script):
884            """
885            Append commands to script that will create the directory hierarchy
886            on the remote federant.
887            """
888
889            if os.path.isdir(src_dir):
890                print >>script, "mkdir -p %s" % dest_dir
891                print >>script, "chmod 770 %s" % dest_dir
892
893                for f in os.listdir(src_dir):
894                    if os.path.isdir(f):
895                        self.create_config_tree("%s/%s" % (src_dir, f), 
896                                "%s/%s" % (dest_dir, f), script)
897            else:
898                self.log.debug("[create_config_tree]: Not a directory: %s" \
899                        % src_dir)
900
901        def ship_configs(self, host, user, src_dir, dest_dir):
902            """
903            Copy federant-specific configuration files to the federant.
904            """
905            for f in os.listdir(src_dir):
906                if os.path.isdir(f):
907                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
908                            "%s/%s" % (dest_dir, f)):
909                        return False
910                else:
911                    if not self.scp_file("%s/%s" % (src_dir, f), 
912                            user, host, dest_dir):
913                        return False
914            return True
915
916        def get_state(self, user, host, tb, pid, eid):
917            # command to test experiment state
918            expinfo_exec = "/usr/testbed/bin/expinfo" 
919            # Regular expressions to parse the expinfo response
920            state_re = re.compile("State:\s+(\w+)")
921            no_exp_re = re.compile("^No\s+such\s+experiment")
922            swapping_re = re.compile("^No\s+information\s+available.")
923            state = None    # Experiment state parsed from expinfo
924            # The expinfo ssh command.  Note the identity restriction to use
925            # only the identity provided in the pubkey given.
926            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
927                    'StrictHostKeyChecking yes', '-i', 
928                    self.ssh_privkey_file, "%s@%s" % (user, host), 
929                    expinfo_exec, pid, eid]
930
931            dev_null = None
932            try:
933                dev_null = open("/dev/null", "a")
934            except IOError, e:
935                self.log.error("[get_state]: can't open /dev/null: %s" %e)
936
937            if self.debug:
938                state = 'swapped'
939                rv = 0
940            else:
941                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
942                        close_fds=True)
943                for line in status.stdout:
944                    m = state_re.match(line)
945                    if m: state = m.group(1)
946                    else:
947                        for reg, st in ((no_exp_re, "none"),
948                                (swapping_re, "swapping")):
949                            m = reg.match(line)
950                            if m: state = st
951                rv = status.wait()
952
953            # If the experiment is not present the subcommand returns a
954            # non-zero return value.  If we successfully parsed a "none"
955            # outcome, ignore the return code.
956            if rv != 0 and state != 'none':
957                raise service_error(service_error.internal,
958                        "Cannot get status of segment %s:%s/%s" % \
959                                (tb, pid, eid))
960            elif state not in ('active', 'swapped', 'swapping', 'none'):
961                raise service_error(service_error.internal,
962                        "Cannot get status of segment %s:%s/%s" % \
963                                (tb, pid, eid))
964            else: return state
965
966
967        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
968            """
969            Start a sub-experiment on a federant.
970
971            Get the current state, modify or create as appropriate, ship data
972            and configs and start the experiment.  There are small ordering
973            differences based on the initial state of the sub-experiment.
974            """
975            # ops node in the federant
976            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
977            user = tbparams[tb]['user']     # federant user
978            pid = tbparams[tb]['project']   # federant project
979            # XXX
980            base_confs = ( "hosts",)
981            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
982            # Configuration directories on the remote machine
983            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
984            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
985            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
986
987            state = self.get_state(user, host, tb, pid, eid)
988
989            self.log.debug("[start_segment]: %s: %s" % (tb, state))
990            self.log.info("[start_segment]:transferring experiment to %s" % tb)
991
992            if not self.scp_file("%s/%s/%s" % \
993                    (tmpdir, tb, tclfile), user, host):
994                return False
995           
996            if state == 'none':
997                # Create a null copy of the experiment so that we capture any
998                # logs there if the modify fails.  Emulab software discards the
999                # logs from a failed startexp
1000                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
1001                    return False
1002                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
1003                timedout = False
1004                try:
1005                    if not self.ssh_cmd(user, host,
1006                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
1007                            "-e %s null.tcl") % (pid, eid), "startexp",
1008                            timeout=60 * 10):
1009                        return False
1010                except self.ssh_cmd_timeout:
1011                    timedout = True
1012
1013                if timedout:
1014                    state = self.get_state(user, host, tb, pid, eid)
1015                    if state != "swapped":
1016                        return False
1017
1018           
1019            # Open up a temporary file to contain a script for setting up the
1020            # filespace for the new experiment.
1021            self.log.info("[start_segment]: creating script file")
1022            try:
1023                sf, scriptname = tempfile.mkstemp()
1024                scriptfile = os.fdopen(sf, 'w')
1025            except IOError:
1026                return False
1027
1028            scriptbase = os.path.basename(scriptname)
1029
1030            # Script the filesystem changes
1031            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
1032            # Clear and create the tarfiles and rpm directories
1033            for d in (tarfiles_dir, rpms_dir):
1034                print >>scriptfile, "/bin/rm -rf %s/*" % d
1035                print >>scriptfile, "mkdir -p %s" % d
1036            print >>scriptfile, 'mkdir -p %s' % proj_dir
1037            self.create_config_tree("%s/%s" % (tmpdir, tb),
1038                    proj_dir, scriptfile)
1039            if os.path.isdir("%s/tarfiles" % tmpdir):
1040                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
1041                        scriptfile)
1042            if os.path.isdir("%s/rpms" % tmpdir):
1043                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
1044                        scriptfile)
1045            print >>scriptfile, "rm -f %s" % scriptbase
1046            scriptfile.close()
1047
1048            # Move the script to the remote machine
1049            # XXX: could collide tempfile names on the remote host
1050            if self.scp_file(scriptname, user, host, scriptbase):
1051                os.remove(scriptname)
1052            else:
1053                return False
1054
1055            # Execute the script (and the script's last line deletes it)
1056            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
1057                return False
1058
1059            for f in base_confs:
1060                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
1061                        "%s/%s" % (proj_dir, f)):
1062                    return False
1063            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
1064                    proj_dir):
1065                return False
1066            if os.path.isdir("%s/tarfiles" % tmpdir):
1067                if not self.ship_configs(host, user,
1068                        "%s/tarfiles" % tmpdir, tarfiles_dir):
1069                    return False
1070            if os.path.isdir("%s/rpms" % tmpdir):
1071                if not self.ship_configs(host, user,
1072                        "%s/rpms" % tmpdir, tarfiles_dir):
1073                    return False
1074            # Stage the new configuration (active experiments will stay swapped
1075            # in now)
1076            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
1077            try:
1078                if not self.ssh_cmd(user, host,
1079                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
1080                                (pid, eid, tclfile),
1081                        "modexp", timeout= 60 * 10):
1082                    return False
1083            except self.ssh_cmd_timeout:
1084                self.log.error("Modify command failed to complete in time")
1085                # There's really no way to see if this succeeded or failed, so
1086                # if it hangs, assume the worst.
1087                return False
1088            # Active experiments are still swapped, this swaps the others in.
1089            if state != 'active':
1090                self.log.info("[start_segment]: Swapping %s in on %s" % \
1091                        (eid, tb))
1092                timedout = False
1093                try:
1094                    if not self.ssh_cmd(user, host,
1095                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
1096                            "swapexp", timeout=10*60):
1097                        return False
1098                except self.ssh_cmd_timeout:
1099                    timedout = True
1100               
1101                # If the command was terminated, but completed successfully,
1102                # report success.
1103                if timedout:
1104                    self.log.debug("[start_segment]: swapin timed out " +\
1105                            "checking state")
1106                    state = self.get_state(user, host, tb, pid, eid)
1107                    self.log.debug("[start_segment]: state is %s" % state)
1108                    return state == 'active'
1109            # Everything has gone OK.
1110            return True
1111
1112    class stop_segment(emulab_segment):
1113        def __init__(self, log=None, keyfile=None, debug=False):
1114            experiment_control_local.emulab_segment.__init__(self,
1115                    log=log, keyfile=keyfile, debug=debug)
1116
1117        def __call__(self, tb, eid, tbparams):
1118            """
1119            Stop a sub experiment by calling swapexp on the federant
1120            """
1121            user = tbparams[tb]['user']
1122            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
1123            pid = tbparams[tb]['project']
1124
1125            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
1126            rv = False
1127            try:
1128                # Clean out tar files: we've gone over quota in the past
1129                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
1130                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
1131                        (pid, eid))
1132                rv = self.ssh_cmd(user, host,
1133                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
1134            except self.ssh_cmd_timeout:
1135                rv = False
1136            return rv
1137
1138    def StartSegment(self, req, fid):
1139        def get_url(url, cf, tmpdir):
1140            po = urlparse(url)
1141            fn = po.path.rpartition('/')[2]
1142            try:
1143                conn = httplib.HTTPSConnection(po.hostname, port=po.port, 
1144                        cert_file=cf, key_file=cf)
1145                conn.putrequest('GET', po.path)
1146                conn.endheaders()
1147                response = conn.getresponse()
1148
1149                lf = open("%s/%s" % (tmpdir, fn), "w")
1150                buf = response.read(4096)
1151                while buf:
1152                    lf.write(buf)
1153                    buf = response.read(4096)
1154                lf.close()
1155            except IOError, e:
1156                raise service_error(service_error.internal,
1157                        "Erro writing tempfile: %s" %e)
1158            except httplib.HTTPException, e:
1159                raise service_error(service_error.internal, 
1160                        "Error retrieving data: %s" % e)
1161
1162        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1163
1164
1165        try:
1166            req = req['StartSegmentRequestBody']
1167        except KeyError:
1168            raise service_error(server_error.req, "Badly formed request")
1169        auth_attr = req['allocID']['fedid']
1170        attrs = req.get('fedAttr', [])
1171        print auth_attr
1172        print "%s" % auth_attr
1173        if self.auth.check_attribute(fid, auth_attr):
1174            print "OK"
1175        else:
1176            print "Fail"
1177
1178        if req.has_key('segmentdescription') and \
1179                req['segmentdescription'].has_key('topdldescription'):
1180            topo = \
1181                topdl.Topology(**req['segmentdescription']['topdldescription'])
1182        else:
1183            raise service_error(service_error.req, 
1184                    "Request missing segmentdescription'")
1185
1186        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1187        try:
1188            tmpdir = tempfile.mkdtemp(prefix="access-")
1189        except IOError:
1190            raise service_error(service_error.internal, "Cannot create tmp dir")
1191
1192        sw = set()
1193        for e in [c for c in topo.elements if getattr(c, 'software', False)]:
1194            for s in e.software:
1195                sw.add(s.location)
1196        for s in sw:
1197            get_url(s, certfile, tmpdir)
1198
1199        for a in attrs:
1200            if a['attribute'] in configs:
1201                get_url(a['value'], certfile, tmpdir)
1202
1203        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.