source: fedd/federation/access.py @ cc8d8e9

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since cc8d8e9 was cc8d8e9, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint

  • Property mode set to 100644
File size: 46.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9
10from threading import *
11
12from util import *
13from allocate_project import allocate_project_local, allocate_project_remote
14from access_project import access_project
15from fedid import fedid, generate_fedid
16from authorizer import authorizer
17from service_error import service_error
18from remote_service import xmlrpc_handler, soap_handler, service_caller
19
20
21# Make log messages disappear if noone configures a fedd logger
22class nullHandler(logging.Handler):
23    def emit(self, record): pass
24
25fl = logging.getLogger("fedd.access")
26fl.addHandler(nullHandler())
27
28class access:
29    """
30    The implementation of access control based on mapping users to projects.
31
32    Users can be mapped to existing projects or have projects created
33    dynamically.  This implements both direct requests and proxies.
34    """
35
36    class parse_error(RuntimeError): pass
37
38
39    proxy_RequestAccess= service_caller('RequestAccess')
40    proxy_ReleaseAccess= service_caller('ReleaseAccess')
41
42    def __init__(self, config=None, auth=None):
43        """
44        Initializer.  Pulls parameters out of the ConfigParser's access section.
45        """
46
47        # Make sure that the configuration is in place
48        if not config: 
49            raise RunTimeError("No config to fedd.access")
50
51        self.project_priority = config.getboolean("access", "project_priority")
52        self.allow_proxy = config.getboolean("access", "allow_proxy")
53
54        self.boss = config.get("access", "boss")
55        self.ops = config.get("access", "ops")
56        self.domain = config.get("access", "domain")
57        self.fileserver = config.get("access", "fileserver")
58        self.eventserver = config.get("access", "eventserver")
59
60        self.attrs = { }
61        self.access = { }
62        self.restricted = [ ]
63        self.projects = { }
64        self.keys = { }
65        self.types = { }
66        self.allocation = { }
67        self.state = { 
68            'projects': self.projects,
69            'allocation' : self.allocation,
70            'keys' : self.keys,
71            'types': self.types
72        }
73        self.log = logging.getLogger("fedd.access")
74        set_log_level(config, "access", self.log)
75        self.state_lock = Lock()
76
77        if auth: self.auth = auth
78        else:
79            self.log.error(\
80                    "[access]: No authorizer initialized, creating local one.")
81            auth = authorizer()
82
83        tb = config.get('access', 'testbed')
84        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
85        else: self.testbed = [ ]
86
87        if config.has_option("access", "accessdb"):
88            self.read_access(config.get("access", "accessdb"))
89
90        self.state_filename = config.get("access", "access_state")
91        self.read_state()
92
93        # Keep cert_file and cert_pwd coming from the same place
94        self.cert_file = config.get("access", "cert_file")
95        if self.cert_file:
96            self.sert_pwd = config.get("access", "cert_pw")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.sert_pwd = config.get("globals", "cert_pw")
100
101        self.trusted_certs = config.get("access", "trusted_certs") or \
102                config.get("globals", "trusted_certs")
103
104        self.soap_services = {\
105            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
106            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
107            'StartSegment': soap_handler("StartSegment", self.StartSegment),
108            }
109        self.xmlrpc_services =  {\
110            'RequestAccess': xmlrpc_handler('RequestAccess',
111                self.RequestAccess),
112            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
113                self.ReleaseAccess),
114            'StartSegment': xmlrpc_handler('StartSegment',
115                self.StartSegment),
116            }
117
118
119        if not config.has_option("allocate", "uri"):
120            self.allocate_project = \
121                allocate_project_local(config, auth)
122        else:
123            self.allocate_project = \
124                allocate_project_remote(config, auth)
125
126        # If the project allocator exports services, put them in this object's
127        # maps so that classes that instantiate this can call the services.
128        self.soap_services.update(self.allocate_project.soap_services)
129        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
130
131
132    def read_access(self, config):
133        """
134        Read a configuration file and set internal parameters.
135
136        The format is more complex than one might hope.  The basic format is
137        attribute value pairs separated by colons(:) on a signle line.  The
138        attributes in bool_attrs, emulab_attrs and id_attrs can all be set
139        directly using the name: value syntax.  E.g.
140        boss: hostname
141        sets self.boss to hostname.  In addition, there are access lines of the
142        form (tb, proj, user) -> (aproj, auser) that map the first tuple of
143        names to the second for access purposes.  Names in the key (left side)
144        can include "<NONE> or <ANY>" to act as wildcards or to require the
145        fields to be empty.  Similarly aproj or auser can be <SAME> or
146        <DYNAMIC> indicating that either the matching key is to be used or a
147        dynamic user or project will be created.  These names can also be
148        federated IDs (fedid's) if prefixed with fedid:.  Finally, the aproj
149        can be followed with a colon-separated list of node types to which that
150        project has access (or will have access if dynamic).
151        Testbed attributes outside the forms above can be given using the
152        format attribute: name value: value.  The name is a single word and the
153        value continues to the end of the line.  Empty lines and lines startin
154        with a # are ignored.
155
156        Parsing errors result in a self.parse_error exception being raised.
157        """
158        lineno=0
159        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
160        fedid_expr = "fedid:[" + string.hexdigits + "]+"
161        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
162        access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \
163                "<SAME>" + "(?::" + name_expr + ")*|" + \
164                fedid_expr + "(?::" + name_expr + ")*|" + \
165                name_expr + "(?::" + name_expr + ")*)"
166        access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")"
167
168        restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE)
169        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
170                re.IGNORECASE)
171        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
172                key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + 
173                access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE)
174
175        def parse_name(n):
176            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
177            else: return n
178       
179        def auth_name(n):
180            if isinstance(n, basestring):
181                if n =='<any>' or n =='<none>': return None
182                else: return unicode(n)
183            else:
184                return n
185
186        f = open(config, "r");
187        for line in f:
188            lineno += 1
189            line = line.strip();
190            if len(line) == 0 or line.startswith('#'):
191                continue
192
193            # Extended (attribute: x value: y) attribute line
194            m = attr_re.match(line)
195            if m != None:
196                attr, val = m.group(1,2)
197                self.attrs[attr] = val
198                continue
199
200            # Restricted entry
201            m = restricted_re.match(line)
202            if m != None:
203                val = m.group(1)
204                self.restricted.append(val)
205                continue
206
207            # Access line (t, p, u) -> (ap, cu, su) line
208            m = access_re.match(line)
209            if m != None:
210                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
211                auth_key = tuple([ auth_name(x) for x in access_key])
212                aps = m.group(4).split(":");
213                if aps[0] == 'fedid:':
214                    del aps[0]
215                    aps[0] = fedid(hexstr=aps[0])
216
217                cu = parse_name(m.group(5))
218                su = parse_name(m.group(6))
219
220                access_val = (access_project(aps[0], aps[1:]),
221                        parse_name(m.group(5)), parse_name(m.group(6)))
222
223                self.access[access_key] = access_val
224                self.auth.set_attribute(auth_key, "access")
225                continue
226
227            # Nothing matched to here: unknown line - raise exception
228            f.close()
229            raise self.parse_error("Unknown statement at line %d of %s" % \
230                    (lineno, config))
231        f.close()
232
233    def get_users(self, obj):
234        """
235        Return a list of the IDs of the users in dict
236        """
237        if obj.has_key('user'):
238            return [ unpack_id(u['userID']) \
239                    for u in obj['user'] if u.has_key('userID') ]
240        else:
241            return None
242
243    def write_state(self):
244        if self.state_filename:
245            try:
246                f = open(self.state_filename, 'w')
247                pickle.dump(self.state, f)
248            except IOError, e:
249                self.log.error("Can't write file %s: %s" % \
250                        (self.state_filename, e))
251            except pickle.PicklingError, e:
252                self.log.error("Pickling problem: %s" % e)
253            except TypeError, e:
254                self.log.error("Pickling problem (TypeError): %s" % e)
255
256
257    def read_state(self):
258        """
259        Read a new copy of access state.  Old state is overwritten.
260
261        State format is a simple pickling of the state dictionary.
262        """
263        if self.state_filename:
264            try:
265                f = open(self.state_filename, "r")
266                self.state = pickle.load(f)
267
268                self.allocation = self.state['allocation']
269                self.projects = self.state['projects']
270                self.keys = self.state['keys']
271                self.types = self.state['types']
272
273                self.log.debug("[read_state]: Read state from %s" % \
274                        self.state_filename)
275            except IOError, e:
276                self.log.warning(("[read_state]: No saved state: " +\
277                        "Can't open %s: %s") % (self.state_filename, e))
278            except EOFError, e:
279                self.log.warning(("[read_state]: " +\
280                        "Empty or damaged state file: %s:") % \
281                        self.state_filename)
282            except pickle.UnpicklingError, e:
283                self.log.warning(("[read_state]: No saved state: " + \
284                        "Unpickling failed: %s") % e)
285
286            # Add the ownership attributes to the authorizer.  Note that the
287            # indices of the allocation dict are strings, but the attributes are
288            # fedids, so there is a conversion.
289            for k in self.allocation.keys():
290                for o in self.allocation[k].get('owners', []):
291                    self.auth.set_attribute(o, fedid(hexstr=k))
292
293
294    def permute_wildcards(self, a, p):
295        """Return a copy of a with various fields wildcarded.
296
297        The bits of p control the wildcards.  A set bit is a wildcard
298        replacement with the lowest bit being user then project then testbed.
299        """
300        if p & 1: user = ["<any>"]
301        else: user = a[2]
302        if p & 2: proj = "<any>"
303        else: proj = a[1]
304        if p & 4: tb = "<any>"
305        else: tb = a[0]
306
307        return (tb, proj, user)
308
309    def find_access(self, search):
310        """
311        Search the access DB for a match on this tuple.  Return the matching
312        access tuple and the user that matched.
313       
314        NB, if the initial tuple fails to match we start inserting wildcards in
315        an order determined by self.project_priority.  Try the list of users in
316        order (when wildcarded, there's only one user in the list).
317        """
318        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
319        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
320
321        for p in perm: 
322            s = self.permute_wildcards(search, p)
323            # s[2] is None on an anonymous, unwildcarded request
324            if s[2] != None:
325                for u in s[2]:
326                    if self.access.has_key((s[0], s[1], u)):
327                        return (self.access[(s[0], s[1], u)], u)
328            else:
329                if self.access.has_key(s):
330                    return (self.access[s], None)
331        return None, None
332
333    def lookup_access(self, req, fid):
334        """
335        Determine the allowed access for this request.  Return the access and
336        which fields are dynamic.
337
338        The fedid is needed to construct the request
339        """
340        # Search keys
341        tb = None
342        project = None
343        user = None
344        # Return values
345        rp = access_project(None, ())
346        ru = None
347
348        if req.has_key('project'):
349            p = req['project']
350            if p.has_key('name'):
351                project = unpack_id(p['name'])
352            user = self.get_users(p)
353        else:
354            user = self.get_users(req)
355
356        user_fedids = [ u for u in user if isinstance(u, fedid)]
357        # Determine how the caller is representing itself.  If its fedid shows
358        # up as a project or a singleton user, let that stand.  If neither the
359        # usernames nor the project name is a fedid, the caller is a testbed.
360        if project and isinstance(project, fedid):
361            if project == fid:
362                # The caller is the project (which is already in the tuple
363                # passed in to the authorizer)
364                owners = user_fedids
365                owners.append(project)
366            else:
367                raise service_error(service_error.req,
368                        "Project asserting different fedid")
369        else:
370            if fid not in user_fedids:
371                tb = fid
372                owners = user_fedids
373                owners.append(fid)
374            else:
375                if len(fedids) > 1:
376                    raise service_error(service_error.req,
377                            "User asserting different fedid")
378                else:
379                    # Which is a singleton
380                    owners = user_fedids
381        # Confirm authorization
382
383        for u in user:
384            self.log.debug("[lookup_access] Checking access for %s" % \
385                    ((tb, project, u),))
386            if self.auth.check_attribute((tb, project, u), 'access'):
387                self.log.debug("[lookup_access] Access granted")
388                break
389            else:
390                self.log.debug("[lookup_access] Access Denied")
391        else:
392            raise service_error(service_error.access, "Access denied")
393
394        # This maps a valid user to the Emulab projects and users to use
395        found, user_match = self.find_access((tb, project, user))
396       
397        if found == None:
398            raise service_error(service_error.access,
399                    "Access denied - cannot map access")
400
401        # resolve <dynamic> and <same> in found
402        dyn_proj = False
403        dyn_create_user = False
404        dyn_service_user = False
405
406        if found[0].name == "<same>":
407            if project != None:
408                rp.name = project
409            else : 
410                raise service_error(\
411                        service_error.server_config,
412                        "Project matched <same> when no project given")
413        elif found[0].name == "<dynamic>":
414            rp.name = None
415            dyn_proj = True
416        else:
417            rp.name = found[0].name
418        rp.node_types = found[0].node_types;
419
420        if found[1] == "<same>":
421            if user_match == "<any>":
422                if user != None: rcu = user[0]
423                else: raise service_error(\
424                        service_error.server_config,
425                        "Matched <same> on anonymous request")
426            else:
427                rcu = user_match
428        elif found[1] == "<dynamic>":
429            rcu = None
430            dyn_create_user = True
431        else:
432            rcu = found[1]
433       
434        if found[2] == "<same>":
435            if user_match == "<any>":
436                if user != None: rsu = user[0]
437                else: raise service_error(\
438                        service_error.server_config,
439                        "Matched <same> on anonymous request")
440            else:
441                rsu = user_match
442        elif found[2] == "<dynamic>":
443            rsu = None
444            dyn_service_user = True
445        else:
446            rsu = found[2]
447
448        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
449                owners
450
451    def build_response(self, alloc_id, ap):
452        """
453        Create the SOAP response.
454
455        Build the dictionary description of the response and use
456        fedd_utils.pack_soap to create the soap message.  ap is the allocate
457        project message returned from a remote project allocation (even if that
458        allocation was done locally).
459        """
460        # Because alloc_id is already a fedd_services_types.IDType_Holder,
461        # there's no need to repack it
462        msg = { 
463                'allocID': alloc_id,
464                'emulab': { 
465                    'domain': self.domain,
466                    'boss': self.boss,
467                    'ops': self.ops,
468                    'fileServer': self.fileserver,
469                    'eventServer': self.eventserver,
470                    'project': ap['project']
471                },
472            }
473        if len(self.attrs) > 0:
474            msg['emulab']['fedAttr'] = \
475                [ { 'attribute': x, 'value' : y } \
476                        for x,y in self.attrs.iteritems()]
477        return msg
478
479    def RequestAccess(self, req, fid):
480        """
481        Handle the access request.  Proxy if not for us.
482
483        Parse out the fields and make the allocations or rejections if for us,
484        otherwise, assuming we're willing to proxy, proxy the request out.
485        """
486
487        def gateway_hardware(h):
488            if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE')
489            else: return h
490
491        # The dance to get into the request body
492        if req.has_key('RequestAccessRequestBody'):
493            req = req['RequestAccessRequestBody']
494        else:
495            raise service_error(service_error.req, "No request!?")
496
497        if req.has_key('destinationTestbed'):
498            dt = unpack_id(req['destinationTestbed'])
499
500        if dt == None or dt in self.testbed:
501            # Request for this fedd
502            found, dyn, owners = self.lookup_access(req, fid)
503            restricted = None
504            ap = None
505
506            # If this is a request to export a project and the access project
507            # is not the project to export, access denied.
508            if req.has_key('exportProject'):
509                ep = unpack_id(req['exportProject'])
510                if ep != found[0].name:
511                    raise service_error(service_error.access,
512                            "Cannot export %s" % ep)
513
514            # Check for access to restricted nodes
515            if req.has_key('resources') and req['resources'].has_key('node'):
516                resources = req['resources']
517                restricted = [ gateway_hardware(t) for n in resources['node'] \
518                                if n.has_key('hardware') \
519                                    for t in n['hardware'] \
520                                        if gateway_hardware(t) \
521                                            in self.restricted ]
522                inaccessible = [ t for t in restricted \
523                                    if t not in found[0].node_types]
524                if len(inaccessible) > 0:
525                    raise service_error(service_error.access,
526                            "Access denied (nodetypes %s)" % \
527                            str(', ').join(inaccessible))
528            # These collect the keys for the two roles into single sets, one
529            # for creation and one for service.  The sets are a simple way to
530            # eliminate duplicates
531            create_ssh = set([ x['sshPubkey'] \
532                    for x in req['createAccess'] \
533                        if x.has_key('sshPubkey')])
534
535            service_ssh = set([ x['sshPubkey'] \
536                    for x in req['serviceAccess'] \
537                        if x.has_key('sshPubkey')])
538
539            if len(create_ssh) > 0 and len(service_ssh) >0: 
540                if dyn[1]: 
541                    # Compose the dynamic project request
542                    # (only dynamic, dynamic currently allowed)
543                    preq = { 'AllocateProjectRequestBody': \
544                                { 'project' : {\
545                                    'user': [ \
546                                    { \
547                                        'access': [ { 'sshPubkey': s } \
548                                            for s in service_ssh ], 
549                                         'role': "serviceAccess",\
550                                    }, \
551                                    { \
552                                        'access': [ { 'sshPubkey': s } \
553                                            for s in create_ssh ], 
554                                         'role': "experimentCreation",\
555                                    }, \
556                                    ], \
557                                    }\
558                                }\
559                            }
560                    if restricted != None and len(restricted) > 0:
561                        preq['AllocateProjectRequestBody']['resources'] = \
562                             {'node': [ { 'hardware' :  [ h ] } \
563                                    for h in restricted ] } 
564                    ap = self.allocate_project.dynamic_project(preq)
565                else:
566                    preq = {'StaticProjectRequestBody' : \
567                            { 'project': \
568                                { 'name' : { 'localname' : found[0].name },\
569                                  'user' : [ \
570                                    {\
571                                        'userID': { 'localname' : found[1] }, \
572                                        'access': [ { 'sshPubkey': s } 
573                                            for s in create_ssh ],
574                                        'role': 'experimentCreation'\
575                                    },\
576                                    {\
577                                        'userID': { 'localname' : found[2] }, \
578                                        'access': [ { 'sshPubkey': s } 
579                                            for s in service_ssh ],
580                                        'role': 'serviceAccess'\
581                                    },\
582                                ]}\
583                            }\
584                    }
585                    if restricted != None and len(restricted) > 0:
586                        preq['StaticProjectRequestBody']['resources'] = \
587                            {'node': [ { 'hardware' :  [ h ] } \
588                                    for h in restricted ] } 
589                    ap = self.allocate_project.static_project(preq)
590            else:
591                raise service_error(service_error.req, 
592                        "SSH access parameters required")
593            # keep track of what's been added
594            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
595            aid = unicode(allocID)
596
597            self.state_lock.acquire()
598            self.allocation[aid] = { }
599            try:
600                pname = ap['project']['name']['localname']
601            except KeyError:
602                pname = None
603
604            if dyn[1]:
605                if not pname:
606                    self.state_lock.release()
607                    raise service_error(service_error.internal,
608                            "Misformed allocation response?")
609                if self.projects.has_key(pname): self.projects[pname] += 1
610                else: self.projects[pname] = 1
611                self.allocation[aid]['project'] = pname
612
613            if ap.has_key('resources'):
614                if not pname:
615                    self.state_lock.release()
616                    raise service_error(service_error.internal,
617                            "Misformed allocation response?")
618                self.allocation[aid]['types'] = set()
619                nodes = ap['resources'].get('node', [])
620                for n in nodes:
621                    for h in n.get('hardware', []):
622                        if self.types.has_key((pname, h)):
623                            self.types[(pname, h)] += 1
624                        else:
625                            self.types[(pname, h)] = 1
626                        self.allocation[aid]['types'].add((pname,h))
627
628
629            self.allocation[aid]['keys'] = [ ]
630
631            try:
632                for u in ap['project']['user']:
633                    uname = u['userID']['localname']
634                    for k in [ k['sshPubkey'] for k in u['access'] \
635                            if k.has_key('sshPubkey') ]:
636                        kv = "%s:%s" % (uname, k)
637                        if self.keys.has_key(kv): self.keys[kv] += 1
638                        else: self.keys[kv] = 1
639                        self.allocation[aid]['keys'].append((uname, k))
640            except KeyError:
641                self.state_lock.release()
642                raise service_error(service_error.internal,
643                        "Misformed allocation response?")
644
645
646            self.allocation[aid]['owners'] = owners
647            self.write_state()
648            self.state_lock.release()
649            for o in owners:
650                self.auth.set_attribute(o, allocID)
651            resp = self.build_response({ 'fedid': allocID } , ap)
652            return resp
653        else:
654            if self.allow_proxy:
655                resp = self.proxy_RequestAccess.call_service(dt, req,
656                            self.cert_file, self.cert_pwd,
657                            self.trusted_certs)
658                if resp.has_key('RequestAccessResponseBody'):
659                    return resp['RequestAccessResponseBody']
660                else:
661                    return None
662            else:
663                raise service_error(service_error.access,
664                        "Access proxying denied")
665
666    def ReleaseAccess(self, req, fid):
667        # The dance to get into the request body
668        if req.has_key('ReleaseAccessRequestBody'):
669            req = req['ReleaseAccessRequestBody']
670        else:
671            raise service_error(service_error.req, "No request!?")
672
673        if req.has_key('destinationTestbed'):
674            dt = unpack_id(req['destinationTestbed'])
675        else:
676            dt = None
677
678        if dt == None or dt in self.testbed:
679            # Local request
680            try:
681                if req['allocID'].has_key('localname'):
682                    auth_attr = aid = req['allocID']['localname']
683                elif req['allocID'].has_key('fedid'):
684                    aid = unicode(req['allocID']['fedid'])
685                    auth_attr = req['allocID']['fedid']
686                else:
687                    raise service_error(service_error.req,
688                            "Only localnames and fedids are understood")
689            except KeyError:
690                raise service_error(service_error.req, "Badly formed request")
691
692            self.log.debug("[access] deallocation requested for %s", aid)
693            if not self.auth.check_attribute(fid, auth_attr):
694                self.log.debug("[access] deallocation denied for %s", aid)
695                raise service_error(service_error.access, "Access Denied")
696
697            # If we know this allocation, reduce the reference counts and
698            # remove the local allocations.  Otherwise report an error.  If
699            # there is an allocation to delete, del_users will be a dictonary
700            # of sets where the key is the user that owns the keys in the set.
701            # We use a set to avoid duplicates.  del_project is just the name
702            # of any dynamic project to delete.  We're somewhat lazy about
703            # deleting authorization attributes.  Having access to something
704            # that doesn't exist isn't harmful.
705            del_users = { }
706            del_project = None
707            del_types = set()
708
709            if self.allocation.has_key(aid):
710                self.log.debug("Found allocation for %s" %aid)
711                self.state_lock.acquire()
712                for k in self.allocation[aid]['keys']:
713                    kk = "%s:%s" % k
714                    self.keys[kk] -= 1
715                    if self.keys[kk] == 0:
716                        if not del_users.has_key(k[0]):
717                            del_users[k[0]] = set()
718                        del_users[k[0]].add(k[1])
719                        del self.keys[kk]
720
721                if self.allocation[aid].has_key('project'):
722                    pname = self.allocation[aid]['project']
723                    self.projects[pname] -= 1
724                    if self.projects[pname] == 0:
725                        del_project = pname
726                        del self.projects[pname]
727
728                if self.allocation[aid].has_key('types'):
729                    for t in self.allocation[aid]['types']:
730                        self.types[t] -= 1
731                        if self.types[t] == 0:
732                            if not del_project: del_project = t[0]
733                            del_types.add(t[1])
734                            del self.types[t]
735
736                del self.allocation[aid]
737                self.write_state()
738                self.state_lock.release()
739                # If we actually have resources to deallocate, prepare the call.
740                if del_project or del_users:
741                    msg = { 'project': { }}
742                    if del_project:
743                        msg['project']['name']= {'localname': del_project}
744                    users = [ ]
745                    for u in del_users.keys():
746                        users.append({ 'userID': { 'localname': u },\
747                            'access' :  \
748                                    [ {'sshPubkey' : s } for s in del_users[u]]\
749                        })
750                    if users: 
751                        msg['project']['user'] = users
752                    if len(del_types) > 0:
753                        msg['resources'] = { 'node': \
754                                [ {'hardware': [ h ] } for h in del_types ]\
755                            }
756                    if self.allocate_project.release_project:
757                        msg = { 'ReleaseProjectRequestBody' : msg}
758                        self.allocate_project.release_project(msg)
759                return { 'allocID': req['allocID'] } 
760            else:
761                raise service_error(service_error.req, "No such allocation")
762
763        else:
764            if self.allow_proxy:
765                resp = self.proxy_ReleaseAccess.call_service(dt, req,
766                            self.cert_file, self.cert_pwd,
767                            self.trusted_certs)
768                if resp.has_key('ReleaseAccessResponseBody'):
769                    return resp['ReleaseAccessResponseBody']
770                else:
771                    return None
772            else:
773                raise service_error(service_error.access,
774                        "Access proxying denied")
775
776
777
778    class emulab_segment:
779        class ssh_cmd_timeout(RuntimeError): pass
780
781        def __init__(self, log=None, keyfile=None, debug=False):
782            self.log = log or logging.getLogger(\
783                    'fedd.experiment_control.emulab_segment')
784            self.ssh_privkey_file = keyfile
785            self.debug = debug
786            self.ssh_exec="/usr/bin/ssh"
787            self.scp_exec = "/usr/bin/scp"
788            self.ssh_cmd_timeout = emulab_segment.ssh_cmd_timeout
789
790        def scp_file(self, file, user, host, dest=""):
791            """
792            scp a file to the remote host.  If debug is set the action is only
793            logged.
794            """
795
796            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
797                    '-o', 'StrictHostKeyChecking yes', '-i', 
798                    self.ssh_privkey_file, file, 
799                    "%s@%s:%s" % (user, host, dest)]
800            rv = 0
801
802            try:
803                dnull = open("/dev/null", "w")
804            except IOError:
805                self.log.debug("[ssh_file]: failed to open " + \
806                        "/dev/null for redirect")
807                dnull = Null
808
809            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
810            if not self.debug:
811                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
812                        close_fds=True)
813
814            return rv == 0
815
816        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
817            """
818            Run a remote command on host as user.  If debug is set, the action
819            is only logged.  Commands are run without stdin, to avoid stray
820            SIGTTINs.
821            """
822            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
823                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
824                    (self.ssh_exec, self.ssh_privkey_file, 
825                            user, host, cmd)
826
827            try:
828                dnull = open("/dev/null", "w")
829            except IOError:
830                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
831                        "for redirect")
832                dnull = Null
833
834            self.log.debug("[ssh_cmd]: %s" % sh_str)
835            if not self.debug:
836                if dnull:
837                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
838                            close_fds=True)
839                else:
840                    sub = Popen(sh_str, shell=True,
841                            close_fds=True)
842                if timeout:
843                    i = 0
844                    rv = sub.poll()
845                    while i < timeout:
846                        if rv is not None: break
847                        else:
848                            time.sleep(1)
849                            rv = sub.poll()
850                            i += 1
851                    else:
852                        self.log.debug("Process exceeded runtime: %s" % sh_str)
853                        os.kill(sub.pid, signal.SIGKILL)
854                        raise self.ssh_cmd_timeout();
855                    return rv == 0
856                else:
857                    return sub.wait() == 0
858            else:
859                if timeout == 0:
860                    self.log.debug("debug timeout raised on %s " % sh_str)
861                    raise self.ssh_cmd_timeout()
862                else:
863                    return True
864
865    class start_segment(emulab_segment):
866        def __init__(self, log=None, keyfile=None, debug=False):
867            experiment_control_local.emulab_segment.__init__(self,
868                    log=log, keyfile=keyfile, debug=debug)
869
870        def create_config_tree(self, src_dir, dest_dir, script):
871            """
872            Append commands to script that will create the directory hierarchy
873            on the remote federant.
874            """
875
876            if os.path.isdir(src_dir):
877                print >>script, "mkdir -p %s" % dest_dir
878                print >>script, "chmod 770 %s" % dest_dir
879
880                for f in os.listdir(src_dir):
881                    if os.path.isdir(f):
882                        self.create_config_tree("%s/%s" % (src_dir, f), 
883                                "%s/%s" % (dest_dir, f), script)
884            else:
885                self.log.debug("[create_config_tree]: Not a directory: %s" \
886                        % src_dir)
887
888        def ship_configs(self, host, user, src_dir, dest_dir):
889            """
890            Copy federant-specific configuration files to the federant.
891            """
892            for f in os.listdir(src_dir):
893                if os.path.isdir(f):
894                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
895                            "%s/%s" % (dest_dir, f)):
896                        return False
897                else:
898                    if not self.scp_file("%s/%s" % (src_dir, f), 
899                            user, host, dest_dir):
900                        return False
901            return True
902
903        def get_state(self, user, host, tb, pid, eid):
904            # command to test experiment state
905            expinfo_exec = "/usr/testbed/bin/expinfo" 
906            # Regular expressions to parse the expinfo response
907            state_re = re.compile("State:\s+(\w+)")
908            no_exp_re = re.compile("^No\s+such\s+experiment")
909            swapping_re = re.compile("^No\s+information\s+available.")
910            state = None    # Experiment state parsed from expinfo
911            # The expinfo ssh command.  Note the identity restriction to use
912            # only the identity provided in the pubkey given.
913            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
914                    'StrictHostKeyChecking yes', '-i', 
915                    self.ssh_privkey_file, "%s@%s" % (user, host), 
916                    expinfo_exec, pid, eid]
917
918            dev_null = None
919            try:
920                dev_null = open("/dev/null", "a")
921            except IOError, e:
922                self.log.error("[get_state]: can't open /dev/null: %s" %e)
923
924            if self.debug:
925                state = 'swapped'
926                rv = 0
927            else:
928                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
929                        close_fds=True)
930                for line in status.stdout:
931                    m = state_re.match(line)
932                    if m: state = m.group(1)
933                    else:
934                        for reg, st in ((no_exp_re, "none"),
935                                (swapping_re, "swapping")):
936                            m = reg.match(line)
937                            if m: state = st
938                rv = status.wait()
939
940            # If the experiment is not present the subcommand returns a
941            # non-zero return value.  If we successfully parsed a "none"
942            # outcome, ignore the return code.
943            if rv != 0 and state != 'none':
944                raise service_error(service_error.internal,
945                        "Cannot get status of segment %s:%s/%s" % \
946                                (tb, pid, eid))
947            elif state not in ('active', 'swapped', 'swapping', 'none'):
948                raise service_error(service_error.internal,
949                        "Cannot get status of segment %s:%s/%s" % \
950                                (tb, pid, eid))
951            else: return state
952
953
954        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
955            """
956            Start a sub-experiment on a federant.
957
958            Get the current state, modify or create as appropriate, ship data
959            and configs and start the experiment.  There are small ordering
960            differences based on the initial state of the sub-experiment.
961            """
962            # ops node in the federant
963            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
964            user = tbparams[tb]['user']     # federant user
965            pid = tbparams[tb]['project']   # federant project
966            # XXX
967            base_confs = ( "hosts",)
968            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
969            # Configuration directories on the remote machine
970            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
971            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
972            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
973
974            state = self.get_state(user, host, tb, pid, eid)
975
976            self.log.debug("[start_segment]: %s: %s" % (tb, state))
977            self.log.info("[start_segment]:transferring experiment to %s" % tb)
978
979            if not self.scp_file("%s/%s/%s" % \
980                    (tmpdir, tb, tclfile), user, host):
981                return False
982           
983            if state == 'none':
984                # Create a null copy of the experiment so that we capture any
985                # logs there if the modify fails.  Emulab software discards the
986                # logs from a failed startexp
987                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
988                    return False
989                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
990                timedout = False
991                try:
992                    if not self.ssh_cmd(user, host,
993                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
994                            "-e %s null.tcl") % (pid, eid), "startexp",
995                            timeout=60 * 10):
996                        return False
997                except self.ssh_cmd_timeout:
998                    timedout = True
999
1000                if timedout:
1001                    state = self.get_state(user, host, tb, pid, eid)
1002                    if state != "swapped":
1003                        return False
1004
1005           
1006            # Open up a temporary file to contain a script for setting up the
1007            # filespace for the new experiment.
1008            self.log.info("[start_segment]: creating script file")
1009            try:
1010                sf, scriptname = tempfile.mkstemp()
1011                scriptfile = os.fdopen(sf, 'w')
1012            except IOError:
1013                return False
1014
1015            scriptbase = os.path.basename(scriptname)
1016
1017            # Script the filesystem changes
1018            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
1019            # Clear and create the tarfiles and rpm directories
1020            for d in (tarfiles_dir, rpms_dir):
1021                print >>scriptfile, "/bin/rm -rf %s/*" % d
1022                print >>scriptfile, "mkdir -p %s" % d
1023            print >>scriptfile, 'mkdir -p %s' % proj_dir
1024            self.create_config_tree("%s/%s" % (tmpdir, tb),
1025                    proj_dir, scriptfile)
1026            if os.path.isdir("%s/tarfiles" % tmpdir):
1027                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
1028                        scriptfile)
1029            if os.path.isdir("%s/rpms" % tmpdir):
1030                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
1031                        scriptfile)
1032            print >>scriptfile, "rm -f %s" % scriptbase
1033            scriptfile.close()
1034
1035            # Move the script to the remote machine
1036            # XXX: could collide tempfile names on the remote host
1037            if self.scp_file(scriptname, user, host, scriptbase):
1038                os.remove(scriptname)
1039            else:
1040                return False
1041
1042            # Execute the script (and the script's last line deletes it)
1043            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
1044                return False
1045
1046            for f in base_confs:
1047                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
1048                        "%s/%s" % (proj_dir, f)):
1049                    return False
1050            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
1051                    proj_dir):
1052                return False
1053            if os.path.isdir("%s/tarfiles" % tmpdir):
1054                if not self.ship_configs(host, user,
1055                        "%s/tarfiles" % tmpdir, tarfiles_dir):
1056                    return False
1057            if os.path.isdir("%s/rpms" % tmpdir):
1058                if not self.ship_configs(host, user,
1059                        "%s/rpms" % tmpdir, tarfiles_dir):
1060                    return False
1061            # Stage the new configuration (active experiments will stay swapped
1062            # in now)
1063            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
1064            try:
1065                if not self.ssh_cmd(user, host,
1066                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
1067                                (pid, eid, tclfile),
1068                        "modexp", timeout= 60 * 10):
1069                    return False
1070            except self.ssh_cmd_timeout:
1071                self.log.error("Modify command failed to complete in time")
1072                # There's really no way to see if this succeeded or failed, so
1073                # if it hangs, assume the worst.
1074                return False
1075            # Active experiments are still swapped, this swaps the others in.
1076            if state != 'active':
1077                self.log.info("[start_segment]: Swapping %s in on %s" % \
1078                        (eid, tb))
1079                timedout = False
1080                try:
1081                    if not self.ssh_cmd(user, host,
1082                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
1083                            "swapexp", timeout=10*60):
1084                        return False
1085                except self.ssh_cmd_timeout:
1086                    timedout = True
1087               
1088                # If the command was terminated, but completed successfully,
1089                # report success.
1090                if timedout:
1091                    self.log.debug("[start_segment]: swapin timed out " +\
1092                            "checking state")
1093                    state = self.get_state(user, host, tb, pid, eid)
1094                    self.log.debug("[start_segment]: state is %s" % state)
1095                    return state == 'active'
1096            # Everything has gone OK.
1097            return True
1098
1099    class stop_segment(emulab_segment):
1100        def __init__(self, log=None, keyfile=None, debug=False):
1101            experiment_control_local.emulab_segment.__init__(self,
1102                    log=log, keyfile=keyfile, debug=debug)
1103
1104        def __call__(self, tb, eid, tbparams):
1105            """
1106            Stop a sub experiment by calling swapexp on the federant
1107            """
1108            user = tbparams[tb]['user']
1109            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
1110            pid = tbparams[tb]['project']
1111
1112            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
1113            rv = False
1114            try:
1115                # Clean out tar files: we've gone over quota in the past
1116                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
1117                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
1118                        (pid, eid))
1119                rv = self.ssh_cmd(user, host,
1120                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
1121            except self.ssh_cmd_timeout:
1122                rv = False
1123            return rv
1124
1125    def StartSegment(self, req, fid):
1126        try:
1127            req = req['StartSegmentRequestBody']
1128        except KeyError:
1129            raise service_error(server_error.req, "Badly formed request")
1130        auth_attr = req['allocID']['fedid']
1131        if self.auth.check_attribute(fid, auth_attr):
1132            print "OK"
1133        else:
1134            print "Fail"
1135        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.