source: fedd/federation/emulab_access.py @ 968b84b

Last change on this file since 968b84b was 0b217d1, checked in by Ted Faber <faber@…>, 10 years ago

Strip some dead wood

  • Property mode set to 100644
File size: 40.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34import emulab_shared_nat_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    max_name_len = 19
53
54    def __init__(self, config=None, auth=None):
55        """
56        Initializer.  Pulls parameters out of the ConfigParser's access section.
57        """
58
59        access_base.__init__(self, config, auth)
60
61        self.max_name_len = access.max_name_len
62
63        self.allow_proxy = config.getboolean("access", "allow_proxy")
64
65        self.domain = config.get("access", "domain")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78        self.boss = config.get("access", "boss")
79        self.ops = config.get("access", "ops")
80        # XXX: Delete these
81        # self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        # self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94        self.nat_portal = config.get("access", "nat_portal")
95        self.shared_nat = config.getboolean("access", "shared_nat")
96
97        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
98                self.get_port(config.get("globals", "services", "23235")))
99
100        self.federation_software = self.software_list(self.federation_software)
101        self.portal_software = self.software_list(self.portal_software)
102        self.local_seer_software = self.software_list(self.local_seer_software)
103
104        self.access_type = self.access_type.lower()
105        if self.shared_nat:
106            self.start_segment = emulab_shared_nat_segment.start_segment
107            self.stop_segment = emulab_shared_nat_segment.stop_segment
108            self.info_segment = emulab_shared_nat_segment.info_segment
109            self.operation_segment = emulab_shared_nat_segment.operation_segment
110            self.exports_segment = emulab_shared_nat_segment.exports_segment
111        else:
112            self.start_segment = emulab_segment.start_segment
113            self.stop_segment = emulab_segment.stop_segment
114            self.info_segment = emulab_segment.info_segment
115            self.operation_segment = emulab_segment.operation_segment
116            self.exports_segment = emulab_segment.exports_segment
117
118        self.restricted = [ ]
119        # XXX
120        #tb = config.get('access', 'testbed')
121        #if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
122        #else: self.testbed = [ ]
123
124        # authorization information
125        self.auth_type = config.get('access', 'auth_type') \
126                or 'abac'
127        self.auth_dir = config.get('access', 'auth_dir')
128        accessdb = config.get("access", "accessdb")
129        # initialize the authorization system
130        if self.auth_type == 'abac':
131            self.auth = abac_authorizer(load=self.auth_dir)
132            self.access = [ ]
133            if accessdb:
134                self.read_access(accessdb, self.access_tuple)
135        else:
136            raise service_error(service_error.internal, 
137                    "Unknown auth_type: %s" % self.auth_type)
138
139        # read_state in the base_class
140        self.state_lock.acquire()
141        if 'allocation' not in self.state: self.state['allocation']= { }
142        self.allocation = self.state['allocation']
143        self.state_lock.release()
144        self.exports = {
145                'SMB': self.export_SMB,
146                'seer': self.export_seer,
147                'tmcd': self.export_tmcd,
148                'userconfig': self.export_userconfig,
149                'project_export': self.export_project_export,
150                'local_seer_control': self.export_local_seer,
151                'seer_master': self.export_seer_master,
152                'hide_hosts': self.export_hide_hosts,
153                }
154
155        if not self.local_seer_image or not self.local_seer_software or \
156                not self.local_seer_start:
157            if 'local_seer_control' in self.exports:
158                del self.exports['local_seer_control']
159
160        if not self.local_seer_image or not self.local_seer_software or \
161                not self.seer_master_start:
162            if 'seer_master' in self.exports:
163                del self.exports['seer_master']
164
165
166        self.soap_services = {\
167            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
168            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
169            'StartSegment': soap_handler("StartSegment", self.StartSegment),
170            'TerminateSegment': soap_handler("TerminateSegment",
171                self.TerminateSegment),
172            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
173            'OperationSegment': soap_handler("OperationSegment",
174                self.OperationSegment),
175            }
176        self.xmlrpc_services =  {\
177            'RequestAccess': xmlrpc_handler('RequestAccess',
178                self.RequestAccess),
179            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
180                self.ReleaseAccess),
181            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
182            'TerminateSegment': xmlrpc_handler('TerminateSegment',
183                self.TerminateSegment),
184            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
185            'OperationSegment': xmlrpc_handler("OperationSegment",
186                self.OperationSegment),
187            }
188
189        self.call_SetValue = service_caller('SetValue', log=self.log)
190        self.call_GetValue = service_caller('GetValue', log=self.log)
191
192    @staticmethod
193    def get_port(ps):
194        '''
195        Take a fedd service string and return the first port.  Used in
196        creating the testbed uri identifier.
197        '''
198        p = ps.split(',')
199        smallport = p[0].split(':')
200        try:
201            rv = int(smallport[0])
202        except ValueError:
203            rv = 23235
204        return rv
205
206    @staticmethod
207    def access_tuple(str):
208        """
209        Convert a string of the form (id, id, id) into an access_project.  This
210        is called by read_access to convert to local attributes.  It returns a
211        tuple of the form (project, user, certificate_file).
212        """
213
214        str = str.strip()
215        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
216            # The slice takes the parens off the string.
217            proj, user, cert = str[1:-1].split(',')
218            return (proj.strip(), user.strip(), cert.strip())
219        else:
220            raise self.parse_error(
221                    'Bad mapping (unbalanced parens or more than 2 commas)')
222
223    # RequestAccess support routines
224
225    def save_project_state(self, aid, pname, uname, certf, owners):
226        """
227        Save the project, user, and owners associated with this allocation.
228        This creates the allocation entry.
229        """
230        self.state_lock.acquire()
231        self.allocation[aid] = { }
232        self.allocation[aid]['project'] = pname
233        self.allocation[aid]['user'] = uname
234        self.allocation[aid]['cert'] = certf
235        self.allocation[aid]['owners'] = owners
236        self.write_state()
237        self.state_lock.release()
238        return (pname, uname)
239
240    # End of RequestAccess support routines
241
242    def RequestAccess(self, req, fid):
243        """
244        Handle the access request.  Proxy if not for us.
245
246        Parse out the fields and make the allocations or rejections if for us,
247        otherwise, assuming we're willing to proxy, proxy the request out.
248        """
249
250        def gateway_hardware(h):
251            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
252            else: return h
253
254        def get_export_project(svcs):
255            """
256            if the service requests includes one to export a project, return
257            that project.
258            """
259            rv = None
260            for s in svcs:
261                if s.get('name', '') == 'project_export' and \
262                        s.get('visibility', '') == 'export':
263                    if not rv: 
264                        for a in s.get('fedAttr', []):
265                            if a.get('attribute', '') == 'project' \
266                                    and 'value' in a:
267                                rv = a['value']
268                    else:
269                        raise service_error(service_error, access, 
270                                'Requesting multiple project exports is ' + \
271                                        'not supported');
272            return rv
273
274        self.log.info("RequestAccess called by %s" % fid)
275        # The dance to get into the request body
276        if req.has_key('RequestAccessRequestBody'):
277            req = req['RequestAccessRequestBody']
278        else:
279            raise service_error(service_error.req, "No request!?")
280
281        # if this includes a project export request, construct a filter such
282        # that only the ABAC attributes mapped to that project are checked for
283        # access.
284        if 'service' in req:
285            ep = get_export_project(req['service'])
286            if ep: pf = lambda(a): a.value[0] == ep
287            else: pf = None
288        else:
289            ep = None
290            pf = None
291
292        if self.auth.import_credentials(
293                data_list=req.get('abac_credential', [])):
294            self.auth.save()
295        else:
296            self.log.debug('failed to import incoming credentials')
297
298        if self.auth_type == 'abac':
299            found, owners, proof = self.lookup_access(req, fid, filter=pf)
300        else:
301            raise service_error(service_error.internal, 
302                    'Unknown auth_type: %s' % self.auth_type)
303        ap = None
304
305        # keep track of what's been added
306        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
307        aid = unicode(allocID)
308
309        pname, uname = self.save_project_state(aid, found[0], found[1], 
310                found[2], owners)
311
312        services, svc_state = self.export_services(req.get('service',[]),
313                pname, uname)
314        self.state_lock.acquire()
315        # Store services state in global state
316        for k, v in svc_state.items():
317            self.allocation[aid][k] = v
318        self.append_allocation_authorization(aid, 
319                set([(o, allocID) for o in owners]), state_attr='allocation')
320        self.write_state()
321        self.state_lock.release()
322        try:
323            f = open("%s/%s.pem" % (self.certdir, aid), "w")
324            print >>f, alloc_cert
325            f.close()
326        except EnvironmentError, e:
327            self.log.info("RequestAccess failed for by %s: internal error" \
328                    % fid)
329            raise service_error(service_error.internal, 
330                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
331        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
332        resp = self.build_access_response({ 'fedid': allocID } ,
333                pname, services, proof)
334        return resp
335
336    def ReleaseAccess(self, req, fid):
337        self.log.info("ReleaseAccess called by %s" % fid)
338        # The dance to get into the request body
339        if req.has_key('ReleaseAccessRequestBody'):
340            req = req['ReleaseAccessRequestBody']
341        else:
342            raise service_error(service_error.req, "No request!?")
343
344        try:
345            if req['allocID'].has_key('localname'):
346                auth_attr = aid = req['allocID']['localname']
347            elif req['allocID'].has_key('fedid'):
348                aid = unicode(req['allocID']['fedid'])
349                auth_attr = req['allocID']['fedid']
350            else:
351                raise service_error(service_error.req,
352                        "Only localnames and fedids are understood")
353        except KeyError:
354            raise service_error(service_error.req, "Badly formed request")
355
356        self.log.debug("[access] deallocation requested for %s by %s" % \
357                (aid, fid))
358        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
359                with_proof=True)
360        if not access_ok:
361            self.log.debug("[access] deallocation denied for %s", aid)
362            raise service_error(service_error.access, "Access Denied")
363
364        self.state_lock.acquire()
365        if aid in self.allocation:
366            self.log.debug("Found allocation for %s" %aid)
367            self.clear_allocation_authorization(aid, state_attr='allocation')
368            del self.allocation[aid]
369            self.write_state()
370            self.state_lock.release()
371            # Remove the access cert
372            cf = "%s/%s.pem" % (self.certdir, aid)
373            self.log.debug("Removing %s" % cf)
374            os.remove(cf)
375            self.log.info("ReleaseAccess succeeded for %s" % fid)
376            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
377        else:
378            self.state_lock.release()
379            raise service_error(service_error.req, "No such allocation")
380
381    # These are subroutines for StartSegment
382    def generate_ns2(self, topo, expfn, softdir, connInfo):
383        """
384        Convert topo into an ns2 file, decorated with appropriate commands for
385        the particular testbed setup.  Convert all requests for software, etc
386        to point at the staged copies on this testbed and add the federation
387        startcommands.
388        """
389        class dragon_commands:
390            """
391            Functor to spit out approrpiate dragon commands for nodes listed in
392            the connectivity description.  The constructor makes a dict mapping
393            dragon nodes to their parameters and the __call__ checks each
394            element in turn for membership.
395            """
396            def __init__(self, map):
397                self.node_info = map
398
399            def __call__(self, e):
400                s = ""
401                if isinstance(e, topdl.Computer):
402                    if self.node_info.has_key(e.name):
403                        info = self.node_info[e.name]
404                        for ifname, vlan, type in info:
405                            for i in e.interface:
406                                if i.name == ifname:
407                                    addr = i.get_attribute('ip4_address')
408                                    subs = i.substrate[0]
409                                    break
410                            else:
411                                raise service_error(service_error.internal,
412                                        "No interface %s on element %s" % \
413                                                (ifname, e.name))
414                            # XXX: do netmask right
415                            if type =='link':
416                                s = ("tb-allow-external ${%s} " + \
417                                        "dragonportal ip %s vlan %s " + \
418                                        "netmask 255.255.255.0\n") % \
419                                        (topdl.to_tcl_name(e.name), addr, vlan)
420                            elif type =='lan':
421                                s = ("tb-allow-external ${%s} " + \
422                                        "dragonportal " + \
423                                        "ip %s vlan %s usurp %s\n") % \
424                                        (topdl.to_tcl_name(e.name), addr, 
425                                                vlan, subs)
426                            else:
427                                raise service_error(service_error_internal,
428                                        "Unknown DRAGON type %s" % type)
429                return s
430
431        class not_dragon:
432            """
433            Return true if a node is in the given map of dragon nodes.
434            """
435            def __init__(self, map):
436                self.nodes = set(map.keys())
437
438            def __call__(self, e):
439                return e.name not in self.nodes
440
441        def have_portals(top):
442            '''
443            Return true if the topology has a portal node
444            '''
445            # The else is on the for
446            for e in top.elements:
447                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
448                    return True
449            else:
450                return False
451
452
453        # Main line of generate_ns2
454        t = topo.clone()
455
456        # Create the map of nodes that need direct connections (dragon
457        # connections) from the connInfo
458        dragon_map = { }
459        for i in [ i for i in connInfo if i['type'] == 'transit']:
460            for a in i.get('fedAttr', []):
461                if a['attribute'] == 'vlan_id':
462                    vlan = a['value']
463                    break
464            else:
465                raise service_error(service_error.internal, "No vlan tag")
466            members = i.get('member', [])
467            if len(members) > 1: type = 'lan'
468            else: type = 'link'
469
470            try:
471                for m in members:
472                    if m['element'] in dragon_map:
473                        dragon_map[m['element']].append(( m['interface'], 
474                            vlan, type))
475                    else:
476                        dragon_map[m['element']] = [( m['interface'], 
477                            vlan, type),]
478            except KeyError:
479                raise service_error(service_error.req,
480                        "Missing connectivity info")
481
482        def output_fixed_filter(e):
483            if not isinstance(e, topdl.Computer): return ""
484            fn = e.get_attribute('fixed')
485            if fn is None:
486                return ""
487            else:
488                return 'tb-fix-node ${%s} %s\n' % (topdl.to_tcl_name(e.name), fn)
489
490        def output_external_access(e):
491            '''
492            Allow nodes that have explicit external access requests attached to
493            have external access.  This will be replaced by a real risky
494            experiment filter.
495            '''
496            if not isinstance(e, topdl.Computer): return ""
497            ex = e.get_attribute('containers:external')
498            if ex is None:
499                return ""
500            else:
501                return 'tb-allow-external ${%s}\n' % topdl.to_tcl_name(e.name)
502
503        # Weed out the things we aren't going to instantiate: Segments, portal
504        # substrates, and portal interfaces.  (The copy in the for loop allows
505        # us to delete from e.elements in side the for loop).  While we're
506        # touching all the elements, we also adjust paths from the original
507        # testbed to local testbed paths and put the federation commands into
508        # the start commands
509        local = len(dragon_map) == 0 and not have_portals(t)
510        if local: routing = 'Static'
511        else: routing = 'Manual'
512
513        if local:
514            self.log.debug("Local experiment.")
515        for e in [e for e in t.elements]:
516            if isinstance(e, topdl.Segment):
517                t.elements.remove(e)
518            if isinstance(e, topdl.Computer):
519                self.add_kit(e, self.federation_software)
520                if e.get_attribute('portal') and self.portal_startcommand:
521                    # Add local portal support software
522                    self.add_kit(e, self.portal_software)
523                    # Portals never have a user-specified start command
524                    e.set_attribute('startup', self.portal_startcommand)
525                elif not local and self.node_startcommand:
526                    if e.get_attribute('startup'):
527                        e.set_attribute('startup', "%s \\$USER '%s'" % \
528                                (self.node_startcommand, 
529                                    e.get_attribute('startup')))
530                    else:
531                        e.set_attribute('startup', self.node_startcommand)
532
533                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
534                # Remove portal interfaces that do not connect to DRAGON
535                e.interface = [i for i in e.interface \
536                        if not i.get_attribute('portal') or i.name in dinf ]
537            # Fix software paths
538            for s in getattr(e, 'software', []):
539                s.location = re.sub("^.*/", softdir, s.location)
540
541        t.substrates = [ s.clone() for s in t.substrates ]
542        t.incorporate_elements()
543
544        # Customize the ns2 output for local portal commands and images
545        filters = [output_fixed_filter, output_external_access]
546
547        if self.dragon_endpoint:
548            add_filter = not_dragon(dragon_map)
549            filters.append(dragon_commands(dragon_map))
550        else:
551            add_filter = None
552
553        if self.portal_command:
554            if self.shared_nat: suffix = 'shared nat 0/0/tcp rdr 22/tcp'
555            else: suffix = ''
556
557            filters.append(topdl.generate_portal_command_filter(
558                self.portal_command, add_filter=add_filter, suffix=suffix))
559
560        if self.portal_image:
561            filters.append(topdl.generate_portal_image_filter(
562                self.portal_image))
563
564        if self.portal_type:
565            filters.append(topdl.generate_portal_hardware_filter(
566                self.portal_type))
567
568        # Convert to ns and write it out
569        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
570        try:
571            f = open(expfn, "w")
572            print >>f, expfile
573            f.close()
574        except EnvironmentError:
575            raise service_error(service_error.internal,
576                    "Cannot write experiment file %s: %s" % (expfn,e))
577
578    def export_store_info(self, cf, proj, ename, connInfo):
579        """
580        For the export requests in the connection info, send out teh SetValue
581        calls for parameters initialized by the segment's export_segment
582        object. Values are all in the connInfo parameter's value fields.
583        """
584
585        for c in connInfo:
586            for p in c.get('parameter', []):
587                # Ignore non-output parameters
588                if p.get('type', '') != 'output':
589                    continue
590                # Sanity check store, key and value
591                surl = p.get('store', None)
592                key = p.get('key', None)
593                value = p.get('value', None)
594                if surl is None:
595                    self.log.error("Export parameter without store: %s" % \
596                            p.get('name'))
597                    continue
598                if key is None:
599                    self.log.error("Export parameter without key: %s" % \
600                            p.get('name'))
601                    continue
602                if value is None:
603                    self.log.error("Unknown (unset) export parameter: %s" % \
604                            p.get('name'))
605                    continue
606                # Set the store value
607                req = { 'name': key, 'value': value }
608                self.call_SetValue(surl, req, cf)
609
610    def add_seer_node(self, topo, name, startup):
611        """
612        Add a seer node to the given topology, with the startup command passed
613        in.  Used by configure seer_services.
614        """
615        c_node = topdl.Computer(
616                name=name, 
617                os= topdl.OperatingSystem(
618                    attribute=[
619                    { 'attribute': 'osid', 
620                        'value': self.local_seer_image },
621                    ]),
622                attribute=[
623                    { 'attribute': 'startup', 'value': startup },
624                    ]
625                )
626        self.add_kit(c_node, self.local_seer_software)
627        topo.elements.append(c_node)
628
629    def configure_seer_services(self, services, topo, softdir):
630        """
631        Make changes to the topology required for the seer requests being made.
632        Specifically, add any control or master nodes required and set up the
633        start commands on the nodes to interconnect them.
634        """
635        local_seer = False      # True if we need to add a control node
636        collect_seer = False    # True if there is a seer-master node
637        seer_master= False      # True if we need to add the seer-master
638        for s in services:
639            s_name = s.get('name', '')
640            s_vis = s.get('visibility','')
641
642            if s_name  == 'local_seer_control' and s_vis == 'export':
643                local_seer = True
644            elif s_name == 'seer_master':
645                if s_vis == 'import':
646                    collect_seer = True
647                elif s_vis == 'export':
648                    seer_master = True
649       
650        # We've got the whole picture now, so add nodes if needed and configure
651        # them to interconnect properly.
652        if local_seer or seer_master:
653            # Copy local seer control node software to the tempdir
654            for l, f in self.local_seer_software:
655                base = os.path.basename(f)
656                copy_file(f, "%s/%s" % (softdir, base))
657        # If we're collecting seers somewhere the controllers need to talk to
658        # the master.  In testbeds that export the service, that will be a
659        # local node that we'll add below.  Elsewhere it will be the control
660        # portal that will port forward to the exporting master.
661        if local_seer:
662            if collect_seer:
663                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
664            else:
665                startup = self.local_seer_start
666            self.add_seer_node(topo, 'control', startup)
667        # If this is the seer master, add that node, too.
668        if seer_master:
669            self.add_seer_node(topo, 'seer-master', 
670                    "%s -R -n -R seer-master -R -A -R sink" % \
671                            self.seer_master_start)
672
673    def retrieve_software(self, topo, certfile, softdir):
674        """
675        Collect the software that nodes in the topology need loaded and stage
676        it locally.  This implies retrieving it from the experiment_controller
677        and placing it into softdir.  Certfile is used to prove that this node
678        has access to that data (it's the allocation/segment fedid).  Finally
679        local portal and federation software is also copied to the same staging
680        directory for simplicity - all software needed for experiment creation
681        is in softdir.
682        """
683        sw = set()
684        for e in topo.elements:
685            for s in getattr(e, 'software', []):
686                sw.add(s.location)
687        for s in sw:
688            self.log.debug("Retrieving %s" % s)
689            try:
690                get_url(s, certfile, softdir, log=self.log)
691            except:
692                t, v, st = sys.exc_info()
693                raise service_error(service_error.internal,
694                        "Error retrieving %s: %s" % (s, v))
695
696        # Copy local federation and portal node software to the tempdir
697        for s in (self.federation_software, self.portal_software):
698            for l, f in s:
699                base = os.path.basename(f)
700                copy_file(f, "%s/%s" % (softdir, base))
701
702
703    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
704        """
705        Gather common configuration files, retrieve or create an experiment
706        name and project name, and return the ssh_key filenames.  Create an
707        allocation log bound to the state log variable as well.
708        """
709        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
710                'seer_ca_pem', 'seer_node_pem', 'route.tgz')
711        ename = None
712        pubkey_base = None
713        secretkey_base = None
714        proj = None
715        user = None
716        alloc_log = None
717        nonce_experiment = False
718        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
719
720        self.state_lock.acquire()
721        if aid in self.allocation:
722            proj = self.allocation[aid].get('project', None)
723        self.state_lock.release()
724
725        if not proj:
726            raise service_error(service_error.internal, 
727                    "Can't find project for %s" %aid)
728
729        for a in attrs:
730            if a['attribute'] in configs:
731                try:
732                    self.log.debug("Retrieving %s from %s" % \
733                            (a['attribute'], a['value']))
734                    get_url(a['value'], certfile, tmpdir, log=self.log)
735                except:
736                    t, v, st = sys.exc_info()
737                    raise service_error(service_error.internal,
738                            "Error retrieving %s: %s" % (a.get('value', ""), v))
739            if a['attribute'] == 'ssh_pubkey':
740                pubkey_base = a['value'].rpartition('/')[2]
741            if a['attribute'] == 'ssh_secretkey':
742                secretkey_base = a['value'].rpartition('/')[2]
743            if a['attribute'] == 'experiment_name':
744                ename = a['value']
745
746        # Names longer than the emulab max are discarded
747        if ename and len(ename) <= self.max_name_len:
748            # Clean up the experiment name so that emulab will accept it.
749            ename = re.sub(vchars_re, '-', ename)
750
751        else:
752            ename = ""
753            for i in range(0,5):
754                ename += random.choice(string.ascii_letters)
755            nonce_experiment = True
756            self.log.warn("No experiment name or suggestion too long: " + \
757                    "picked one randomly: %s" % ename)
758
759        if not pubkey_base:
760            raise service_error(service_error.req, 
761                    "No public key attribute")
762
763        if not secretkey_base:
764            raise service_error(service_error.req, 
765                    "No secret key attribute")
766
767        self.state_lock.acquire()
768        if aid in self.allocation:
769            user = self.allocation[aid].get('user', None)
770            cert = self.allocation[aid].get('cert', None)
771            self.allocation[aid]['experiment'] = ename
772            self.allocation[aid]['nonce'] = nonce_experiment
773            self.allocation[aid]['log'] = [ ]
774            # Create a logger that logs to the experiment's state object as
775            # well as to the main log file.
776            alloc_log = logging.getLogger('fedd.access.%s' % ename)
777            h = logging.StreamHandler(
778                    list_log.list_log(self.allocation[aid]['log']))
779            # XXX: there should be a global one of these rather than
780            # repeating the code.
781            h.setFormatter(logging.Formatter(
782                "%(asctime)s %(name)s %(message)s",
783                        '%d %b %y %H:%M:%S'))
784            alloc_log.addHandler(h)
785            self.write_state()
786        self.state_lock.release()
787
788        if not user:
789            raise service_error(service_error.internal, 
790                    "Can't find creation user for %s" %aid)
791
792        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
793
794    def decorate_topology(self, info, t):
795        """
796        Copy the physical mapping and status onto the topology.  Used by
797        StartSegment and InfoSegment
798        """
799        def add_new(ann, attr):
800            for a in ann:
801                if a not in attr: attr.append(a)
802
803        def merge_os(os, e):
804            if len(e.os) == 0:
805                # No OS at all:
806                if os.get_attribute('emulab_access:image'):
807                    os.set_attribute('emulab_access:initial_image', 
808                            os.get_attribute('emulab_access:image'))
809                e.os = [ os ]
810            elif len(e.os) == 1:
811                # There's one OS, copy the initial image and replace
812                eos = e.os[0]
813                initial = eos.get_attribute('emulab_access:initial_image')
814                if initial:
815                    os.set_attribute('emulab_access:initial_image', initial)
816                e.os = [ os] 
817            else:
818                # Multiple OSes, replace or append
819                for eos in e.os:
820                    if os.name == eos.name:
821                        eos.version = os.version
822                        eos.version = os.distribution
823                        eos.version = os.distributionversion
824                        for a in os.attribute:
825                            if eos.get_attribute(a.attribute):
826                                eos.remove_attribute(a.attribute)
827                            eos.set_attribute(a.attribute, a.value)
828                        break
829                else:
830                    e.os.append(os)
831
832
833        if t is None: return
834        i = 0 # For fake debugging instantiation
835        # Copy the assigned names into the return topology
836        for e in t.elements:
837            if isinstance(e, topdl.Computer):
838                if not self.create_debug:
839                    if e.name in info.node:
840                        add_new(("%s%s" % 
841                            (info.node[e.name].pname, self.domain),),
842                            e.localname)
843                        add_new(("%s%s" % 
844                            (info.node[e.name].lname, self.domain),),
845                            e.localname)
846                        e.status = info.node[e.name].status
847                        os = info.node[e.name].getOS()
848                        if os: merge_os(os, e)
849                else:
850                    # Simple debugging assignment
851                    add_new(("node%d%s" % (i, self.domain),), e.localname)
852                    e.status = 'active'
853                    add_new(('testop1', 'testop2'), e.operation)
854                    i += 1
855
856        for s in t.substrates:
857            if s.name in info.subs:
858                sub = info.subs[s.name]
859                if sub.cap is not None:
860                    s.capacity = topdl.Capacity(sub.cap, 'max')
861                if sub.delay is not None:
862                    s.delay = topdl.Latency(sub.delay, 'max')
863        # XXX interfaces
864
865
866    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
867        """
868        Store key bits of experiment state in the global repository, including
869        the response that may need to be replayed, and return the response.
870        """
871        def get_localnames(t):
872            names = [ ]
873            for e in t.elements:
874                if isinstance(e, topdl.Computer):
875                    n = e.get_attribute('testbed')
876                    if n is not None and n not in names:
877                        names.append(n)
878            return names
879        i = 0
880        t = topo.clone()
881        self.decorate_topology(starter, t)
882        # Grab the log (this is some anal locking, but better safe than
883        # sorry)
884        self.state_lock.acquire()
885        # Put information about this testbed into the topdl
886        tb = topdl.Testbed(self.uri, "deter",
887                localname=get_localnames(t), 
888                attribute=[ 
889                    { 
890                        'attribute': 'project', 
891                        'value': self.allocation[aid]['project']
892                    },
893                    { 
894                        'attribute': 'experiment', 
895                        'value': self.allocation[aid]['experiment']
896                    }])
897        t.elements.append(tb)
898        logv = "".join(self.allocation[aid]['log'])
899        # It's possible that the StartSegment call gets retried (!).
900        # if the 'started' key is in the allocation, we'll return it rather
901        # than redo the setup.
902        self.allocation[aid]['started'] = { 
903                'allocID': alloc_id,
904                'allocationLog': logv,
905                'segmentdescription': { 
906                    'topdldescription': t.to_dict()
907                    },
908                'proof': proof.to_dict(),
909                }
910        self.allocation[aid]['topo'] = t
911        retval = copy.copy(self.allocation[aid]['started'])
912        self.write_state()
913        self.state_lock.release()
914        return retval
915   
916    # End of StartSegment support routines
917
918    def StartSegment(self, req, fid):
919        err = None  # Any service_error generated after tmpdir is created
920        rv = None   # Return value from segment creation
921
922        self.log.info("StartSegment called by %s" % fid)
923        try:
924            req = req['StartSegmentRequestBody']
925            auth_attr = req['allocID']['fedid']
926            topref = req['segmentdescription']['topdldescription']
927        except KeyError:
928            raise service_error(server_error.req, "Badly formed request")
929
930        connInfo = req.get('connection', [])
931        services = req.get('service', [])
932        aid = "%s" % auth_attr
933        attrs = req.get('fedAttr', [])
934
935        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
936                with_proof=True)
937        if not access_ok:
938            self.log.info("StartSegment for %s failed: access denied" % fid)
939            raise service_error(service_error.access, "Access denied")
940        else:
941            # See if this is a replay of an earlier succeeded StartSegment -
942            # sometimes SSL kills 'em.  If so, replay the response rather than
943            # redoing the allocation.
944            self.state_lock.acquire()
945            retval = self.allocation[aid].get('started', None)
946            self.state_lock.release()
947            if retval:
948                self.log.warning("Duplicate StartSegment for %s: " % aid + \
949                        "replaying response")
950                return retval
951
952        # A new request.  Do it.
953
954        if topref: topo = topdl.Topology(**topref)
955        else:
956            raise service_error(service_error.req, 
957                    "Request missing segmentdescription'")
958       
959        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
960        try:
961            tmpdir = tempfile.mkdtemp(prefix="access-")
962            softdir = "%s/software" % tmpdir
963            os.mkdir(softdir)
964        except EnvironmentError:
965            self.log.info("StartSegment for %s failed: internal error" % fid)
966            raise service_error(service_error.internal, "Cannot create tmp dir")
967
968        # Try block alllows us to clean up temporary files.
969        try:
970            self.retrieve_software(topo, certfile, softdir)
971            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
972                alloc_log =  self.initialize_experiment_info(attrs, aid, 
973                        certfile, tmpdir)
974            # A misconfigured cert in the ABAC map can be confusing...
975            if not os.access(xmlrpc_cert, os.R_OK):
976                self.log.error("Cannot open user's emulab SSL cert: %s" % \
977                        xmlrpc_cert)
978                raise service_error(service_error.internal,
979                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
980
981
982            if '/' in proj: proj, gid = proj.split('/')
983            else: gid = None
984
985
986            # Set up userconf and seer if needed
987            self.configure_userconf(services, tmpdir)
988            self.configure_seer_services(services, topo, softdir)
989            # Get and send synch store variables
990            exporter = self.exports_segment(keyfile=self.ssh_privkey_file,
991                    debug=self.create_debug, log=alloc_log, boss=self.boss,
992                    ops=self.ops, cert=xmlrpc_cert)
993            if not exporter(self, ename, proj, user, connInfo, tmpdir, gid):
994                raise service_error(service_error.internal,
995                        'Failed to gather export variables')
996            self.export_store_info(certfile, proj, ename, connInfo)
997            self.import_store_info(certfile, connInfo)
998
999            expfile = "%s/experiment.tcl" % tmpdir
1000
1001            self.generate_portal_configs(topo, pubkey_base, 
1002                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1003            self.generate_ns2(topo, expfile, 
1004                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1005
1006            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1007                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1008                    ops=self.ops, cert=xmlrpc_cert)
1009            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1010        except service_error, e:
1011            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
1012            err = e
1013        except:
1014            t, v, st = sys.exc_info()
1015            self.log.info("StartSegment for %s failed:unexpected error: %s" \
1016                    % (fid, traceback.extract_tb(st)))
1017            err = service_error(service_error.internal, "%s: %s" % \
1018                    (v, traceback.extract_tb(st)))
1019
1020        # Walk up tmpdir, deleting as we go
1021        if self.cleanup: self.remove_dirs(tmpdir)
1022        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1023
1024        if rv:
1025            self.log.info("StartSegment for %s succeeded" % fid)
1026            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1027                    proof)
1028        elif err:
1029            raise service_error(service_error.federant,
1030                    "Swapin failed: %s" % err)
1031        else:
1032            raise service_error(service_error.federant, "Swapin failed")
1033
1034    def TerminateSegment(self, req, fid):
1035        self.log.info("TerminateSegment called by %s" % fid)
1036        try:
1037            req = req['TerminateSegmentRequestBody']
1038        except KeyError:
1039            raise service_error(server_error.req, "Badly formed request")
1040
1041        auth_attr = req['allocID']['fedid']
1042        aid = "%s" % auth_attr
1043        attrs = req.get('fedAttr', [])
1044
1045        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1046                with_proof=True)
1047        if not access_ok:
1048            raise service_error(service_error.access, "Access denied")
1049
1050        self.state_lock.acquire()
1051        if aid in self.allocation:
1052            proj = self.allocation[aid].get('project', None)
1053            user = self.allocation[aid].get('user', None)
1054            xmlrpc_cert = self.allocation[aid].get('cert', None)
1055            ename = self.allocation[aid].get('experiment', None)
1056            nonce = self.allocation[aid].get('nonce', False)
1057        else:
1058            proj = None
1059            user = None
1060            ename = None
1061            nonce = False
1062            xmlrpc_cert = None
1063        self.state_lock.release()
1064
1065        if not proj:
1066            self.log.info("TerminateSegment failed for %s: cannot find project"\
1067                    % fid)
1068            raise service_error(service_error.internal, 
1069                    "Can't find project for %s" % aid)
1070        else:
1071            if '/' in proj: proj, gid = proj.split('/')
1072            else: gid = None
1073
1074        if not user:
1075            self.log.info("TerminateSegment failed for %s: cannot find user"\
1076                    % fid)
1077            raise service_error(service_error.internal, 
1078                    "Can't find creation user for %s" % aid)
1079        if not ename:
1080            self.log.info(
1081                    "TerminateSegment failed for %s: cannot find experiment"\
1082                    % fid)
1083            raise service_error(service_error.internal, 
1084                    "Can't find experiment name for %s" % aid)
1085       
1086        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1087                debug=self.create_debug, boss=self.boss, ops=self.ops,
1088                cert=xmlrpc_cert)
1089        stopper(self, user, proj, ename, gid, nonce)
1090        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1091                (fid, proj, ename))
1092        self.state_lock.acquire()
1093        # Remove the started flag/info - the segment is no longer started
1094        if aid in self.allocation:
1095            if 'started' in self.allocation[aid]:
1096                del self.allocation[aid]['started']
1097            self.write_state()
1098        self.state_lock.release()
1099        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1100
1101    def InfoSegment(self, req, fid):
1102        self.log.info("InfoSegment called by %s" % fid)
1103        try:
1104            req = req['InfoSegmentRequestBody']
1105        except KeyError:
1106            raise service_error(server_error.req, "Badly formed request")
1107
1108        auth_attr = req['allocID']['fedid']
1109        aid = "%s" % auth_attr
1110
1111        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1112                with_proof=True)
1113        if not access_ok:
1114            raise service_error(service_error.access, "Access denied")
1115
1116        self.state_lock.acquire()
1117        if aid in self.allocation:
1118            topo = self.allocation[aid].get('topo', None)
1119            if topo: topo = topo.clone()
1120            proj = self.allocation[aid].get('project', None)
1121            user = self.allocation[aid].get('user', None)
1122            xmlrpc_cert = self.allocation[aid].get('cert', None)
1123            ename = self.allocation[aid].get('experiment', None)
1124        else:
1125            proj = None
1126            user = None
1127            ename = None
1128            topo = None
1129            xmlrpc_cert = None
1130        self.state_lock.release()
1131
1132        if not proj:
1133            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1134            raise service_error(service_error.internal, 
1135                    "Can't find project for %s" % aid)
1136        else:
1137            if '/' in proj: proj, gid = proj.split('/')
1138            else: gid = None
1139
1140        if not user:
1141            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1142            raise service_error(service_error.internal, 
1143                    "Can't find creation user for %s" % aid)
1144        if not ename:
1145            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1146            raise service_error(service_error.internal, 
1147                    "Can't find experiment name for %s" % aid)
1148        info = self.info_segment(keyfile=self.ssh_privkey_file,
1149                debug=self.create_debug, boss=self.boss, ops=self.ops,
1150                cert=xmlrpc_cert)
1151        info(self, user, proj, ename)
1152        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1153                (fid, user, proj, ename))
1154        self.decorate_topology(info, topo)
1155        self.state_lock.acquire()
1156        if aid in self.allocation:
1157            self.allocation[aid]['topo'] = topo
1158            self.write_state()
1159        self.state_lock.release()
1160        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1161                (fid, user, proj, ename))
1162
1163        rv = { 
1164                'allocID': req['allocID'], 
1165                'proof': proof.to_dict(),
1166                }
1167        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1168                (fid, user, proj, ename))
1169        if topo:
1170            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1171        return rv
1172
1173    def OperationSegment(self, req, fid):
1174        def get_pname(e):
1175            """
1176            Get the physical name of a node
1177            """
1178            if e.localname:
1179                return re.sub('\..*','', e.localname[0])
1180            else:
1181                return None
1182
1183        self.log.info("OperationSegment called by %s" % fid)
1184        try:
1185            req = req['OperationSegmentRequestBody']
1186        except KeyError:
1187            raise service_error(server_error.req, "Badly formed request")
1188
1189        auth_attr = req['allocID']['fedid']
1190        aid = "%s" % auth_attr
1191
1192        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1193                with_proof=True)
1194        if not access_ok:
1195            self.log.info("OperationSegment failed for %s: access denied" % fid)
1196            raise service_error(service_error.access, "Access denied")
1197
1198        op = req.get('operation', None)
1199        targets = req.get('target', None)
1200        params = req.get('parameter', None)
1201
1202        if op is None :
1203            self.log.info("OperationSegment failed for %s: no operation" % fid)
1204            raise service_error(service_error.req, "missing operation")
1205        elif targets is None:
1206            self.log.info("OperationSegment failed for %s: no targets" % fid)
1207            raise service_error(service_error.req, "no targets")
1208
1209        self.state_lock.acquire()
1210        if aid in self.allocation:
1211            topo = self.allocation[aid].get('topo', None)
1212            if topo: topo = topo.clone()
1213            xmlrpc_cert = self.allocation[aid].get('cert', None)
1214        else:
1215            topo = None
1216            xmlrpc_cert = None
1217        self.state_lock.release()
1218
1219        targets = copy.copy(targets)
1220        ptargets = { }
1221        for e in topo.elements:
1222            if isinstance(e, topdl.Computer):
1223                if e.name in targets:
1224                    targets.remove(e.name)
1225                    pn = get_pname(e)
1226                    if pn: ptargets[e.name] = pn
1227
1228        status = [ operation_status(t, operation_status.no_target) \
1229                for t in targets]
1230
1231        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1232                debug=self.create_debug, boss=self.boss, ops=self.ops,
1233                cert=xmlrpc_cert)
1234        ops(self, op, ptargets, params, topo)
1235        self.log.info("OperationSegment operated for %s" % fid)
1236       
1237        status.extend(ops.status)
1238        self.log.info("OperationSegment succeed for %s" % fid)
1239
1240        return { 
1241                'allocID': req['allocID'], 
1242                'status': [s.to_dict() for s in status],
1243                'proof': proof.to_dict(),
1244                }
Note: See TracBrowser for help on using the repository browser.