source: fedd/federation/emulab_access.py @ 12aa3fe

Last change on this file since 12aa3fe was 12aa3fe, checked in by Ted Faber <faber@…>, 10 years ago

Actually trim the code

  • Property mode set to 100644
File size: 40.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34import emulab_shared_nat_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    max_name_len = 19
53
54    def __init__(self, config=None, auth=None):
55        """
56        Initializer.  Pulls parameters out of the ConfigParser's access section.
57        """
58
59        access_base.__init__(self, config, auth)
60
61        self.max_name_len = access.max_name_len
62
63        self.allow_proxy = config.getboolean("access", "allow_proxy")
64
65        self.domain = config.get("access", "domain")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78        self.boss = config.get("access", "boss")
79        self.ops = config.get("access", "ops")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91        self.nat_portal = config.get("access", "nat_portal")
92        self.shared_nat = config.getboolean("access", "shared_nat")
93
94        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
95                self.get_port(config.get("globals", "services", "23235")))
96
97        self.federation_software = self.software_list(self.federation_software)
98        self.portal_software = self.software_list(self.portal_software)
99        self.local_seer_software = self.software_list(self.local_seer_software)
100
101        self.access_type = self.access_type.lower()
102        if self.shared_nat:
103            self.start_segment = emulab_shared_nat_segment.start_segment
104            self.stop_segment = emulab_shared_nat_segment.stop_segment
105            self.info_segment = emulab_shared_nat_segment.info_segment
106            self.operation_segment = emulab_shared_nat_segment.operation_segment
107            self.exports_segment = emulab_shared_nat_segment.exports_segment
108        else:
109            self.start_segment = emulab_segment.start_segment
110            self.stop_segment = emulab_segment.stop_segment
111            self.info_segment = emulab_segment.info_segment
112            self.operation_segment = emulab_segment.operation_segment
113            self.exports_segment = emulab_segment.exports_segment
114
115        self.restricted = [ ]
116
117        # authorization information
118        self.auth_type = config.get('access', 'auth_type') \
119                or 'abac'
120        self.auth_dir = config.get('access', 'auth_dir')
121        accessdb = config.get("access", "accessdb")
122        # initialize the authorization system
123        if self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            self.access = [ ]
126            if accessdb:
127                self.read_access(accessdb, self.access_tuple)
128        else:
129            raise service_error(service_error.internal, 
130                    "Unknown auth_type: %s" % self.auth_type)
131
132        # read_state in the base_class
133        self.state_lock.acquire()
134        if 'allocation' not in self.state: self.state['allocation']= { }
135        self.allocation = self.state['allocation']
136        self.state_lock.release()
137        self.exports = {
138                'SMB': self.export_SMB,
139                'seer': self.export_seer,
140                'tmcd': self.export_tmcd,
141                'userconfig': self.export_userconfig,
142                'project_export': self.export_project_export,
143                'local_seer_control': self.export_local_seer,
144                'seer_master': self.export_seer_master,
145                'hide_hosts': self.export_hide_hosts,
146                }
147
148        if not self.local_seer_image or not self.local_seer_software or \
149                not self.local_seer_start:
150            if 'local_seer_control' in self.exports:
151                del self.exports['local_seer_control']
152
153        if not self.local_seer_image or not self.local_seer_software or \
154                not self.seer_master_start:
155            if 'seer_master' in self.exports:
156                del self.exports['seer_master']
157
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment",
164                self.TerminateSegment),
165            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
166            'OperationSegment': soap_handler("OperationSegment",
167                self.OperationSegment),
168            }
169        self.xmlrpc_services =  {\
170            'RequestAccess': xmlrpc_handler('RequestAccess',
171                self.RequestAccess),
172            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
173                self.ReleaseAccess),
174            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
175            'TerminateSegment': xmlrpc_handler('TerminateSegment',
176                self.TerminateSegment),
177            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
178            'OperationSegment': xmlrpc_handler("OperationSegment",
179                self.OperationSegment),
180            }
181
182        self.call_SetValue = service_caller('SetValue', log=self.log)
183        self.call_GetValue = service_caller('GetValue', log=self.log)
184
185    @staticmethod
186    def get_port(ps):
187        '''
188        Take a fedd service string and return the first port.  Used in
189        creating the testbed uri identifier.
190        '''
191        p = ps.split(',')
192        smallport = p[0].split(':')
193        try:
194            rv = int(smallport[0])
195        except ValueError:
196            rv = 23235
197        return rv
198
199    @staticmethod
200    def access_tuple(str):
201        """
202        Convert a string of the form (id, id, id) into an access_project.  This
203        is called by read_access to convert to local attributes.  It returns a
204        tuple of the form (project, user, certificate_file).
205        """
206
207        str = str.strip()
208        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
209            # The slice takes the parens off the string.
210            proj, user, cert = str[1:-1].split(',')
211            return (proj.strip(), user.strip(), cert.strip())
212        else:
213            raise self.parse_error(
214                    'Bad mapping (unbalanced parens or more than 2 commas)')
215
216    # RequestAccess support routines
217
218    def save_project_state(self, aid, pname, uname, certf, owners):
219        """
220        Save the project, user, and owners associated with this allocation.
221        This creates the allocation entry.
222        """
223        self.state_lock.acquire()
224        self.allocation[aid] = { }
225        self.allocation[aid]['project'] = pname
226        self.allocation[aid]['user'] = uname
227        self.allocation[aid]['cert'] = certf
228        self.allocation[aid]['owners'] = owners
229        self.write_state()
230        self.state_lock.release()
231        return (pname, uname)
232
233    # End of RequestAccess support routines
234
235    def RequestAccess(self, req, fid):
236        """
237        Handle the access request.  Proxy if not for us.
238
239        Parse out the fields and make the allocations or rejections if for us,
240        otherwise, assuming we're willing to proxy, proxy the request out.
241        """
242
243        def gateway_hardware(h):
244            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
245            else: return h
246
247        def get_export_project(svcs):
248            """
249            if the service requests includes one to export a project, return
250            that project.
251            """
252            rv = None
253            for s in svcs:
254                if s.get('name', '') == 'project_export' and \
255                        s.get('visibility', '') == 'export':
256                    if not rv: 
257                        for a in s.get('fedAttr', []):
258                            if a.get('attribute', '') == 'project' \
259                                    and 'value' in a:
260                                rv = a['value']
261                    else:
262                        raise service_error(service_error, access, 
263                                'Requesting multiple project exports is ' + \
264                                        'not supported');
265            return rv
266
267        self.log.info("RequestAccess called by %s" % fid)
268        # The dance to get into the request body
269        if req.has_key('RequestAccessRequestBody'):
270            req = req['RequestAccessRequestBody']
271        else:
272            raise service_error(service_error.req, "No request!?")
273
274        # if this includes a project export request, construct a filter such
275        # that only the ABAC attributes mapped to that project are checked for
276        # access.
277        if 'service' in req:
278            ep = get_export_project(req['service'])
279            if ep: pf = lambda(a): a.value[0] == ep
280            else: pf = None
281        else:
282            ep = None
283            pf = None
284
285        if self.auth.import_credentials(
286                data_list=req.get('abac_credential', [])):
287            self.auth.save()
288        else:
289            self.log.debug('failed to import incoming credentials')
290
291        if self.auth_type == 'abac':
292            found, owners, proof = self.lookup_access(req, fid, filter=pf)
293        else:
294            raise service_error(service_error.internal, 
295                    'Unknown auth_type: %s' % self.auth_type)
296        ap = None
297
298        # keep track of what's been added
299        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
300        aid = unicode(allocID)
301
302        pname, uname = self.save_project_state(aid, found[0], found[1], 
303                found[2], owners)
304
305        services, svc_state = self.export_services(req.get('service',[]),
306                pname, uname)
307        self.state_lock.acquire()
308        # Store services state in global state
309        for k, v in svc_state.items():
310            self.allocation[aid][k] = v
311        self.append_allocation_authorization(aid, 
312                set([(o, allocID) for o in owners]), state_attr='allocation')
313        self.write_state()
314        self.state_lock.release()
315        try:
316            f = open("%s/%s.pem" % (self.certdir, aid), "w")
317            print >>f, alloc_cert
318            f.close()
319        except EnvironmentError, e:
320            self.log.info("RequestAccess failed for by %s: internal error" \
321                    % fid)
322            raise service_error(service_error.internal, 
323                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
324        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
325        resp = self.build_access_response({ 'fedid': allocID } ,
326                pname, services, proof)
327        return resp
328
329    def ReleaseAccess(self, req, fid):
330        self.log.info("ReleaseAccess called by %s" % fid)
331        # The dance to get into the request body
332        if req.has_key('ReleaseAccessRequestBody'):
333            req = req['ReleaseAccessRequestBody']
334        else:
335            raise service_error(service_error.req, "No request!?")
336
337        try:
338            if req['allocID'].has_key('localname'):
339                auth_attr = aid = req['allocID']['localname']
340            elif req['allocID'].has_key('fedid'):
341                aid = unicode(req['allocID']['fedid'])
342                auth_attr = req['allocID']['fedid']
343            else:
344                raise service_error(service_error.req,
345                        "Only localnames and fedids are understood")
346        except KeyError:
347            raise service_error(service_error.req, "Badly formed request")
348
349        self.log.debug("[access] deallocation requested for %s by %s" % \
350                (aid, fid))
351        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
352                with_proof=True)
353        if not access_ok:
354            self.log.debug("[access] deallocation denied for %s", aid)
355            raise service_error(service_error.access, "Access Denied")
356
357        self.state_lock.acquire()
358        if aid in self.allocation:
359            self.log.debug("Found allocation for %s" %aid)
360            self.clear_allocation_authorization(aid, state_attr='allocation')
361            del self.allocation[aid]
362            self.write_state()
363            self.state_lock.release()
364            # Remove the access cert
365            cf = "%s/%s.pem" % (self.certdir, aid)
366            self.log.debug("Removing %s" % cf)
367            os.remove(cf)
368            self.log.info("ReleaseAccess succeeded for %s" % fid)
369            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
370        else:
371            self.state_lock.release()
372            raise service_error(service_error.req, "No such allocation")
373
374    # These are subroutines for StartSegment
375    def generate_ns2(self, topo, expfn, softdir, connInfo):
376        """
377        Convert topo into an ns2 file, decorated with appropriate commands for
378        the particular testbed setup.  Convert all requests for software, etc
379        to point at the staged copies on this testbed and add the federation
380        startcommands.
381        """
382        class dragon_commands:
383            """
384            Functor to spit out approrpiate dragon commands for nodes listed in
385            the connectivity description.  The constructor makes a dict mapping
386            dragon nodes to their parameters and the __call__ checks each
387            element in turn for membership.
388            """
389            def __init__(self, map):
390                self.node_info = map
391
392            def __call__(self, e):
393                s = ""
394                if isinstance(e, topdl.Computer):
395                    if self.node_info.has_key(e.name):
396                        info = self.node_info[e.name]
397                        for ifname, vlan, type in info:
398                            for i in e.interface:
399                                if i.name == ifname:
400                                    addr = i.get_attribute('ip4_address')
401                                    subs = i.substrate[0]
402                                    break
403                            else:
404                                raise service_error(service_error.internal,
405                                        "No interface %s on element %s" % \
406                                                (ifname, e.name))
407                            # XXX: do netmask right
408                            if type =='link':
409                                s = ("tb-allow-external ${%s} " + \
410                                        "dragonportal ip %s vlan %s " + \
411                                        "netmask 255.255.255.0\n") % \
412                                        (topdl.to_tcl_name(e.name), addr, vlan)
413                            elif type =='lan':
414                                s = ("tb-allow-external ${%s} " + \
415                                        "dragonportal " + \
416                                        "ip %s vlan %s usurp %s\n") % \
417                                        (topdl.to_tcl_name(e.name), addr, 
418                                                vlan, subs)
419                            else:
420                                raise service_error(service_error_internal,
421                                        "Unknown DRAGON type %s" % type)
422                return s
423
424        class not_dragon:
425            """
426            Return true if a node is in the given map of dragon nodes.
427            """
428            def __init__(self, map):
429                self.nodes = set(map.keys())
430
431            def __call__(self, e):
432                return e.name not in self.nodes
433
434        def have_portals(top):
435            '''
436            Return true if the topology has a portal node
437            '''
438            # The else is on the for
439            for e in top.elements:
440                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
441                    return True
442            else:
443                return False
444
445
446        # Main line of generate_ns2
447        t = topo.clone()
448
449        # Create the map of nodes that need direct connections (dragon
450        # connections) from the connInfo
451        dragon_map = { }
452        for i in [ i for i in connInfo if i['type'] == 'transit']:
453            for a in i.get('fedAttr', []):
454                if a['attribute'] == 'vlan_id':
455                    vlan = a['value']
456                    break
457            else:
458                raise service_error(service_error.internal, "No vlan tag")
459            members = i.get('member', [])
460            if len(members) > 1: type = 'lan'
461            else: type = 'link'
462
463            try:
464                for m in members:
465                    if m['element'] in dragon_map:
466                        dragon_map[m['element']].append(( m['interface'], 
467                            vlan, type))
468                    else:
469                        dragon_map[m['element']] = [( m['interface'], 
470                            vlan, type),]
471            except KeyError:
472                raise service_error(service_error.req,
473                        "Missing connectivity info")
474
475        def output_fixed_filter(e):
476            if not isinstance(e, topdl.Computer): return ""
477            fn = e.get_attribute('fixed')
478            if fn is None:
479                return ""
480            else:
481                return 'tb-fix-node ${%s} %s\n' % (topdl.to_tcl_name(e.name), fn)
482
483        def output_external_access(e):
484            '''
485            Allow nodes that have explicit external access requests attached to
486            have external access.  This will be replaced by a real risky
487            experiment filter.
488            '''
489            if not isinstance(e, topdl.Computer): return ""
490            ex = e.get_attribute('containers:external')
491            if ex is None:
492                return ""
493            else:
494                return 'tb-allow-external ${%s}\n' % topdl.to_tcl_name(e.name)
495
496        # Weed out the things we aren't going to instantiate: Segments, portal
497        # substrates, and portal interfaces.  (The copy in the for loop allows
498        # us to delete from e.elements in side the for loop).  While we're
499        # touching all the elements, we also adjust paths from the original
500        # testbed to local testbed paths and put the federation commands into
501        # the start commands
502        local = len(dragon_map) == 0 and not have_portals(t)
503        if local: routing = 'Static'
504        else: routing = 'Manual'
505
506        if local:
507            self.log.debug("Local experiment.")
508        for e in [e for e in t.elements]:
509            if isinstance(e, topdl.Segment):
510                t.elements.remove(e)
511            if isinstance(e, topdl.Computer):
512                self.add_kit(e, self.federation_software)
513                if e.get_attribute('portal') and self.portal_startcommand:
514                    # Add local portal support software
515                    self.add_kit(e, self.portal_software)
516                    # Portals never have a user-specified start command
517                    e.set_attribute('startup', self.portal_startcommand)
518                elif not local and self.node_startcommand:
519                    if e.get_attribute('startup'):
520                        e.set_attribute('startup', "%s \\$USER '%s'" % \
521                                (self.node_startcommand, 
522                                    e.get_attribute('startup')))
523                    else:
524                        e.set_attribute('startup', self.node_startcommand)
525
526                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
527                # Remove portal interfaces that do not connect to DRAGON
528                e.interface = [i for i in e.interface \
529                        if not i.get_attribute('portal') or i.name in dinf ]
530            # Fix software paths
531            for s in getattr(e, 'software', []):
532                s.location = re.sub("^.*/", softdir, s.location)
533
534        t.substrates = [ s.clone() for s in t.substrates ]
535        t.incorporate_elements()
536
537        # Customize the ns2 output for local portal commands and images
538        filters = [output_fixed_filter, output_external_access]
539
540        if self.dragon_endpoint:
541            add_filter = not_dragon(dragon_map)
542            filters.append(dragon_commands(dragon_map))
543        else:
544            add_filter = None
545
546        if self.portal_command:
547            if self.shared_nat: suffix = 'shared nat 0/0/tcp rdr 22/tcp'
548            else: suffix = ''
549
550            filters.append(topdl.generate_portal_command_filter(
551                self.portal_command, add_filter=add_filter, suffix=suffix))
552
553        if self.portal_image:
554            filters.append(topdl.generate_portal_image_filter(
555                self.portal_image))
556
557        if self.portal_type:
558            filters.append(topdl.generate_portal_hardware_filter(
559                self.portal_type))
560
561        # Convert to ns and write it out
562        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
563        try:
564            f = open(expfn, "w")
565            print >>f, expfile
566            f.close()
567        except EnvironmentError:
568            raise service_error(service_error.internal,
569                    "Cannot write experiment file %s: %s" % (expfn,e))
570
571    def export_store_info(self, cf, proj, ename, connInfo):
572        """
573        For the export requests in the connection info, send out teh SetValue
574        calls for parameters initialized by the segment's export_segment
575        object. Values are all in the connInfo parameter's value fields.
576        """
577
578        for c in connInfo:
579            for p in c.get('parameter', []):
580                # Ignore non-output parameters
581                if p.get('type', '') != 'output':
582                    continue
583                # Sanity check store, key and value
584                surl = p.get('store', None)
585                key = p.get('key', None)
586                value = p.get('value', None)
587                if surl is None:
588                    self.log.error("Export parameter without store: %s" % \
589                            p.get('name'))
590                    continue
591                if key is None:
592                    self.log.error("Export parameter without key: %s" % \
593                            p.get('name'))
594                    continue
595                if value is None:
596                    self.log.error("Unknown (unset) export parameter: %s" % \
597                            p.get('name'))
598                    continue
599                # Set the store value
600                req = { 'name': key, 'value': value }
601                self.call_SetValue(surl, req, cf)
602
603    def add_seer_node(self, topo, name, startup):
604        """
605        Add a seer node to the given topology, with the startup command passed
606        in.  Used by configure seer_services.
607        """
608        c_node = topdl.Computer(
609                name=name, 
610                os= topdl.OperatingSystem(
611                    attribute=[
612                    { 'attribute': 'osid', 
613                        'value': self.local_seer_image },
614                    ]),
615                attribute=[
616                    { 'attribute': 'startup', 'value': startup },
617                    ]
618                )
619        self.add_kit(c_node, self.local_seer_software)
620        topo.elements.append(c_node)
621
622    def configure_seer_services(self, services, topo, softdir):
623        """
624        Make changes to the topology required for the seer requests being made.
625        Specifically, add any control or master nodes required and set up the
626        start commands on the nodes to interconnect them.
627        """
628        local_seer = False      # True if we need to add a control node
629        collect_seer = False    # True if there is a seer-master node
630        seer_master= False      # True if we need to add the seer-master
631        for s in services:
632            s_name = s.get('name', '')
633            s_vis = s.get('visibility','')
634
635            if s_name  == 'local_seer_control' and s_vis == 'export':
636                local_seer = True
637            elif s_name == 'seer_master':
638                if s_vis == 'import':
639                    collect_seer = True
640                elif s_vis == 'export':
641                    seer_master = True
642       
643        # We've got the whole picture now, so add nodes if needed and configure
644        # them to interconnect properly.
645        if local_seer or seer_master:
646            # Copy local seer control node software to the tempdir
647            for l, f in self.local_seer_software:
648                base = os.path.basename(f)
649                copy_file(f, "%s/%s" % (softdir, base))
650        # If we're collecting seers somewhere the controllers need to talk to
651        # the master.  In testbeds that export the service, that will be a
652        # local node that we'll add below.  Elsewhere it will be the control
653        # portal that will port forward to the exporting master.
654        if local_seer:
655            if collect_seer:
656                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
657            else:
658                startup = self.local_seer_start
659            self.add_seer_node(topo, 'control', startup)
660        # If this is the seer master, add that node, too.
661        if seer_master:
662            self.add_seer_node(topo, 'seer-master', 
663                    "%s -R -n -R seer-master -R -A -R sink" % \
664                            self.seer_master_start)
665
666    def retrieve_software(self, topo, certfile, softdir):
667        """
668        Collect the software that nodes in the topology need loaded and stage
669        it locally.  This implies retrieving it from the experiment_controller
670        and placing it into softdir.  Certfile is used to prove that this node
671        has access to that data (it's the allocation/segment fedid).  Finally
672        local portal and federation software is also copied to the same staging
673        directory for simplicity - all software needed for experiment creation
674        is in softdir.
675        """
676        sw = set()
677        for e in topo.elements:
678            for s in getattr(e, 'software', []):
679                sw.add(s.location)
680        for s in sw:
681            self.log.debug("Retrieving %s" % s)
682            try:
683                get_url(s, certfile, softdir, log=self.log)
684            except:
685                t, v, st = sys.exc_info()
686                raise service_error(service_error.internal,
687                        "Error retrieving %s: %s" % (s, v))
688
689        # Copy local federation and portal node software to the tempdir
690        for s in (self.federation_software, self.portal_software):
691            for l, f in s:
692                base = os.path.basename(f)
693                copy_file(f, "%s/%s" % (softdir, base))
694
695
696    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
697        """
698        Gather common configuration files, retrieve or create an experiment
699        name and project name, and return the ssh_key filenames.  Create an
700        allocation log bound to the state log variable as well.
701        """
702        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
703                'seer_ca_pem', 'seer_node_pem', 'route.tgz')
704        ename = None
705        pubkey_base = None
706        secretkey_base = None
707        proj = None
708        user = None
709        alloc_log = None
710        nonce_experiment = False
711        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
712
713        self.state_lock.acquire()
714        if aid in self.allocation:
715            proj = self.allocation[aid].get('project', None)
716        self.state_lock.release()
717
718        if not proj:
719            raise service_error(service_error.internal, 
720                    "Can't find project for %s" %aid)
721
722        for a in attrs:
723            if a['attribute'] in configs:
724                try:
725                    self.log.debug("Retrieving %s from %s" % \
726                            (a['attribute'], a['value']))
727                    get_url(a['value'], certfile, tmpdir, log=self.log)
728                except:
729                    t, v, st = sys.exc_info()
730                    raise service_error(service_error.internal,
731                            "Error retrieving %s: %s" % (a.get('value', ""), v))
732            if a['attribute'] == 'ssh_pubkey':
733                pubkey_base = a['value'].rpartition('/')[2]
734            if a['attribute'] == 'ssh_secretkey':
735                secretkey_base = a['value'].rpartition('/')[2]
736            if a['attribute'] == 'experiment_name':
737                ename = a['value']
738
739        # Names longer than the emulab max are discarded
740        if ename and len(ename) <= self.max_name_len:
741            # Clean up the experiment name so that emulab will accept it.
742            ename = re.sub(vchars_re, '-', ename)
743
744        else:
745            ename = ""
746            for i in range(0,5):
747                ename += random.choice(string.ascii_letters)
748            nonce_experiment = True
749            self.log.warn("No experiment name or suggestion too long: " + \
750                    "picked one randomly: %s" % ename)
751
752        if not pubkey_base:
753            raise service_error(service_error.req, 
754                    "No public key attribute")
755
756        if not secretkey_base:
757            raise service_error(service_error.req, 
758                    "No secret key attribute")
759
760        self.state_lock.acquire()
761        if aid in self.allocation:
762            user = self.allocation[aid].get('user', None)
763            cert = self.allocation[aid].get('cert', None)
764            self.allocation[aid]['experiment'] = ename
765            self.allocation[aid]['nonce'] = nonce_experiment
766            self.allocation[aid]['log'] = [ ]
767            # Create a logger that logs to the experiment's state object as
768            # well as to the main log file.
769            alloc_log = logging.getLogger('fedd.access.%s' % ename)
770            h = logging.StreamHandler(
771                    list_log.list_log(self.allocation[aid]['log']))
772            # XXX: there should be a global one of these rather than
773            # repeating the code.
774            h.setFormatter(logging.Formatter(
775                "%(asctime)s %(name)s %(message)s",
776                        '%d %b %y %H:%M:%S'))
777            alloc_log.addHandler(h)
778            self.write_state()
779        self.state_lock.release()
780
781        if not user:
782            raise service_error(service_error.internal, 
783                    "Can't find creation user for %s" %aid)
784
785        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
786
787    def decorate_topology(self, info, t):
788        """
789        Copy the physical mapping and status onto the topology.  Used by
790        StartSegment and InfoSegment
791        """
792        def add_new(ann, attr):
793            for a in ann:
794                if a not in attr: attr.append(a)
795
796        def merge_os(os, e):
797            if len(e.os) == 0:
798                # No OS at all:
799                if os.get_attribute('emulab_access:image'):
800                    os.set_attribute('emulab_access:initial_image', 
801                            os.get_attribute('emulab_access:image'))
802                e.os = [ os ]
803            elif len(e.os) == 1:
804                # There's one OS, copy the initial image and replace
805                eos = e.os[0]
806                initial = eos.get_attribute('emulab_access:initial_image')
807                if initial:
808                    os.set_attribute('emulab_access:initial_image', initial)
809                e.os = [ os] 
810            else:
811                # Multiple OSes, replace or append
812                for eos in e.os:
813                    if os.name == eos.name:
814                        eos.version = os.version
815                        eos.version = os.distribution
816                        eos.version = os.distributionversion
817                        for a in os.attribute:
818                            if eos.get_attribute(a.attribute):
819                                eos.remove_attribute(a.attribute)
820                            eos.set_attribute(a.attribute, a.value)
821                        break
822                else:
823                    e.os.append(os)
824
825
826        if t is None: return
827        i = 0 # For fake debugging instantiation
828        # Copy the assigned names into the return topology
829        for e in t.elements:
830            if isinstance(e, topdl.Computer):
831                if not self.create_debug:
832                    if e.name in info.node:
833                        add_new(("%s%s" % 
834                            (info.node[e.name].pname, self.domain),),
835                            e.localname)
836                        add_new(("%s%s" % 
837                            (info.node[e.name].lname, self.domain),),
838                            e.localname)
839                        e.status = info.node[e.name].status
840                        os = info.node[e.name].getOS()
841                        if os: merge_os(os, e)
842                else:
843                    # Simple debugging assignment
844                    add_new(("node%d%s" % (i, self.domain),), e.localname)
845                    e.status = 'active'
846                    add_new(('testop1', 'testop2'), e.operation)
847                    i += 1
848
849        for s in t.substrates:
850            if s.name in info.subs:
851                sub = info.subs[s.name]
852                if sub.cap is not None:
853                    s.capacity = topdl.Capacity(sub.cap, 'max')
854                if sub.delay is not None:
855                    s.delay = topdl.Latency(sub.delay, 'max')
856        # XXX interfaces
857
858
859    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
860        """
861        Store key bits of experiment state in the global repository, including
862        the response that may need to be replayed, and return the response.
863        """
864        def get_localnames(t):
865            names = [ ]
866            for e in t.elements:
867                if isinstance(e, topdl.Computer):
868                    n = e.get_attribute('testbed')
869                    if n is not None and n not in names:
870                        names.append(n)
871            return names
872        i = 0
873        t = topo.clone()
874        self.decorate_topology(starter, t)
875        # Grab the log (this is some anal locking, but better safe than
876        # sorry)
877        self.state_lock.acquire()
878        # Put information about this testbed into the topdl
879        tb = topdl.Testbed(self.uri, "deter",
880                localname=get_localnames(t), 
881                attribute=[ 
882                    { 
883                        'attribute': 'project', 
884                        'value': self.allocation[aid]['project']
885                    },
886                    { 
887                        'attribute': 'experiment', 
888                        'value': self.allocation[aid]['experiment']
889                    }])
890        t.elements.append(tb)
891        logv = "".join(self.allocation[aid]['log'])
892        # It's possible that the StartSegment call gets retried (!).
893        # if the 'started' key is in the allocation, we'll return it rather
894        # than redo the setup.
895        self.allocation[aid]['started'] = { 
896                'allocID': alloc_id,
897                'allocationLog': logv,
898                'segmentdescription': { 
899                    'topdldescription': t.to_dict()
900                    },
901                'proof': proof.to_dict(),
902                }
903        self.allocation[aid]['topo'] = t
904        retval = copy.copy(self.allocation[aid]['started'])
905        self.write_state()
906        self.state_lock.release()
907        return retval
908   
909    # End of StartSegment support routines
910
911    def StartSegment(self, req, fid):
912        err = None  # Any service_error generated after tmpdir is created
913        rv = None   # Return value from segment creation
914
915        self.log.info("StartSegment called by %s" % fid)
916        try:
917            req = req['StartSegmentRequestBody']
918            auth_attr = req['allocID']['fedid']
919            topref = req['segmentdescription']['topdldescription']
920        except KeyError:
921            raise service_error(server_error.req, "Badly formed request")
922
923        connInfo = req.get('connection', [])
924        services = req.get('service', [])
925        aid = "%s" % auth_attr
926        attrs = req.get('fedAttr', [])
927
928        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
929                with_proof=True)
930        if not access_ok:
931            self.log.info("StartSegment for %s failed: access denied" % fid)
932            raise service_error(service_error.access, "Access denied")
933        else:
934            # See if this is a replay of an earlier succeeded StartSegment -
935            # sometimes SSL kills 'em.  If so, replay the response rather than
936            # redoing the allocation.
937            self.state_lock.acquire()
938            retval = self.allocation[aid].get('started', None)
939            self.state_lock.release()
940            if retval:
941                self.log.warning("Duplicate StartSegment for %s: " % aid + \
942                        "replaying response")
943                return retval
944
945        # A new request.  Do it.
946
947        if topref: topo = topdl.Topology(**topref)
948        else:
949            raise service_error(service_error.req, 
950                    "Request missing segmentdescription'")
951       
952        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
953        try:
954            tmpdir = tempfile.mkdtemp(prefix="access-")
955            softdir = "%s/software" % tmpdir
956            os.mkdir(softdir)
957        except EnvironmentError:
958            self.log.info("StartSegment for %s failed: internal error" % fid)
959            raise service_error(service_error.internal, "Cannot create tmp dir")
960
961        # Try block alllows us to clean up temporary files.
962        try:
963            self.retrieve_software(topo, certfile, softdir)
964            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
965                alloc_log =  self.initialize_experiment_info(attrs, aid, 
966                        certfile, tmpdir)
967            # A misconfigured cert in the ABAC map can be confusing...
968            if not os.access(xmlrpc_cert, os.R_OK):
969                self.log.error("Cannot open user's emulab SSL cert: %s" % \
970                        xmlrpc_cert)
971                raise service_error(service_error.internal,
972                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
973
974
975            if '/' in proj: proj, gid = proj.split('/')
976            else: gid = None
977
978
979            # Set up userconf and seer if needed
980            self.configure_userconf(services, tmpdir)
981            self.configure_seer_services(services, topo, softdir)
982            # Get and send synch store variables
983            exporter = self.exports_segment(keyfile=self.ssh_privkey_file,
984                    debug=self.create_debug, log=alloc_log, boss=self.boss,
985                    ops=self.ops, cert=xmlrpc_cert)
986            if not exporter(self, ename, proj, user, connInfo, tmpdir, gid):
987                raise service_error(service_error.internal,
988                        'Failed to gather export variables')
989            self.export_store_info(certfile, proj, ename, connInfo)
990            self.import_store_info(certfile, connInfo)
991
992            expfile = "%s/experiment.tcl" % tmpdir
993
994            self.generate_portal_configs(topo, pubkey_base, 
995                    secretkey_base, tmpdir, proj, ename, connInfo, services)
996            self.generate_ns2(topo, expfile, 
997                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
998
999            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1000                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1001                    ops=self.ops, cert=xmlrpc_cert)
1002            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1003        except service_error, e:
1004            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
1005            err = e
1006        except:
1007            t, v, st = sys.exc_info()
1008            self.log.info("StartSegment for %s failed:unexpected error: %s" \
1009                    % (fid, traceback.extract_tb(st)))
1010            err = service_error(service_error.internal, "%s: %s" % \
1011                    (v, traceback.extract_tb(st)))
1012
1013        # Walk up tmpdir, deleting as we go
1014        if self.cleanup: self.remove_dirs(tmpdir)
1015        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1016
1017        if rv:
1018            self.log.info("StartSegment for %s succeeded" % fid)
1019            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1020                    proof)
1021        elif err:
1022            raise service_error(service_error.federant,
1023                    "Swapin failed: %s" % err)
1024        else:
1025            raise service_error(service_error.federant, "Swapin failed")
1026
1027    def TerminateSegment(self, req, fid):
1028        self.log.info("TerminateSegment called by %s" % fid)
1029        try:
1030            req = req['TerminateSegmentRequestBody']
1031        except KeyError:
1032            raise service_error(server_error.req, "Badly formed request")
1033
1034        auth_attr = req['allocID']['fedid']
1035        aid = "%s" % auth_attr
1036        attrs = req.get('fedAttr', [])
1037
1038        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1039                with_proof=True)
1040        if not access_ok:
1041            raise service_error(service_error.access, "Access denied")
1042
1043        self.state_lock.acquire()
1044        if aid in self.allocation:
1045            proj = self.allocation[aid].get('project', None)
1046            user = self.allocation[aid].get('user', None)
1047            xmlrpc_cert = self.allocation[aid].get('cert', None)
1048            ename = self.allocation[aid].get('experiment', None)
1049            nonce = self.allocation[aid].get('nonce', False)
1050        else:
1051            proj = None
1052            user = None
1053            ename = None
1054            nonce = False
1055            xmlrpc_cert = None
1056        self.state_lock.release()
1057
1058        if not proj:
1059            self.log.info("TerminateSegment failed for %s: cannot find project"\
1060                    % fid)
1061            raise service_error(service_error.internal, 
1062                    "Can't find project for %s" % aid)
1063        else:
1064            if '/' in proj: proj, gid = proj.split('/')
1065            else: gid = None
1066
1067        if not user:
1068            self.log.info("TerminateSegment failed for %s: cannot find user"\
1069                    % fid)
1070            raise service_error(service_error.internal, 
1071                    "Can't find creation user for %s" % aid)
1072        if not ename:
1073            self.log.info(
1074                    "TerminateSegment failed for %s: cannot find experiment"\
1075                    % fid)
1076            raise service_error(service_error.internal, 
1077                    "Can't find experiment name for %s" % aid)
1078       
1079        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1080                debug=self.create_debug, boss=self.boss, ops=self.ops,
1081                cert=xmlrpc_cert)
1082        stopper(self, user, proj, ename, gid, nonce)
1083        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1084                (fid, proj, ename))
1085        self.state_lock.acquire()
1086        # Remove the started flag/info - the segment is no longer started
1087        if aid in self.allocation:
1088            if 'started' in self.allocation[aid]:
1089                del self.allocation[aid]['started']
1090            self.write_state()
1091        self.state_lock.release()
1092        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1093
1094    def InfoSegment(self, req, fid):
1095        self.log.info("InfoSegment called by %s" % fid)
1096        try:
1097            req = req['InfoSegmentRequestBody']
1098        except KeyError:
1099            raise service_error(server_error.req, "Badly formed request")
1100
1101        auth_attr = req['allocID']['fedid']
1102        aid = "%s" % auth_attr
1103
1104        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1105                with_proof=True)
1106        if not access_ok:
1107            raise service_error(service_error.access, "Access denied")
1108
1109        self.state_lock.acquire()
1110        if aid in self.allocation:
1111            topo = self.allocation[aid].get('topo', None)
1112            if topo: topo = topo.clone()
1113            proj = self.allocation[aid].get('project', None)
1114            user = self.allocation[aid].get('user', None)
1115            xmlrpc_cert = self.allocation[aid].get('cert', None)
1116            ename = self.allocation[aid].get('experiment', None)
1117        else:
1118            proj = None
1119            user = None
1120            ename = None
1121            topo = None
1122            xmlrpc_cert = None
1123        self.state_lock.release()
1124
1125        if not proj:
1126            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1127            raise service_error(service_error.internal, 
1128                    "Can't find project for %s" % aid)
1129        else:
1130            if '/' in proj: proj, gid = proj.split('/')
1131            else: gid = None
1132
1133        if not user:
1134            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1135            raise service_error(service_error.internal, 
1136                    "Can't find creation user for %s" % aid)
1137        if not ename:
1138            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1139            raise service_error(service_error.internal, 
1140                    "Can't find experiment name for %s" % aid)
1141        info = self.info_segment(keyfile=self.ssh_privkey_file,
1142                debug=self.create_debug, boss=self.boss, ops=self.ops,
1143                cert=xmlrpc_cert)
1144        info(self, user, proj, ename)
1145        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1146                (fid, user, proj, ename))
1147        self.decorate_topology(info, topo)
1148        self.state_lock.acquire()
1149        if aid in self.allocation:
1150            self.allocation[aid]['topo'] = topo
1151            self.write_state()
1152        self.state_lock.release()
1153        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1154                (fid, user, proj, ename))
1155
1156        rv = { 
1157                'allocID': req['allocID'], 
1158                'proof': proof.to_dict(),
1159                }
1160        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1161                (fid, user, proj, ename))
1162        if topo:
1163            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1164        return rv
1165
1166    def OperationSegment(self, req, fid):
1167        def get_pname(e):
1168            """
1169            Get the physical name of a node
1170            """
1171            if e.localname:
1172                return re.sub('\..*','', e.localname[0])
1173            else:
1174                return None
1175
1176        self.log.info("OperationSegment called by %s" % fid)
1177        try:
1178            req = req['OperationSegmentRequestBody']
1179        except KeyError:
1180            raise service_error(server_error.req, "Badly formed request")
1181
1182        auth_attr = req['allocID']['fedid']
1183        aid = "%s" % auth_attr
1184
1185        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1186                with_proof=True)
1187        if not access_ok:
1188            self.log.info("OperationSegment failed for %s: access denied" % fid)
1189            raise service_error(service_error.access, "Access denied")
1190
1191        op = req.get('operation', None)
1192        targets = req.get('target', None)
1193        params = req.get('parameter', None)
1194
1195        if op is None :
1196            self.log.info("OperationSegment failed for %s: no operation" % fid)
1197            raise service_error(service_error.req, "missing operation")
1198        elif targets is None:
1199            self.log.info("OperationSegment failed for %s: no targets" % fid)
1200            raise service_error(service_error.req, "no targets")
1201
1202        self.state_lock.acquire()
1203        if aid in self.allocation:
1204            topo = self.allocation[aid].get('topo', None)
1205            if topo: topo = topo.clone()
1206            xmlrpc_cert = self.allocation[aid].get('cert', None)
1207        else:
1208            topo = None
1209            xmlrpc_cert = None
1210        self.state_lock.release()
1211
1212        targets = copy.copy(targets)
1213        ptargets = { }
1214        for e in topo.elements:
1215            if isinstance(e, topdl.Computer):
1216                if e.name in targets:
1217                    targets.remove(e.name)
1218                    pn = get_pname(e)
1219                    if pn: ptargets[e.name] = pn
1220
1221        status = [ operation_status(t, operation_status.no_target) \
1222                for t in targets]
1223
1224        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1225                debug=self.create_debug, boss=self.boss, ops=self.ops,
1226                cert=xmlrpc_cert)
1227        ops(self, op, ptargets, params, topo)
1228        self.log.info("OperationSegment operated for %s" % fid)
1229       
1230        status.extend(ops.status)
1231        self.log.info("OperationSegment succeed for %s" % fid)
1232
1233        return { 
1234                'allocID': req['allocID'], 
1235                'status': [s.to_dict() for s in status],
1236                'proof': proof.to_dict(),
1237                }
Note: See TracBrowser for help on using the repository browser.