source: fedd/federation/emulab_access.py @ d3c8759

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since d3c8759 was d3c8759, checked in by Ted Faber <faber@…>, 14 years ago

Wholesale change of IOError to EnvironmentError? for file operations. Lots of
uncaught EnvironmentErrors? were causing spurious error conditions, e.g. on disk
full.

  • Property mode set to 100644
File size: 62.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from util import *
17from allocate_project import allocate_project_local, allocate_project_remote
18from access_project import access_project
19from fedid import fedid, generate_fedid
20from authorizer import authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28import topdl
29import list_log
30import proxy_emulab_segment
31import local_emulab_segment
32
33
34# Make log messages disappear if noone configures a fedd logger
35class nullHandler(logging.Handler):
36    def emit(self, record): pass
37
38fl = logging.getLogger("fedd.access")
39fl.addHandler(nullHandler())
40
41class access:
42    """
43    The implementation of access control based on mapping users to projects.
44
45    Users can be mapped to existing projects or have projects created
46    dynamically.  This implements both direct requests and proxies.
47    """
48
49    class parse_error(RuntimeError): pass
50
51
52    proxy_RequestAccess= service_caller('RequestAccess')
53    proxy_ReleaseAccess= service_caller('ReleaseAccess')
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        def software_list(v):
61            l = [ ]
62            if v:
63                ps = v.split(" ")
64                while len(ps):
65                    loc, file = ps[0:2]
66                    del ps[0:2]
67                    l.append((loc, file))
68            return l
69
70
71        # Make sure that the configuration is in place
72        if not config: 
73            raise RunTimeError("No config to fedd.access")
74
75        self.project_priority = config.getboolean("access", "project_priority")
76        self.allow_proxy = config.getboolean("access", "allow_proxy")
77
78        self.boss = config.get("access", "boss")
79        self.ops = config.get("access", "ops")
80        self.domain = config.get("access", "domain")
81        self.fileserver = config.get("access", "fileserver")
82        self.eventserver = config.get("access", "eventserver")
83        self.certdir = config.get("access","certdir")
84        self.userconfdir = config.get("access","userconfdir")
85        self.userconfcmd = config.get("access","userconfcmd")
86        self.userconfurl = config.get("access","userconfurl")
87        self.federation_software = config.get("access", "federation_software")
88        self.portal_software = config.get("access", "portal_software")
89        self.local_seer_software = config.get("access", "local_seer_software")
90        self.local_seer_image = config.get("access", "local_seer_image")
91        self.local_seer_start = config.get("access", "local_seer_start")
92        self.seer_master_start = config.get("access", "seer_master_start")
93        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
94        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
95        self.ssh_port = config.get("access","ssh_port") or "22"
96        self.create_debug = config.getboolean("access", "create_debug")
97        self.cleanup = not config.getboolean("access", "leave_tmpfiles")
98        self.access_type = config.get("access", "type")
99   
100        self.federation_software = software_list(self.federation_software)
101        self.portal_software = software_list(self.portal_software)
102        self.local_seer_software = software_list(self.local_seer_software)
103
104        self.access_type = self.access_type.lower()
105        if self.access_type == 'remote_emulab':
106            self.start_segment = proxy_emulab_segment.start_segment
107            self.stop_segment = proxy_emulab_segment.stop_segment
108        elif self.access_type == 'local_emulab':
109            self.start_segment = local_emulab_segment.start_segment
110            self.stop_segment = local_emulab_segment.stop_segment
111        else:
112            self.start_segment = None
113            self.stop_segment = None
114
115        self.attrs = { }
116        self.access = { }
117        self.restricted = [ ]
118        self.projects = { }
119        self.keys = { }
120        self.types = { }
121        self.allocation = { }
122        self.state = { 
123            'projects': self.projects,
124            'allocation' : self.allocation,
125            'keys' : self.keys,
126            'types': self.types
127        }
128        self.log = logging.getLogger("fedd.access")
129        set_log_level(config, "access", self.log)
130        self.state_lock = Lock()
131        # XXX: Configurable
132        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig', 
133            'project_export', 'local_seer_control', 'seer_master'))
134        self.imports = set(('SMB', 'seer', 'userconfig', 'seer_master'))
135
136        if not self.local_seer_image or not self.local_seer_software:
137            self.exports.discard('local_seer_control')
138            self.exports.discard('seer_master')
139
140        if not self.local_seer_start:
141            self.exports.discard('local_seer_control')
142
143        if not self.seer_master_start:
144            self.exports.discard('seer_master')
145
146        if auth: self.auth = auth
147        else:
148            self.log.error(\
149                    "[access]: No authorizer initialized, creating local one.")
150            auth = authorizer()
151
152        tb = config.get('access', 'testbed')
153        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
154        else: self.testbed = [ ]
155
156        if config.has_option("access", "accessdb"):
157            self.read_access(config.get("access", "accessdb"))
158
159        self.state_filename = config.get("access", "access_state")
160        self.read_state()
161
162        # Keep cert_file and cert_pwd coming from the same place
163        self.cert_file = config.get("access", "cert_file")
164        if self.cert_file:
165            self.sert_pwd = config.get("access", "cert_pw")
166        else:
167            self.cert_file = config.get("globals", "cert_file")
168            self.sert_pwd = config.get("globals", "cert_pw")
169
170        self.trusted_certs = config.get("access", "trusted_certs") or \
171                config.get("globals", "trusted_certs")
172
173        self.soap_services = {\
174            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
175            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
176            'StartSegment': soap_handler("StartSegment", self.StartSegment),
177            'TerminateSegment': soap_handler("TerminateSegment",
178                self.TerminateSegment),
179            }
180        self.xmlrpc_services =  {\
181            'RequestAccess': xmlrpc_handler('RequestAccess',
182                self.RequestAccess),
183            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
184                self.ReleaseAccess),
185            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
186            'TerminateSegment': xmlrpc_handler('TerminateSegment',
187                self.TerminateSegment),
188            }
189
190        self.call_SetValue = service_caller('SetValue')
191        self.call_GetValue = service_caller('GetValue', log=self.log)
192
193        if not config.has_option("allocate", "uri"):
194            self.allocate_project = \
195                allocate_project_local(config, auth)
196        else:
197            self.allocate_project = \
198                allocate_project_remote(config, auth)
199
200
201        # If the project allocator exports services, put them in this object's
202        # maps so that classes that instantiate this can call the services.
203        self.soap_services.update(self.allocate_project.soap_services)
204        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
205
206    @staticmethod
207    def add_kit(e, kit):
208        """
209        Add a Software object created from the list of (install, location)
210        tuples passed as kit  to the software attribute of an object e.  We
211        do this enough to break out the code, but it's kind of a hack to
212        avoid changing the old tuple rep.
213        """
214
215        s = [ topdl.Software(install=i, location=l) for i, l in kit]
216
217        if isinstance(e.software, list): e.software.extend(s)
218        else: e.software = s
219
220
221    def read_access(self, config):
222        """
223        Read a configuration file and set internal parameters.
224
225        The format is more complex than one might hope.  The basic format is
226        attribute value pairs separated by colons(:) on a signle line.  The
227        attributes in bool_attrs, emulab_attrs and id_attrs can all be set
228        directly using the name: value syntax.  E.g.
229        boss: hostname
230        sets self.boss to hostname.  In addition, there are access lines of the
231        form (tb, proj, user) -> (aproj, auser) that map the first tuple of
232        names to the second for access purposes.  Names in the key (left side)
233        can include "<NONE> or <ANY>" to act as wildcards or to require the
234        fields to be empty.  Similarly aproj or auser can be <SAME> or
235        <DYNAMIC> indicating that either the matching key is to be used or a
236        dynamic user or project will be created.  These names can also be
237        federated IDs (fedid's) if prefixed with fedid:.  Finally, the aproj
238        can be followed with a colon-separated list of node types to which that
239        project has access (or will have access if dynamic).
240        Testbed attributes outside the forms above can be given using the
241        format attribute: name value: value.  The name is a single word and the
242        value continues to the end of the line.  Empty lines and lines startin
243        with a # are ignored.
244
245        Parsing errors result in a self.parse_error exception being raised.
246        """
247        lineno=0
248        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
249        fedid_expr = "fedid:[" + string.hexdigits + "]+"
250        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
251        access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \
252                "<SAME>" + "(?::" + name_expr + ")*|" + \
253                fedid_expr + "(?::" + name_expr + ")*|" + \
254                name_expr + "(?::" + name_expr + ")*)"
255        access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")"
256
257        restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE)
258        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
259                re.IGNORECASE)
260        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
261                key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + 
262                access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE)
263
264        def parse_name(n):
265            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
266            else: return n
267       
268        def auth_name(n):
269            if isinstance(n, basestring):
270                if n =='<any>' or n =='<none>': return None
271                else: return unicode(n)
272            else:
273                return n
274
275        f = open(config, "r");
276        for line in f:
277            lineno += 1
278            line = line.strip();
279            if len(line) == 0 or line.startswith('#'):
280                continue
281
282            # Extended (attribute: x value: y) attribute line
283            m = attr_re.match(line)
284            if m != None:
285                attr, val = m.group(1,2)
286                self.attrs[attr] = val
287                continue
288
289            # Restricted entry
290            m = restricted_re.match(line)
291            if m != None:
292                val = m.group(1)
293                self.restricted.append(val)
294                continue
295
296            # Access line (t, p, u) -> (ap, cu, su) line
297            m = access_re.match(line)
298            if m != None:
299                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
300                auth_key = tuple([ auth_name(x) for x in access_key])
301                aps = m.group(4).split(":");
302                if aps[0] == 'fedid:':
303                    del aps[0]
304                    aps[0] = fedid(hexstr=aps[0])
305
306                cu = parse_name(m.group(5))
307                su = parse_name(m.group(6))
308
309                access_val = (access_project(aps[0], aps[1:]),
310                        parse_name(m.group(5)), parse_name(m.group(6)))
311
312                self.access[access_key] = access_val
313                self.auth.set_attribute(auth_key, "access")
314                continue
315
316            # Nothing matched to here: unknown line - raise exception
317            f.close()
318            raise self.parse_error("Unknown statement at line %d of %s" % \
319                    (lineno, config))
320        f.close()
321
322    def get_users(self, obj):
323        """
324        Return a list of the IDs of the users in dict
325        """
326        if obj.has_key('user'):
327            return [ unpack_id(u['userID']) \
328                    for u in obj['user'] if u.has_key('userID') ]
329        else:
330            return None
331
332    def write_state(self):
333        if self.state_filename:
334            try:
335                f = open(self.state_filename, 'w')
336                pickle.dump(self.state, f)
337            except EnvironmentError, e:
338                self.log.error("Can't write file %s: %s" % \
339                        (self.state_filename, e))
340            except pickle.PicklingError, e:
341                self.log.error("Pickling problem: %s" % e)
342            except TypeError, e:
343                self.log.error("Pickling problem (TypeError): %s" % e)
344
345
346    def read_state(self):
347        """
348        Read a new copy of access state.  Old state is overwritten.
349
350        State format is a simple pickling of the state dictionary.
351        """
352        if self.state_filename:
353            try:
354                f = open(self.state_filename, "r")
355                self.state = pickle.load(f)
356
357                self.allocation = self.state['allocation']
358                self.projects = self.state['projects']
359                self.keys = self.state['keys']
360                self.types = self.state['types']
361
362                self.log.debug("[read_state]: Read state from %s" % \
363                        self.state_filename)
364            except EnvironmentError, e:
365                self.log.warning(("[read_state]: No saved state: " +\
366                        "Can't open %s: %s") % (self.state_filename, e))
367            except EOFError, e:
368                self.log.warning(("[read_state]: " +\
369                        "Empty or damaged state file: %s:") % \
370                        self.state_filename)
371            except pickle.UnpicklingError, e:
372                self.log.warning(("[read_state]: No saved state: " + \
373                        "Unpickling failed: %s") % e)
374
375            # Add the ownership attributes to the authorizer.  Note that the
376            # indices of the allocation dict are strings, but the attributes are
377            # fedids, so there is a conversion.
378            for k in self.allocation.keys():
379                for o in self.allocation[k].get('owners', []):
380                    self.auth.set_attribute(o, fedid(hexstr=k))
381                if self.allocation[k].has_key('userconfig'):
382                    sfid = self.allocation[k]['userconfig']
383                    fid = fedid(hexstr=sfid)
384                    self.auth.set_attribute(fid, "/%s" % sfid)
385
386
387    def permute_wildcards(self, a, p):
388        """Return a copy of a with various fields wildcarded.
389
390        The bits of p control the wildcards.  A set bit is a wildcard
391        replacement with the lowest bit being user then project then testbed.
392        """
393        if p & 1: user = ["<any>"]
394        else: user = a[2]
395        if p & 2: proj = "<any>"
396        else: proj = a[1]
397        if p & 4: tb = "<any>"
398        else: tb = a[0]
399
400        return (tb, proj, user)
401
402    def find_access(self, search):
403        """
404        Search the access DB for a match on this tuple.  Return the matching
405        access tuple and the user that matched.
406       
407        NB, if the initial tuple fails to match we start inserting wildcards in
408        an order determined by self.project_priority.  Try the list of users in
409        order (when wildcarded, there's only one user in the list).
410        """
411        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
412        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
413
414        for p in perm: 
415            s = self.permute_wildcards(search, p)
416            # s[2] is None on an anonymous, unwildcarded request
417            if s[2] != None:
418                for u in s[2]:
419                    if self.access.has_key((s[0], s[1], u)):
420                        return (self.access[(s[0], s[1], u)], u)
421            else:
422                if self.access.has_key(s):
423                    return (self.access[s], None)
424        return None, None
425
426    def lookup_access(self, req, fid):
427        """
428        Determine the allowed access for this request.  Return the access and
429        which fields are dynamic.
430
431        The fedid is needed to construct the request
432        """
433        user_re = re.compile("user:\s(.*)")
434        project_re = re.compile("project:\s(.*)")
435
436        # Search keys
437        tb = None
438        project = None
439        user = None
440        # Return values
441        rp = access_project(None, ())
442        ru = None
443
444        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
445                if user_re.match(x)]
446        project = [ project_re.findall(x)[0] \
447                for x in req.get('credential', []) \
448                    if project_re.match(x)]
449
450        if len(project) == 1: project = project[0]
451        elif len(project) == 0: project = None
452        else: 
453            raise service_error(service_error.req, 
454                    "More than one project credential")
455
456
457        user_fedids = [ u for u in user if isinstance(u, fedid)]
458        # Determine how the caller is representing itself.  If its fedid shows
459        # up as a project or a singleton user, let that stand.  If neither the
460        # usernames nor the project name is a fedid, the caller is a testbed.
461        if project and isinstance(project, fedid):
462            if project == fid:
463                # The caller is the project (which is already in the tuple
464                # passed in to the authorizer)
465                owners = user_fedids
466                owners.append(project)
467            else:
468                raise service_error(service_error.req,
469                        "Project asserting different fedid")
470        else:
471            if fid not in user_fedids:
472                tb = fid
473                owners = user_fedids
474                owners.append(fid)
475            else:
476                if len(fedids) > 1:
477                    raise service_error(service_error.req,
478                            "User asserting different fedid")
479                else:
480                    # Which is a singleton
481                    owners = user_fedids
482        # Confirm authorization
483
484        for u in user:
485            self.log.debug("[lookup_access] Checking access for %s" % \
486                    ((tb, project, u),))
487            if self.auth.check_attribute((tb, project, u), 'access'):
488                self.log.debug("[lookup_access] Access granted")
489                break
490            else:
491                self.log.debug("[lookup_access] Access Denied")
492        else:
493            raise service_error(service_error.access, "Access denied")
494
495        # This maps a valid user to the Emulab projects and users to use
496        found, user_match = self.find_access((tb, project, user))
497       
498        if found == None:
499            raise service_error(service_error.access,
500                    "Access denied - cannot map access")
501
502        # resolve <dynamic> and <same> in found
503        dyn_proj = False
504        dyn_create_user = False
505        dyn_service_user = False
506
507        if found[0].name == "<same>":
508            if project != None:
509                rp.name = project
510            else : 
511                raise service_error(\
512                        service_error.server_config,
513                        "Project matched <same> when no project given")
514        elif found[0].name == "<dynamic>":
515            rp.name = None
516            dyn_proj = True
517        else:
518            rp.name = found[0].name
519        rp.node_types = found[0].node_types;
520
521        if found[1] == "<same>":
522            if user_match == "<any>":
523                if user != None: rcu = user[0]
524                else: raise service_error(\
525                        service_error.server_config,
526                        "Matched <same> on anonymous request")
527            else:
528                rcu = user_match
529        elif found[1] == "<dynamic>":
530            rcu = None
531            dyn_create_user = True
532        else:
533            rcu = found[1]
534       
535        if found[2] == "<same>":
536            if user_match == "<any>":
537                if user != None: rsu = user[0]
538                else: raise service_error(\
539                        service_error.server_config,
540                        "Matched <same> on anonymous request")
541            else:
542                rsu = user_match
543        elif found[2] == "<dynamic>":
544            rsu = None
545            dyn_service_user = True
546        else:
547            rsu = found[2]
548
549        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
550                owners
551
552    def get_handler(self, path, fid):
553        self.log.info("Get handler %s %s" % (path, fid))
554        if self.auth.check_attribute(fid, path) and self.userconfdir:
555            return ("%s/%s" % (self.userconfdir, path), "application/binary")
556        else:
557            return (None, None)
558
559    def export_userconf(self, project):
560        dev_null = None
561        confid, confcert = generate_fedid("test", dir=self.userconfdir, 
562                log=self.log)
563        conffilename = "%s/%s" % (self.userconfdir, str(confid))
564        cf = None
565        try:
566            cf = open(conffilename, "w")
567            os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR)
568        except EnvironmentError, e:
569            raise service_error(service_error.internal, 
570                    "Cannot create user configuration data")
571
572        try:
573            dev_null = open("/dev/null", "a")
574        except EnvironmentError, e:
575            self.log.error("export_userconf: can't open /dev/null: %s" % e)
576
577        cmd = "%s %s" % (self.userconfcmd, project)
578        conf = subprocess.call(cmd.split(" "),
579                stdout=cf, stderr=dev_null, close_fds=True)
580
581        self.auth.set_attribute(confid, "/%s" % str(confid))
582
583        return confid, confcert
584
585    def export_SMB(self, id, state, project, user):
586        return { 
587                'id': id,
588                'name': 'SMB',
589                'visibility': 'export',
590                'server': 'http://fs:139',
591                'fedAttr': [
592                        { 'attribute': 'SMBSHARE', 'value': 'USERS' },
593                        { 'attribute': 'SMBUSER', 'value': user },
594                        { 'attribute': 'SMBPROJ', 'value': project },
595                    ]
596                }
597
598    def export_seer(self, id, state, project, user):
599        return { 
600                'id': id,
601                'name': 'seer',
602                'visibility': 'export',
603                'server': 'http://control:16606',
604                }
605
606    def export_local_seer(self, id, state, project, user):
607        return { 
608                'id': id,
609                'name': 'local_seer_control',
610                'visibility': 'export',
611                'server': 'http://control:16606',
612                }
613
614    def export_seer_master(self, id, state, project, user):
615        return { 
616                'id': id,
617                'name': 'seer_master',
618                'visibility': 'export',
619                'server': 'http://seer-master:17707',
620                }
621
622    def export_tmcd(self, id, state, project, user):
623        return { 
624                'id': id,
625                'name': 'seer',
626                'visibility': 'export',
627                'server': 'http://boss:7777',
628                }
629
630    def export_userconfig(self, id, state, project, user):
631        if self.userconfdir and self.userconfcmd \
632                and self.userconfurl:
633            cid, cert = self.export_userconf(project)
634            state['userconfig'] = unicode(cid)
635            return {
636                    'id': id,
637                    'name': 'userconfig',
638                    'visibility': 'export',
639                    'server': "%s/%s" % (self.userconfurl, str(cid)),
640                    'fedAttr': [
641                        { 'attribute': 'cert', 'value': cert },
642                    ]
643                    }
644        else:
645            return None
646
647    def export_services(self, sreq, project, user):
648        exp = [ ]
649        state = { }
650        # XXX: Filthy shortcut here using http: so urlparse will give the right
651        # answers.
652        for s in sreq:
653            sname = s.get('name', '')
654            svis = s.get('visibility', '')
655            if svis == 'export':
656                if sname in self.exports:
657                    id = s.get('id', 'no_id')
658                    if sname == 'SMB':
659                        exp.append(self.export_SMB(id, state, project, user))
660                    elif sname == 'seer':
661                        exp.append(self.export_seer(id, state, project, user))
662                    elif sname == 'tmcd':
663                        exp.append(self.export_tmcd(id, state, project, user))
664                    elif sname == 'userconfig':
665                        exp.append(self.export_userconfig(id, state,
666                            project, user))
667                    elif sname == 'project_export':
668                        exp.append(self.export_SMB(id, state, project, user))
669                        #exp.append(self.export_seer(id, state, project, user))
670                        exp.append(self.export_userconfig(id, state,
671                            project, user))
672                    elif sname == 'local_seer_control':
673                        exp.append(self.export_local_seer(id, state, project,
674                            user))
675                    elif sname == 'seer_master':
676                        exp.append(self.export_seer_master(id, state, project,
677                            user))
678        return (exp, state)
679
680    def build_response(self, alloc_id, ap, services):
681        """
682        Create the SOAP response.
683
684        Build the dictionary description of the response and use
685        fedd_utils.pack_soap to create the soap message.  ap is the allocate
686        project message returned from a remote project allocation (even if that
687        allocation was done locally).
688        """
689        # Because alloc_id is already a fedd_services_types.IDType_Holder,
690        # there's no need to repack it
691        msg = { 
692                'allocID': alloc_id,
693                'fedAttr': [
694                    { 'attribute': 'domain', 'value': self.domain } , 
695                    { 'attribute': 'project', 'value': 
696                        ap['project'].get('name', {}).get('localname', "???") },
697                ]
698            }
699        if len(self.attrs) > 0:
700            msg['fedAttr'].extend(
701                [ { 'attribute': x, 'value' : y } \
702                        for x,y in self.attrs.iteritems()])
703
704        if services:
705            msg['service'] = services
706        return msg
707
708    def RequestAccess(self, req, fid):
709        """
710        Handle the access request.  Proxy if not for us.
711
712        Parse out the fields and make the allocations or rejections if for us,
713        otherwise, assuming we're willing to proxy, proxy the request out.
714        """
715
716        def gateway_hardware(h):
717            if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE')
718            else: return h
719
720        def get_export_project(svcs):
721            """
722            if the service requests includes one to export a project, return
723            that project.
724            """
725            rv = None
726            for s in svcs:
727                if s.get('name', '') == 'project_export' and \
728                        s.get('visibility', '') == 'export':
729                    if not rv: 
730                        for a in s.get('feddAttr', []):
731                            if a.get('attribute', '') == 'project' \
732                                    and 'value' in a:
733                                rv = a['value']
734                    else:
735                        raise service_error(service_error, access, 
736                                'Requesting multiple project exports is ' + \
737                                        'not supported');
738            return rv
739
740        # The dance to get into the request body
741        if req.has_key('RequestAccessRequestBody'):
742            req = req['RequestAccessRequestBody']
743        else:
744            raise service_error(service_error.req, "No request!?")
745
746        if req.has_key('destinationTestbed'):
747            dt = unpack_id(req['destinationTestbed'])
748
749        if dt == None or dt in self.testbed:
750            # Request for this fedd
751            found, dyn, owners = self.lookup_access(req, fid)
752            restricted = None
753            ap = None
754
755            # if this includes a project export request and the exported
756            # project is not the access project, access denied.
757            if 'service' in req:
758                ep = get_export_project(req['service'])
759                if ep and ep != found[0].name:
760                    raise service_error(service_error.access,
761                            "Cannot export %s" % ep)
762
763            # XXX
764            # Check for access to restricted nodes
765            if req.has_key('resources') and req['resources'].has_key('node'):
766                resources = req['resources']
767                restricted = [ gateway_hardware(t) for n in resources['node'] \
768                                if n.has_key('hardware') \
769                                    for t in n['hardware'] \
770                                        if gateway_hardware(t) \
771                                            in self.restricted ]
772                inaccessible = [ t for t in restricted \
773                                    if t not in found[0].node_types]
774                if len(inaccessible) > 0:
775                    raise service_error(service_error.access,
776                            "Access denied (nodetypes %s)" % \
777                            str(', ').join(inaccessible))
778            # XXX
779
780            # These were passed around before, but now are hidden from users
781            # and configurators alike, beyond a configuration file entry.
782            create_ssh = [ self.ssh_pubkey_file ]
783            service_ssh = [ self.ssh_pubkey_file ]
784
785            if len(create_ssh) > 0 and len(service_ssh) >0: 
786                if dyn[1]: 
787                    # Compose the dynamic project request
788                    # (only dynamic, dynamic currently allowed)
789                    preq = { 'AllocateProjectRequestBody': \
790                                { 'project' : {\
791                                    'user': [ \
792                                    { \
793                                        'access': [ { 'sshPubkey': s } \
794                                            for s in service_ssh ], 
795                                         'role': "serviceAccess",\
796                                    }, \
797                                    { \
798                                        'access': [ { 'sshPubkey': s } \
799                                            for s in create_ssh ], 
800                                         'role': "experimentCreation",\
801                                    }, \
802                                    ], \
803                                    }\
804                                }\
805                            }
806                    if restricted != None and len(restricted) > 0:
807                        preq['AllocateProjectRequestBody']['resources'] = \
808                             {'node': [ { 'hardware' :  [ h ] } \
809                                    for h in restricted ] } 
810                    ap = self.allocate_project.dynamic_project(preq)
811                else:
812                    preq = {'StaticProjectRequestBody' : \
813                            { 'project': \
814                                { 'name' : { 'localname' : found[0].name },\
815                                  'user' : [ \
816                                    {\
817                                        'userID': { 'localname' : found[1] }, \
818                                        'access': [ { 'sshPubkey': s } 
819                                            for s in create_ssh ],
820                                        'role': 'experimentCreation'\
821                                    },\
822                                    {\
823                                        'userID': { 'localname' : found[2] }, \
824                                        'access': [ { 'sshPubkey': s } 
825                                            for s in service_ssh ],
826                                        'role': 'serviceAccess'\
827                                    },\
828                                ]}\
829                            }\
830                    }
831                    if restricted != None and len(restricted) > 0:
832                        preq['StaticProjectRequestBody']['resources'] = \
833                            {'node': [ { 'hardware' :  [ h ] } \
834                                    for h in restricted ] } 
835                    ap = self.allocate_project.static_project(preq)
836            else:
837                raise service_error(service_error.req, 
838                        "SSH access parameters required")
839            # keep track of what's been added
840            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
841            aid = unicode(allocID)
842
843            self.state_lock.acquire()
844            self.allocation[aid] = { }
845            try:
846                pname = ap['project']['name']['localname']
847            except KeyError:
848                pname = None
849
850            if dyn[1]:
851                if not pname:
852                    self.state_lock.release()
853                    raise service_error(service_error.internal,
854                            "Misformed allocation response?")
855                if self.projects.has_key(pname): self.projects[pname] += 1
856                else: self.projects[pname] = 1
857                self.allocation[aid]['project'] = pname
858            else:
859                # sproject is a static project associated with this allocation.
860                self.allocation[aid]['sproject'] = pname
861
862            if ap.has_key('resources'):
863                if not pname:
864                    self.state_lock.release()
865                    raise service_error(service_error.internal,
866                            "Misformed allocation response?")
867                self.allocation[aid]['types'] = set()
868                nodes = ap['resources'].get('node', [])
869                for n in nodes:
870                    for h in n.get('hardware', []):
871                        if self.types.has_key((pname, h)):
872                            self.types[(pname, h)] += 1
873                        else:
874                            self.types[(pname, h)] = 1
875                        self.allocation[aid]['types'].add((pname,h))
876
877
878            self.allocation[aid]['keys'] = [ ]
879
880            try:
881                for u in ap['project']['user']:
882                    uname = u['userID']['localname']
883                    if u['role'] == 'experimentCreation':
884                        self.allocation[aid]['user'] = uname
885                    for k in [ k['sshPubkey'] for k in u['access'] \
886                            if k.has_key('sshPubkey') ]:
887                        kv = "%s:%s" % (uname, k)
888                        if self.keys.has_key(kv): self.keys[kv] += 1
889                        else: self.keys[kv] = 1
890                        self.allocation[aid]['keys'].append((uname, k))
891            except KeyError:
892                self.state_lock.release()
893                raise service_error(service_error.internal,
894                        "Misformed allocation response?")
895
896            self.allocation[aid]['owners'] = owners
897            services, svc_state = self.export_services(req.get('service',[]),
898                    pname, uname)
899            # Store services state in global state
900            for k, v in svc_state.items():
901                self.allocation[aid][k] = v
902            self.write_state()
903            self.state_lock.release()
904            for o in owners:
905                self.auth.set_attribute(o, allocID)
906            try:
907                f = open("%s/%s.pem" % (self.certdir, aid), "w")
908                print >>f, alloc_cert
909                f.close()
910            except EnvironmentError, e:
911                raise service_error(service_error.internal, 
912                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
913            resp = self.build_response({ 'fedid': allocID } , ap, services)
914            return resp
915        else:
916            if self.allow_proxy:
917                resp = self.proxy_RequestAccess.call_service(dt, req,
918                            self.cert_file, self.cert_pwd,
919                            self.trusted_certs)
920                if resp.has_key('RequestAccessResponseBody'):
921                    return resp['RequestAccessResponseBody']
922                else:
923                    return None
924            else:
925                raise service_error(service_error.access,
926                        "Access proxying denied")
927
928    def ReleaseAccess(self, req, fid):
929        # The dance to get into the request body
930        if req.has_key('ReleaseAccessRequestBody'):
931            req = req['ReleaseAccessRequestBody']
932        else:
933            raise service_error(service_error.req, "No request!?")
934
935        if req.has_key('destinationTestbed'):
936            dt = unpack_id(req['destinationTestbed'])
937        else:
938            dt = None
939
940        if dt == None or dt in self.testbed:
941            # Local request
942            try:
943                if req['allocID'].has_key('localname'):
944                    auth_attr = aid = req['allocID']['localname']
945                elif req['allocID'].has_key('fedid'):
946                    aid = unicode(req['allocID']['fedid'])
947                    auth_attr = req['allocID']['fedid']
948                else:
949                    raise service_error(service_error.req,
950                            "Only localnames and fedids are understood")
951            except KeyError:
952                raise service_error(service_error.req, "Badly formed request")
953
954            self.log.debug("[access] deallocation requested for %s", aid)
955            if not self.auth.check_attribute(fid, auth_attr):
956                self.log.debug("[access] deallocation denied for %s", aid)
957                raise service_error(service_error.access, "Access Denied")
958
959            # If we know this allocation, reduce the reference counts and
960            # remove the local allocations.  Otherwise report an error.  If
961            # there is an allocation to delete, del_users will be a dictonary
962            # of sets where the key is the user that owns the keys in the set.
963            # We use a set to avoid duplicates.  del_project is just the name
964            # of any dynamic project to delete.  We're somewhat lazy about
965            # deleting authorization attributes.  Having access to something
966            # that doesn't exist isn't harmful.
967            del_users = { }
968            del_project = None
969            del_types = set()
970
971            if self.allocation.has_key(aid):
972                self.log.debug("Found allocation for %s" %aid)
973                self.state_lock.acquire()
974                for k in self.allocation[aid]['keys']:
975                    kk = "%s:%s" % k
976                    self.keys[kk] -= 1
977                    if self.keys[kk] == 0:
978                        if not del_users.has_key(k[0]):
979                            del_users[k[0]] = set()
980                        del_users[k[0]].add(k[1])
981                        del self.keys[kk]
982
983                if self.allocation[aid].has_key('project'):
984                    pname = self.allocation[aid]['project']
985                    self.projects[pname] -= 1
986                    if self.projects[pname] == 0:
987                        del_project = pname
988                        del self.projects[pname]
989
990                if self.allocation[aid].has_key('types'):
991                    for t in self.allocation[aid]['types']:
992                        self.types[t] -= 1
993                        if self.types[t] == 0:
994                            if not del_project: del_project = t[0]
995                            del_types.add(t[1])
996                            del self.types[t]
997
998                del self.allocation[aid]
999                self.write_state()
1000                self.state_lock.release()
1001                # If we actually have resources to deallocate, prepare the call.
1002                if del_project or del_users:
1003                    msg = { 'project': { }}
1004                    if del_project:
1005                        msg['project']['name']= {'localname': del_project}
1006                    users = [ ]
1007                    for u in del_users.keys():
1008                        users.append({ 'userID': { 'localname': u },\
1009                            'access' :  \
1010                                    [ {'sshPubkey' : s } for s in del_users[u]]\
1011                        })
1012                    if users: 
1013                        msg['project']['user'] = users
1014                    if len(del_types) > 0:
1015                        msg['resources'] = { 'node': \
1016                                [ {'hardware': [ h ] } for h in del_types ]\
1017                            }
1018                    if self.allocate_project.release_project:
1019                        msg = { 'ReleaseProjectRequestBody' : msg}
1020                        self.allocate_project.release_project(msg)
1021                # And remove the access cert
1022                cf = "%s/%s.pem" % (self.certdir, aid)
1023                self.log.debug("Removing %s" % cf)
1024                os.remove(cf)
1025                return { 'allocID': req['allocID'] } 
1026            else:
1027                raise service_error(service_error.req, "No such allocation")
1028
1029        else:
1030            if self.allow_proxy:
1031                resp = self.proxy_ReleaseAccess.call_service(dt, req,
1032                            self.cert_file, self.cert_pwd,
1033                            self.trusted_certs)
1034                if resp.has_key('ReleaseAccessResponseBody'):
1035                    return resp['ReleaseAccessResponseBody']
1036                else:
1037                    return None
1038            else:
1039                raise service_error(service_error.access,
1040                        "Access proxying denied")
1041
1042    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
1043            tmpdir, master, lproj, leid, connInfo, services):
1044
1045        def conninfo_to_dict(key, info):
1046            """
1047            Make a cpoy of the connection information about key, and flatten it
1048            into a single dict by parsing out any feddAttrs.
1049            """
1050
1051            rv = None
1052            for i in info:
1053                if key == i.get('portal', "") or \
1054                        key in [e.get('element', "") \
1055                        for e in i.get('member', [])]:
1056                    rv = i.copy()
1057                    break
1058
1059            else:
1060                return rv
1061
1062            if 'fedAttr' in rv:
1063                for a in rv['fedAttr']:
1064                    attr = a.get('attribute', "")
1065                    val = a.get('value', "")
1066                    if attr and attr not in rv:
1067                        rv[attr] = val
1068                del rv['fedAttr']
1069            return rv
1070
1071        # XXX: un hardcode this
1072        def client_null(f, s):
1073            print >>f, "Service: %s" % s['name']
1074
1075        def client_seer_master(f, s):
1076            print >>f, 'PortalAlias: seer-master';
1077
1078        def client_smb(f, s):
1079            print >>f, "Service: %s" % s['name']
1080            smbshare = None
1081            smbuser = None
1082            smbproj = None
1083            for a in s.get('fedAttr', []):
1084                if a.get('attribute', '') == 'SMBSHARE':
1085                    smbshare = a.get('value', None)
1086                elif a.get('attribute', '') == 'SMBUSER':
1087                    smbuser = a.get('value', None)
1088                elif a.get('attribute', '') == 'SMBPROJ':
1089                    smbproj = a.get('value', None)
1090
1091            if all((smbshare, smbuser, smbproj)):
1092                print >>f, "SMBshare: %s" % smbshare
1093                print >>f, "ProjectUser: %s" % smbuser
1094                print >>f, "ProjectName: %s" % smbproj
1095
1096        client_service_out = {
1097                'SMB': client_smb,
1098                'tmcd': client_null,
1099                'seer': client_null,
1100                'userconfig': client_null,
1101                'project_export': client_null,
1102                'seer_master': client_seer_master,
1103            }
1104
1105        def client_seer_master_export(f, s):
1106            print >>f, "AddedNode: seer-master"
1107
1108        def client_seer_local_export(f, s):
1109            print >>f, "AddedNode: control"
1110
1111        client_export_service_out = {
1112                'seer_master': client_seer_master_export,
1113                'local_seer_control': client_seer_local_export,
1114            }
1115
1116        def server_port(f, s):
1117            p = urlparse(s.get('server', 'http://localhost'))
1118            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
1119
1120        def server_null(f,s): pass
1121
1122        def server_seer(f, s):
1123            print >>f, 'seer: True'
1124
1125        server_service_out = {
1126                'SMB': server_port,
1127                'tmcd': server_port,
1128                'userconfig': server_null,
1129                'project_export': server_null,
1130                'seer': server_seer,
1131                'seer_master': server_port,
1132            }
1133        # XXX: end un hardcode this
1134
1135
1136        seer_out = False
1137        client_out = False
1138        mproj = None
1139        mexp = None
1140        control_gw = None
1141        testbed = ""
1142        # Create configuration files for the portals
1143        for e in [ e for e in topo.elements \
1144                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
1145            myname = e.name
1146            type = e.get_attribute('portal_type')
1147
1148            info = conninfo_to_dict(myname, connInfo)
1149
1150            if not info:
1151                raise service_error(service_error.req,
1152                        "No connectivity info for %s" % myname)
1153
1154            peer = info.get('peer', "")
1155            ldomain = self.domain;
1156            ssh_port = info.get('ssh_port', 22)
1157
1158            # Collect this for the client.conf file
1159            if 'masterexperiment' in info:
1160                mproj, meid = info['masterexperiment'].split("/", 1)
1161
1162            if type in ('control', 'both'):
1163                testbed = e.get_attribute('testbed')
1164                control_gw = myname
1165
1166            active = info.get('active', 'False')
1167
1168            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
1169            tunnelconfig = self.attrs.has_key('TunnelCfg')
1170            try:
1171                f = open(cfn, "w")
1172                if active == 'True':
1173                    print >>f, "active: True"
1174                    print >>f, "ssh_port: %s" % ssh_port
1175                    if type in ('control', 'both'):
1176                        for s in [s for s in services \
1177                                if s.get('name', "") in self.imports]:
1178                            server_service_out[s['name']](f, s)
1179
1180                if tunnelconfig:
1181                    print >>f, "tunnelip: %s" % tunnelconfig
1182                print >>f, "peer: %s" % peer.lower()
1183                print >>f, "ssh_pubkey: /proj/%s/exp/%s/tmp/%s" % \
1184                        (lproj, leid, pubkey_base)
1185                print >>f, "ssh_privkey: /proj/%s/exp/%s/tmp/%s" % \
1186                        (lproj, leid, secretkey_base)
1187                f.close()
1188            except EnvironmentError, e:
1189                raise service_error(service_error.internal,
1190                        "Can't write protal config %s: %s" % (cfn, e))
1191
1192        # Done with portals, write the client config file.
1193        try:
1194            f = open("%s/client.conf" % tmpdir, "w")
1195            if control_gw:
1196                print >>f, "ControlGateway: %s.%s.%s%s" % \
1197                    (myname.lower(), leid.lower(), lproj.lower(),
1198                            ldomain.lower())
1199            for s in services:
1200                if s.get('name',"") in self.imports and \
1201                        s.get('visibility','') == 'import':
1202                    client_service_out[s['name']](f, s)
1203                if s.get('name', '') in self.exports and \
1204                        s.get('visibility', '') == 'export' and \
1205                        s['name'] in client_export_service_out:
1206                    client_export_service_out[s['name']](f, s)
1207            # Seer uses this.
1208            if mproj and meid:
1209                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
1210            # Better way...
1211            if testbed == master:
1212                print >>f, "SEERBase: True"
1213            f.close()
1214        except EnvironmentError, e:
1215            raise service_error(service_error.internal,
1216                    "Cannot write client.conf: %s" %s)
1217
1218    def generate_ns2(self, topo, expfn, softdir, master, connInfo):
1219        class dragon_commands:
1220            """
1221            Functor to spit out approrpiate dragon commands for nodes listed in
1222            the connectivity description.  The constructor makes a dict mapping
1223            dragon nodes to their parameters and the __call__ checks each
1224            element in turn for membership.
1225            """
1226            def __init__(self, map):
1227                self.node_info = map
1228
1229            def __call__(self, e):
1230                s = ""
1231                if isinstance(e, topdl.Computer):
1232                    if self.node_info.has_key(e.name):
1233                        i = self.node_info[e.name]
1234                        for ifname, vlan, type in i:
1235                            for i in e.interface:
1236                                if i.name == ifname:
1237                                    addr = i.get_attribute('ip4_address')
1238                                    subs = i.substrate[0]
1239                                    break
1240                            else:
1241                                raise service_error(service_error.internal,
1242                                        "No interface %s on element %s" % \
1243                                                (ifname, e.name))
1244                            # XXX: do netmask right
1245                            if type =='link':
1246                                s = ("tb-allow-external ${%s} dragonportal " + \
1247                                        "ip %s vlan %s netmask 255.255.255.0\n") % \
1248                                        (e.name, addr, vlan)
1249                            elif type =='lan':
1250                                s = ("tb-allow-external ${%s} dragonportal " + \
1251                                        "ip %s vlan %s usurp %s\n") % \
1252                                        (e.name, addr, vlan, subs)
1253                            else:
1254                                raise service_error(service_error_internal,
1255                                        "Unknown DRAGON type %s" % type)
1256                return s
1257
1258        class not_dragon:
1259            def __init__(self, map):
1260                self.nodes = set(map.keys())
1261
1262            def __call__(self, e):
1263                return e.name not in self.nodes
1264
1265
1266        t = topo.clone()
1267
1268        dragon_map = { }
1269        for i in [ i for i in connInfo if i['type'] == 'transit']:
1270            for a in i.get('fedAttr', []):
1271                if a['attribute'] == 'vlan_id':
1272                    vlan = a['value']
1273                    break
1274            else:
1275                raise service_error(service_error.internal, 
1276                        "No vlan tag")
1277            members = i.get('member', [])
1278            if len(members) > 1: type = 'lan'
1279            else: type = 'link'
1280
1281            try:
1282                for m in members:
1283                    if dragon_map.has_key(m['element']):
1284                        dragon_map[m['element']].append(( m['interface'], 
1285                            vlan, type))
1286                    else:
1287                        dragon_map[m['element']] = [( m['interface'], 
1288                            vlan, type),]
1289            except KeyError:
1290                raise service_error(service_error.req,
1291                        "Missing connectivity info")
1292
1293        # The startcmds for master and slave testbeds
1294        if master: 
1295            gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true')
1296            node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true')
1297        else: 
1298            gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true')
1299            node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true')
1300
1301        # Weed out the things we aren't going to instantiate: Segments, portal
1302        # substrates, and portal interfaces.  (The copy in the for loop allows
1303        # us to delete from e.elements in side the for loop).  While we're
1304        # touching all the elements, we also adjust paths from the original
1305        # testbed to local testbed paths and put the federation commands into
1306        # the start commands
1307        for e in [e for e in t.elements]:
1308            if isinstance(e, topdl.Segment):
1309                t.elements.remove(e)
1310            if isinstance(e, topdl.Computer):
1311                self.add_kit(e, self.federation_software)
1312                if e.get_attribute('portal') and gate_cmd:
1313                    # Add local portal support software
1314                    self.add_kit(e, self.portal_software)
1315                    # Portals never have a user-specified start command
1316                    e.set_attribute('startup', gate_cmd)
1317                elif node_cmd:
1318                    if e.get_attribute('startup'):
1319                        e.set_attribute('startup', "%s \\$USER '%s'" % \
1320                                (node_cmd, e.get_attribute('startup')))
1321                    else:
1322                        e.set_attribute('startup', node_cmd)
1323
1324                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
1325                # Remove portal interfaces that do not connect to DRAGON
1326                e.interface = [i for i in e.interface \
1327                        if not i.get_attribute('portal') or i.name in dinf ]
1328            # Fix software paths
1329            for s in getattr(e, 'software', []):
1330                s.location = re.sub("^.*/", softdir, s.location)
1331
1332        t.substrates = [ s.clone() for s in t.substrates ]
1333        t.incorporate_elements()
1334
1335        # Customize the ns2 output for local portal commands and images
1336        filters = []
1337
1338        # NB: these are extra commands issued for the node, not the startcmds
1339        if master: cmd = self.attrs.get('MasterConnectorCmd', '')
1340        else: cmd = self.attrs.get('SlaveConnectorCmd', '')
1341
1342        if self.attrs.has_key('dragon'):
1343            add_filter = not_dragon(dragon_map)
1344            filters.append(dragon_commands(dragon_map))
1345        else:
1346            add_filter = None
1347
1348        if cmd:
1349            filters.append(topdl.generate_portal_command_filter(cmd,
1350                add_filter=add_filter))
1351
1352        if self.attrs.has_key('connectorImage'):
1353            filters.append(topdl.generate_portal_image_filter(
1354                self.attrs.get('connectorImage')))
1355
1356        if self.attrs.has_key('connectorType'):
1357            filters.append(topdl.generate_portal_hardware_filter(
1358                self.attrs.get('connectorType')))
1359
1360        # Convert to ns and write it out
1361        expfile = topdl.topology_to_ns2(t, filters)
1362        try:
1363            f = open(expfn, "w")
1364            print >>f, expfile
1365            f.close()
1366        except EnvironmentError:
1367            raise service_error(service_error.internal,
1368                    "Cannot write experiment file %s: %s" % (expfn,e))
1369
1370    def export_store_info(self, cf, proj, ename, connInfo):
1371        """
1372        For the export requests in the connection info, install the peer names
1373        at the experiment controller via SetValue calls.
1374        """
1375
1376        for c in connInfo:
1377            for p in [ p for p in c.get('parameter', []) \
1378                    if p.get('type', '') == 'output']:
1379
1380                if p.get('name', '') == 'peer':
1381                    k = p.get('key', None)
1382                    surl = p.get('store', None)
1383                    if surl and k and k.index('/') != -1:
1384                        value = "%s.%s.%s%s" % \
1385                                (k[k.index('/')+1:], ename, proj, self.domain)
1386                        req = { 'name': k, 'value': value }
1387                        self.log.debug("Setting %s to %s on %s" % \
1388                                (k, value, surl))
1389                        self.call_SetValue(surl, req, cf)
1390                    else:
1391                        self.log.error("Bad export request: %s" % p)
1392                elif p.get('name', '') == 'ssh_port':
1393                    k = p.get('key', None)
1394                    surl = p.get('store', None)
1395                    if surl and k:
1396                        req = { 'name': k, 'value': self.ssh_port }
1397                        self.log.debug("Setting %s to %s on %s" % \
1398                                (k, self.ssh_port, surl))
1399                        self.call_SetValue(surl, req, cf)
1400                    else:
1401                        self.log.error("Bad export request: %s" % p)
1402                else:
1403                    self.log.error("Unknown export parameter: %s" % \
1404                            p.get('name'))
1405                    continue
1406
1407    def import_store_info(self, cf, connInfo):
1408        """
1409        Pull any import parameters in connInfo in.  We translate them either
1410        into known member names or fedAddrs.
1411        """
1412
1413        for c in connInfo:
1414            for p in [ p for p in c.get('parameter', []) \
1415                    if p.get('type', '') == 'input']:
1416                name = p.get('name', None)
1417                key = p.get('key', None)
1418                store = p.get('store', None)
1419
1420                if name and key and store :
1421                    req = { 'name': key, 'wait': True }
1422                    self.log.debug("Waiting for %s (%s) from %s" % \
1423                            (name, key, store))
1424                    r = self.call_GetValue(store, req, cf)
1425                    r = r.get('GetValueResponseBody', None)
1426                    if r :
1427                        if r.get('name', '') == key:
1428                            v = r.get('value', None)
1429                            if v is not None:
1430                                if name == 'peer':
1431                                    self.log.debug("Got peer %s" % v)
1432                                    c['peer'] = v
1433                                else:
1434                                    self.log.debug("Got %s %s" % (name, v))
1435                                    if c.has_key('fedAttr'):
1436                                        c['fedAttr'].append({
1437                                            'attribute': name, 'value': v})
1438                                    else:
1439                                        c['fedAttr']= [{
1440                                            'attribute': name, 'value': v}]
1441                            else:
1442                                raise service_error(service_error.internal, 
1443                                        'None value exported for %s'  % key)
1444                        else:
1445                            raise service_error(service_error.internal, 
1446                                    'Different name returned for %s: %s' \
1447                                            % (key, r.get('name','')))
1448                    else:
1449                        raise service_error(service_error.internal, 
1450                            'Badly formatted response: no GetValueResponseBody')
1451                else:
1452                    raise service_error(service_error.internal, 
1453                        'Bad Services missing info for import %s' % c)
1454
1455    def configure_userconf(self, services):
1456        """
1457        If the userconf service was imported, collect the configuration data.
1458        """
1459        for s in services:
1460            s_name = s.get('name', '')
1461            s_vis = s.get('visibility','')
1462            if s_name  == 'userconfig' and s_vis == 'import':
1463                # Collect ther server and certificate info.
1464                u = s.get('server', None)
1465                for a in s.get('fedAttr', []):
1466                    if a.get('attribute',"") == 'cert':
1467                        cert = a.get('value', None)
1468                        break
1469                else:
1470                    cert = None
1471
1472                if cert:
1473                    # Make a temporary certificate file for get_url.  The
1474                    # finally clause removes it whether something goes
1475                    # wrong (including an exception from get_url) or not.
1476                    try:
1477                        tfos, tn = tempfile.mkstemp(suffix=".pem")
1478                        tf = os.fdopen(tfos, 'w')
1479                        print >>tf, cert
1480                        tf.close()
1481                        self.log.debug("Getting userconf info: %s" % u)
1482                        get_url(u, tn, tmpdir, "userconf")
1483                        self.log.debug("Got userconf info: %s" % u)
1484                    except EnvironmentError, e:
1485                        raise service_error(service.error.internal, 
1486                                "Cannot create temp file for " + 
1487                                "userconfig certificates: %s e")
1488                    except:
1489                        t, v, st = sys.exc_info()
1490                        raise service_error(service_error.internal,
1491                                "Error retrieving %s: %s" % (s, v))
1492                    finally:
1493                        if tn: os.remove(tn)
1494                else:
1495                    raise service_error(service_error.req,
1496                            "No certificate for retreiving userconfig")
1497                break
1498
1499    def add_seer_node(self, topo, name, startup):
1500        """
1501        Add a seer node to the given topology, with the startup command passed
1502        in.
1503        """
1504        c_node = topdl.Computer(
1505                name=name, 
1506                os= topdl.OperatingSystem(
1507                    attribute=[
1508                    { 'attribute': 'osid', 
1509                        'value': self.local_seer_image },
1510                    ]),
1511                attribute=[
1512                    { 'attribute': 'startup', 'value': startup },
1513                    ]
1514                )
1515        self.add_kit(c_node, self.local_seer_software)
1516        topo.elements.append(c_node)
1517
1518    def configure_seer_services(self, services, topo, softdir):
1519        local_seer = False
1520        collect_seer = False
1521        seer_master= False
1522        for s in services:
1523            s_name = s.get('name', '')
1524            s_vis = s.get('visibility','')
1525
1526            if s_name  == 'local_seer_control'  and s_vis == 'export':
1527                local_seer = True
1528            elif s_name == 'seer_master':
1529                if s_vis == 'import':
1530                    collect_seer = True
1531                elif s_vis == 'export':
1532                    seer_master = True
1533       
1534        # We've got the whole picture now, so add nodes if needed and configure
1535        # them to interconnect properly.
1536        if local_seer or seer_master:
1537            # Copy local seer control node software to the tempdir
1538            for l, f in self.local_seer_software:
1539                base = os.path.basename(f)
1540                copy_file(f, "%s/%s" % (softdir, base))
1541        # If we're collecting seers somewhere the controllers need to talk to
1542        # the master.  In testbeds that export the service, that will be a
1543        # local node that we'll add below.  Elsewhere it will be the control
1544        # portal that will port forward to the exporting master.
1545        if local_seer:
1546            if collect_seer:
1547                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
1548            else:
1549                startup = self.local_seer_start
1550            self.add_seer_node(topo, 'control', startup)
1551        # If this is the seer master, add that node, too.
1552        if seer_master:
1553            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
1554
1555
1556
1557    def StartSegment(self, req, fid):
1558
1559        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1560
1561        err = None  # Any service_error generated after tmpdir is created
1562        rv = None   # Return value from segment creation
1563
1564        try:
1565            req = req['StartSegmentRequestBody']
1566        except KeyError:
1567            raise service_error(server_error.req, "Badly formed request")
1568
1569        connInfo = req.get('connection', [])
1570        services = req.get('service', [])
1571        auth_attr = req['allocID']['fedid']
1572        aid = "%s" % auth_attr
1573        attrs = req.get('fedAttr', [])
1574        if not self.auth.check_attribute(fid, auth_attr):
1575            raise service_error(service_error.access, "Access denied")
1576        else:
1577            # See if this is a replay of an earlier succeeded StartSegment -
1578            # sometimes SSL kills 'em.  If so, replay the response rather than
1579            # redoing the allocation.
1580            self.state_lock.acquire()
1581            retval = self.allocation[aid].get('started', None)
1582            self.state_lock.release()
1583            if retval:
1584                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1585                        "replaying response")
1586                return retval
1587
1588        # A new request.  Do it.
1589
1590        if req.has_key('segmentdescription') and \
1591                req['segmentdescription'].has_key('topdldescription'):
1592            topo = \
1593                topdl.Topology(**req['segmentdescription']['topdldescription'])
1594        else:
1595            raise service_error(service_error.req, 
1596                    "Request missing segmentdescription'")
1597       
1598        master = req.get('master', False)
1599
1600        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1601        try:
1602            tmpdir = tempfile.mkdtemp(prefix="access-")
1603            softdir = "%s/software" % tmpdir
1604            os.mkdir(softdir)
1605        except EnvironmentError:
1606            raise service_error(service_error.internal, "Cannot create tmp dir")
1607
1608        # Try block alllows us to clean up temporary files.
1609        try:
1610            sw = set()
1611            for e in topo.elements:
1612                for s in getattr(e, 'software', []):
1613                    sw.add(s.location)
1614            for s in sw:
1615                self.log.debug("Retrieving %s" % s)
1616                try:
1617                    get_url(s, certfile, softdir)
1618                except:
1619                    t, v, st = sys.exc_info()
1620                    raise service_error(service_error.internal,
1621                            "Error retrieving %s: %s" % (s, v))
1622
1623            # Copy local federation and portal node software to the tempdir
1624            for s in (self.federation_software, self.portal_software):
1625                for l, f in s:
1626                    base = os.path.basename(f)
1627                    copy_file(f, "%s/%s" % (softdir, base))
1628
1629            ename = None
1630            pubkey_base = None
1631            secretkey_base = None
1632            for a in attrs:
1633                if a['attribute'] in configs:
1634                    try:
1635                        self.log.debug("Retrieving %s from %s" % \
1636                                (a['attribute'], a['value']))
1637                        get_url(a['value'], certfile, tmpdir)
1638                    except:
1639                        t, v, st = sys.exc_info()
1640                        raise service_error(service_error.internal,
1641                                "Error retrieving %s: %s" % (s, v))
1642                if a['attribute'] == 'ssh_pubkey':
1643                    pubkey_base = a['value'].rpartition('/')[2]
1644                if a['attribute'] == 'ssh_secretkey':
1645                    secretkey_base = a['value'].rpartition('/')[2]
1646                if a['attribute'] == 'experiment_name':
1647                    ename = a['value']
1648
1649            if not ename:
1650                ename = ""
1651                for i in range(0,5):
1652                    ename += random.choice(string.ascii_letters)
1653                self.log.warn("No experiment name: picked one randomly: %s" \
1654                        % ename)
1655
1656            if not pubkey_base:
1657                raise service_error(service_error.req, 
1658                        "No public key attribute")
1659
1660            if not secretkey_base:
1661                raise service_error(service_error.req, 
1662                        "No secret key attribute")
1663
1664            # If the userconf service was imported, collect the configuration
1665            # data.
1666            self.configure_userconf(services)
1667            self.configure_seer_services(services, topo, softdir)
1668
1669            proj = None
1670            user = None
1671            self.state_lock.acquire()
1672            if self.allocation.has_key(aid):
1673                proj = self.allocation[aid].get('project', None)
1674                if not proj: 
1675                    proj = self.allocation[aid].get('sproject', None)
1676                user = self.allocation[aid].get('user', None)
1677                self.allocation[aid]['experiment'] = ename
1678                self.allocation[aid]['log'] = [ ]
1679                # Create a logger that logs to the experiment's state object as
1680                # well as to the main log file.
1681                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1682                h = logging.StreamHandler(
1683                        list_log.list_log(self.allocation[aid]['log']))
1684                # XXX: there should be a global one of these rather than
1685                # repeating the code.
1686                h.setFormatter(logging.Formatter(
1687                    "%(asctime)s %(name)s %(message)s",
1688                            '%d %b %y %H:%M:%S'))
1689                alloc_log.addHandler(h)
1690                self.write_state()
1691            self.state_lock.release()
1692
1693            if not proj:
1694                raise service_error(service_error.internal, 
1695                        "Can't find project for %s" %aid)
1696
1697            if not user:
1698                raise service_error(service_error.internal, 
1699                        "Can't find creation user for %s" %aid)
1700
1701            self.export_store_info(certfile, proj, ename, connInfo)
1702            self.import_store_info(certfile, connInfo)
1703
1704            expfile = "%s/experiment.tcl" % tmpdir
1705
1706            self.generate_portal_configs(topo, pubkey_base, 
1707                    secretkey_base, tmpdir, master, proj, ename, connInfo, 
1708                    services)
1709            self.generate_ns2(topo, expfile, 
1710                    "/proj/%s/software/%s/" % (proj, ename), master, connInfo)
1711
1712            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1713                    debug=self.create_debug, log=alloc_log)
1714            rv = starter(self, ename, proj, user, expfile, tmpdir)
1715            # Copy the assigned names into the return topology
1716            rvtopo = topo.clone()
1717            for e in [ e for e in rvtopo.elements \
1718                   if isinstance(e, topdl.Computer)]:
1719                for n in e.name:
1720                    if n in starter.node:
1721                        e.set_attribute('hostname', "%s%s" % \
1722                                (starter.node[n], self.domain))
1723        except service_error, e:
1724            err = e
1725        except e:
1726            err = service_error(service_error.internal, str(e))
1727
1728        # Walk up tmpdir, deleting as we go
1729        if self.cleanup:
1730            self.log.debug("[StartSegment]: removing %s" % tmpdir)
1731            for path, dirs, files in os.walk(tmpdir, topdown=False):
1732                for f in files:
1733                    os.remove(os.path.join(path, f))
1734                for d in dirs:
1735                    os.rmdir(os.path.join(path, d))
1736            os.rmdir(tmpdir)
1737        else:
1738            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1739
1740        if rv:
1741            # Grab the log (this is some anal locking, but better safe than
1742            # sorry)
1743            self.state_lock.acquire()
1744            logv = "".join(self.allocation[aid]['log'])
1745            # It's possible that the StartSegment call gets retried (!).
1746            # if the 'started' key is in the allocation, we'll return it rather
1747            # than redo the setup.
1748            self.allocation[aid]['started'] = { 
1749                    'allocID': req['allocID'],
1750                    'allocationLog': logv,
1751                    }
1752            retval = copy.copy(self.allocation[aid]['started'])
1753            self.write_state()
1754            self.state_lock.release()
1755            retval['segmentdescription'] =  \
1756                    { 'topdldescription': rvtopo.to_dict() }
1757            return retval
1758        elif err:
1759            raise service_error(service_error.federant,
1760                    "Swapin failed: %s" % err)
1761        else:
1762            raise service_error(service_error.federant, "Swapin failed")
1763
1764    def TerminateSegment(self, req, fid):
1765        try:
1766            req = req['TerminateSegmentRequestBody']
1767        except KeyError:
1768            raise service_error(server_error.req, "Badly formed request")
1769
1770        auth_attr = req['allocID']['fedid']
1771        aid = "%s" % auth_attr
1772        attrs = req.get('fedAttr', [])
1773        if not self.auth.check_attribute(fid, auth_attr):
1774            raise service_error(service_error.access, "Access denied")
1775
1776        self.state_lock.acquire()
1777        if self.allocation.has_key(aid):
1778            proj = self.allocation[aid].get('project', None)
1779            if not proj: 
1780                proj = self.allocation[aid].get('sproject', None)
1781            user = self.allocation[aid].get('user', None)
1782            ename = self.allocation[aid].get('experiment', None)
1783        else:
1784            proj = None
1785            user = None
1786            ename = None
1787        self.state_lock.release()
1788
1789        if not proj:
1790            raise service_error(service_error.internal, 
1791                    "Can't find project for %s" % aid)
1792
1793        if not user:
1794            raise service_error(service_error.internal, 
1795                    "Can't find creation user for %s" % aid)
1796        if not ename:
1797            raise service_error(service_error.internal, 
1798                    "Can't find experiment name for %s" % aid)
1799        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1800                debug=self.create_debug)
1801        stopper(self, user, proj, ename)
1802        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.