source: fedd/federation/emulab_access.py @ 8fbef04

Last change on this file since 8fbef04 was 8fbef04, checked in by Ted Faber <faber@…>, 10 years ago

Shared NAT integrated

  • Property mode set to 100644
File size: 40.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34import emulab_shared_nat_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    max_name_len = 19
53
54    def __init__(self, config=None, auth=None):
55        """
56        Initializer.  Pulls parameters out of the ConfigParser's access section.
57        """
58
59        access_base.__init__(self, config, auth)
60
61        self.max_name_len = access.max_name_len
62
63        self.allow_proxy = config.getboolean("access", "allow_proxy")
64
65        self.domain = config.get("access", "domain")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78        self.boss = config.get("access", "boss")
79        self.ops = config.get("access", "ops")
80        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
81        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
82
83        self.dragon_endpoint = config.get("access", "dragon")
84        self.dragon_vlans = config.get("access", "dragon_vlans")
85        self.deter_internal = config.get("access", "deter_internal")
86
87        self.tunnel_config = config.getboolean("access", "tunnel_config")
88        self.portal_command = config.get("access", "portal_command")
89        self.portal_image = config.get("access", "portal_image")
90        self.portal_type = config.get("access", "portal_type") or "pc"
91        self.portal_startcommand = config.get("access", "portal_startcommand")
92        self.node_startcommand = config.get("access", "node_startcommand")
93        self.nat_portal = config.get("access", "nat_portal")
94        self.shared_nat = config.getboolean("access", "shared_nat")
95
96        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
97                self.get_port(config.get("globals", "services", "23235")))
98
99        self.federation_software = self.software_list(self.federation_software)
100        self.portal_software = self.software_list(self.portal_software)
101        self.local_seer_software = self.software_list(self.local_seer_software)
102
103        self.access_type = self.access_type.lower()
104        if self.shared_nat:
105            self.start_segment = emulab_shared_nat_segment.start_segment
106            self.stop_segment = emulab_shared_nat_segment.stop_segment
107            self.info_segment = emulab_shared_nat_segment.info_segment
108            self.operation_segment = emulab_shared_nat_segment.operation_segment
109            self.exports_segment = emulab_shared_nat_segment.exports_segment
110        else:
111            self.start_segment = emulab_segment.start_segment
112            self.stop_segment = emulab_segment.stop_segment
113            self.info_segment = emulab_segment.info_segment
114            self.operation_segment = emulab_segment.operation_segment
115            self.exports_segment = emulab_segment.exports_segment
116
117        self.restricted = [ ]
118        tb = config.get('access', 'testbed')
119        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
120        else: self.testbed = [ ]
121
122        # authorization information
123        self.auth_type = config.get('access', 'auth_type') \
124                or 'abac'
125        self.auth_dir = config.get('access', 'auth_dir')
126        accessdb = config.get("access", "accessdb")
127        # initialize the authorization system
128        if self.auth_type == 'abac':
129            self.auth = abac_authorizer(load=self.auth_dir)
130            self.access = [ ]
131            if accessdb:
132                self.read_access(accessdb, self.access_tuple)
133        else:
134            raise service_error(service_error.internal, 
135                    "Unknown auth_type: %s" % self.auth_type)
136
137        # read_state in the base_class
138        self.state_lock.acquire()
139        if 'allocation' not in self.state: self.state['allocation']= { }
140        self.allocation = self.state['allocation']
141        self.state_lock.release()
142        self.exports = {
143                'SMB': self.export_SMB,
144                'seer': self.export_seer,
145                'tmcd': self.export_tmcd,
146                'userconfig': self.export_userconfig,
147                'project_export': self.export_project_export,
148                'local_seer_control': self.export_local_seer,
149                'seer_master': self.export_seer_master,
150                'hide_hosts': self.export_hide_hosts,
151                }
152
153        if not self.local_seer_image or not self.local_seer_software or \
154                not self.local_seer_start:
155            if 'local_seer_control' in self.exports:
156                del self.exports['local_seer_control']
157
158        if not self.local_seer_image or not self.local_seer_software or \
159                not self.seer_master_start:
160            if 'seer_master' in self.exports:
161                del self.exports['seer_master']
162
163
164        self.soap_services = {\
165            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
166            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
167            'StartSegment': soap_handler("StartSegment", self.StartSegment),
168            'TerminateSegment': soap_handler("TerminateSegment",
169                self.TerminateSegment),
170            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
171            'OperationSegment': soap_handler("OperationSegment",
172                self.OperationSegment),
173            }
174        self.xmlrpc_services =  {\
175            'RequestAccess': xmlrpc_handler('RequestAccess',
176                self.RequestAccess),
177            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
178                self.ReleaseAccess),
179            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
180            'TerminateSegment': xmlrpc_handler('TerminateSegment',
181                self.TerminateSegment),
182            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
183            'OperationSegment': xmlrpc_handler("OperationSegment",
184                self.OperationSegment),
185            }
186
187        self.call_SetValue = service_caller('SetValue', log=self.log)
188        self.call_GetValue = service_caller('GetValue', log=self.log)
189
190    @staticmethod
191    def get_port(ps):
192        '''
193        Take a fedd service string and return the first port.  Used in
194        creating the testbed uri identifier.
195        '''
196        p = ps.split(',')
197        smallport = p[0].split(':')
198        try:
199            rv = int(smallport[0])
200        except ValueError:
201            rv = 23235
202        return rv
203
204    @staticmethod
205    def access_tuple(str):
206        """
207        Convert a string of the form (id, id, id) into an access_project.  This
208        is called by read_access to convert to local attributes.  It returns a
209        tuple of the form (project, user, certificate_file).
210        """
211
212        str = str.strip()
213        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
214            # The slice takes the parens off the string.
215            proj, user, cert = str[1:-1].split(',')
216            return (proj.strip(), user.strip(), cert.strip())
217        else:
218            raise self.parse_error(
219                    'Bad mapping (unbalanced parens or more than 2 commas)')
220
221    # RequestAccess support routines
222
223    def save_project_state(self, aid, pname, uname, certf, owners):
224        """
225        Save the project, user, and owners associated with this allocation.
226        This creates the allocation entry.
227        """
228        self.state_lock.acquire()
229        self.allocation[aid] = { }
230        self.allocation[aid]['project'] = pname
231        self.allocation[aid]['user'] = uname
232        self.allocation[aid]['cert'] = certf
233        self.allocation[aid]['owners'] = owners
234        self.write_state()
235        self.state_lock.release()
236        return (pname, uname)
237
238    # End of RequestAccess support routines
239
240    def RequestAccess(self, req, fid):
241        """
242        Handle the access request.  Proxy if not for us.
243
244        Parse out the fields and make the allocations or rejections if for us,
245        otherwise, assuming we're willing to proxy, proxy the request out.
246        """
247
248        def gateway_hardware(h):
249            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
250            else: return h
251
252        def get_export_project(svcs):
253            """
254            if the service requests includes one to export a project, return
255            that project.
256            """
257            rv = None
258            for s in svcs:
259                if s.get('name', '') == 'project_export' and \
260                        s.get('visibility', '') == 'export':
261                    if not rv: 
262                        for a in s.get('fedAttr', []):
263                            if a.get('attribute', '') == 'project' \
264                                    and 'value' in a:
265                                rv = a['value']
266                    else:
267                        raise service_error(service_error, access, 
268                                'Requesting multiple project exports is ' + \
269                                        'not supported');
270            return rv
271
272        self.log.info("RequestAccess called by %s" % fid)
273        # The dance to get into the request body
274        if req.has_key('RequestAccessRequestBody'):
275            req = req['RequestAccessRequestBody']
276        else:
277            raise service_error(service_error.req, "No request!?")
278
279        # if this includes a project export request, construct a filter such
280        # that only the ABAC attributes mapped to that project are checked for
281        # access.
282        if 'service' in req:
283            ep = get_export_project(req['service'])
284            if ep: pf = lambda(a): a.value[0] == ep
285            else: pf = None
286        else:
287            ep = None
288            pf = None
289
290        if self.auth.import_credentials(
291                data_list=req.get('abac_credential', [])):
292            self.auth.save()
293        else:
294            self.log.debug('failed to import incoming credentials')
295
296        if self.auth_type == 'abac':
297            found, owners, proof = self.lookup_access(req, fid, filter=pf)
298        else:
299            raise service_error(service_error.internal, 
300                    'Unknown auth_type: %s' % self.auth_type)
301        ap = None
302
303        # keep track of what's been added
304        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
305        aid = unicode(allocID)
306
307        pname, uname = self.save_project_state(aid, found[0], found[1], 
308                found[2], owners)
309
310        services, svc_state = self.export_services(req.get('service',[]),
311                pname, uname)
312        self.state_lock.acquire()
313        # Store services state in global state
314        for k, v in svc_state.items():
315            self.allocation[aid][k] = v
316        self.append_allocation_authorization(aid, 
317                set([(o, allocID) for o in owners]), state_attr='allocation')
318        self.write_state()
319        self.state_lock.release()
320        try:
321            f = open("%s/%s.pem" % (self.certdir, aid), "w")
322            print >>f, alloc_cert
323            f.close()
324        except EnvironmentError, e:
325            self.log.info("RequestAccess failed for by %s: internal error" \
326                    % fid)
327            raise service_error(service_error.internal, 
328                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
329        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
330        resp = self.build_access_response({ 'fedid': allocID } ,
331                pname, services, proof)
332        return resp
333
334    def ReleaseAccess(self, req, fid):
335        self.log.info("ReleaseAccess called by %s" % fid)
336        # The dance to get into the request body
337        if req.has_key('ReleaseAccessRequestBody'):
338            req = req['ReleaseAccessRequestBody']
339        else:
340            raise service_error(service_error.req, "No request!?")
341
342        try:
343            if req['allocID'].has_key('localname'):
344                auth_attr = aid = req['allocID']['localname']
345            elif req['allocID'].has_key('fedid'):
346                aid = unicode(req['allocID']['fedid'])
347                auth_attr = req['allocID']['fedid']
348            else:
349                raise service_error(service_error.req,
350                        "Only localnames and fedids are understood")
351        except KeyError:
352            raise service_error(service_error.req, "Badly formed request")
353
354        self.log.debug("[access] deallocation requested for %s by %s" % \
355                (aid, fid))
356        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
357                with_proof=True)
358        if not access_ok:
359            self.log.debug("[access] deallocation denied for %s", aid)
360            raise service_error(service_error.access, "Access Denied")
361
362        self.state_lock.acquire()
363        if aid in self.allocation:
364            self.log.debug("Found allocation for %s" %aid)
365            self.clear_allocation_authorization(aid, state_attr='allocation')
366            del self.allocation[aid]
367            self.write_state()
368            self.state_lock.release()
369            # Remove the access cert
370            cf = "%s/%s.pem" % (self.certdir, aid)
371            self.log.debug("Removing %s" % cf)
372            os.remove(cf)
373            self.log.info("ReleaseAccess succeeded for %s" % fid)
374            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
375        else:
376            self.state_lock.release()
377            raise service_error(service_error.req, "No such allocation")
378
379    # These are subroutines for StartSegment
380    def generate_ns2(self, topo, expfn, softdir, connInfo):
381        """
382        Convert topo into an ns2 file, decorated with appropriate commands for
383        the particular testbed setup.  Convert all requests for software, etc
384        to point at the staged copies on this testbed and add the federation
385        startcommands.
386        """
387        class dragon_commands:
388            """
389            Functor to spit out approrpiate dragon commands for nodes listed in
390            the connectivity description.  The constructor makes a dict mapping
391            dragon nodes to their parameters and the __call__ checks each
392            element in turn for membership.
393            """
394            def __init__(self, map):
395                self.node_info = map
396
397            def __call__(self, e):
398                s = ""
399                if isinstance(e, topdl.Computer):
400                    if self.node_info.has_key(e.name):
401                        info = self.node_info[e.name]
402                        for ifname, vlan, type in info:
403                            for i in e.interface:
404                                if i.name == ifname:
405                                    addr = i.get_attribute('ip4_address')
406                                    subs = i.substrate[0]
407                                    break
408                            else:
409                                raise service_error(service_error.internal,
410                                        "No interface %s on element %s" % \
411                                                (ifname, e.name))
412                            # XXX: do netmask right
413                            if type =='link':
414                                s = ("tb-allow-external ${%s} " + \
415                                        "dragonportal ip %s vlan %s " + \
416                                        "netmask 255.255.255.0\n") % \
417                                        (topdl.to_tcl_name(e.name), addr, vlan)
418                            elif type =='lan':
419                                s = ("tb-allow-external ${%s} " + \
420                                        "dragonportal " + \
421                                        "ip %s vlan %s usurp %s\n") % \
422                                        (topdl.to_tcl_name(e.name), addr, 
423                                                vlan, subs)
424                            else:
425                                raise service_error(service_error_internal,
426                                        "Unknown DRAGON type %s" % type)
427                return s
428
429        class not_dragon:
430            """
431            Return true if a node is in the given map of dragon nodes.
432            """
433            def __init__(self, map):
434                self.nodes = set(map.keys())
435
436            def __call__(self, e):
437                return e.name not in self.nodes
438
439        def have_portals(top):
440            '''
441            Return true if the topology has a portal node
442            '''
443            # The else is on the for
444            for e in top.elements:
445                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
446                    return True
447            else:
448                return False
449
450
451        # Main line of generate_ns2
452        t = topo.clone()
453
454        # Create the map of nodes that need direct connections (dragon
455        # connections) from the connInfo
456        dragon_map = { }
457        for i in [ i for i in connInfo if i['type'] == 'transit']:
458            for a in i.get('fedAttr', []):
459                if a['attribute'] == 'vlan_id':
460                    vlan = a['value']
461                    break
462            else:
463                raise service_error(service_error.internal, "No vlan tag")
464            members = i.get('member', [])
465            if len(members) > 1: type = 'lan'
466            else: type = 'link'
467
468            try:
469                for m in members:
470                    if m['element'] in dragon_map:
471                        dragon_map[m['element']].append(( m['interface'], 
472                            vlan, type))
473                    else:
474                        dragon_map[m['element']] = [( m['interface'], 
475                            vlan, type),]
476            except KeyError:
477                raise service_error(service_error.req,
478                        "Missing connectivity info")
479
480        def output_fixed_filter(e):
481            if not isinstance(e, topdl.Computer): return ""
482            fn = e.get_attribute('fixed')
483            if fn is None:
484                return ""
485            else:
486                return 'tb-fix-node ${%s} %s\n' % (topdl.to_tcl_name(e.name), fn)
487
488        def output_external_access(e):
489            '''
490            Allow nodes that have explicit external access requests attached to
491            have external access.  This will be replaced by a real risky
492            experiment filter.
493            '''
494            if not isinstance(e, topdl.Computer): return ""
495            ex = e.get_attribute('containers:external')
496            if ex is None:
497                return ""
498            else:
499                return 'tb-allow-external ${%s}\n' % topdl.to_tcl_name(e.name)
500
501        # Weed out the things we aren't going to instantiate: Segments, portal
502        # substrates, and portal interfaces.  (The copy in the for loop allows
503        # us to delete from e.elements in side the for loop).  While we're
504        # touching all the elements, we also adjust paths from the original
505        # testbed to local testbed paths and put the federation commands into
506        # the start commands
507        local = len(dragon_map) == 0 and not have_portals(t)
508        if local: routing = 'Static'
509        else: routing = 'Manual'
510
511        if local:
512            self.log.debug("Local experiment.")
513        for e in [e for e in t.elements]:
514            if isinstance(e, topdl.Segment):
515                t.elements.remove(e)
516            if isinstance(e, topdl.Computer):
517                self.add_kit(e, self.federation_software)
518                if e.get_attribute('portal') and self.portal_startcommand:
519                    # Add local portal support software
520                    self.add_kit(e, self.portal_software)
521                    # Portals never have a user-specified start command
522                    e.set_attribute('startup', self.portal_startcommand)
523                elif not local and self.node_startcommand:
524                    if e.get_attribute('startup'):
525                        e.set_attribute('startup', "%s \\$USER '%s'" % \
526                                (self.node_startcommand, 
527                                    e.get_attribute('startup')))
528                    else:
529                        e.set_attribute('startup', self.node_startcommand)
530
531                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
532                # Remove portal interfaces that do not connect to DRAGON
533                e.interface = [i for i in e.interface \
534                        if not i.get_attribute('portal') or i.name in dinf ]
535            # Fix software paths
536            for s in getattr(e, 'software', []):
537                s.location = re.sub("^.*/", softdir, s.location)
538
539        t.substrates = [ s.clone() for s in t.substrates ]
540        t.incorporate_elements()
541
542        # Customize the ns2 output for local portal commands and images
543        filters = [output_fixed_filter, output_external_access]
544
545        if self.dragon_endpoint:
546            add_filter = not_dragon(dragon_map)
547            filters.append(dragon_commands(dragon_map))
548        else:
549            add_filter = None
550
551        if self.portal_command:
552            if self.shared_nat: suffix = 'shared nat 0/0/tcp rdr 22/tcp'
553            else: suffix = ''
554
555            filters.append(topdl.generate_portal_command_filter(
556                self.portal_command, add_filter=add_filter, suffix=suffix))
557
558        if self.portal_image:
559            filters.append(topdl.generate_portal_image_filter(
560                self.portal_image))
561
562        if self.portal_type:
563            filters.append(topdl.generate_portal_hardware_filter(
564                self.portal_type))
565
566        # Convert to ns and write it out
567        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
568        try:
569            f = open(expfn, "w")
570            print >>f, expfile
571            f.close()
572        except EnvironmentError:
573            raise service_error(service_error.internal,
574                    "Cannot write experiment file %s: %s" % (expfn,e))
575
576    def export_store_info(self, cf, proj, ename, connInfo):
577        """
578        For the export requests in the connection info, send out teh SetValue
579        calls for parameters initialized by the segment's export_segment
580        object. Values are all in the connInfo parameter's value fields.
581        """
582
583        for c in connInfo:
584            for p in c.get('parameter', []):
585                # Ignore non-output parameters
586                if p.get('type', '') != 'output':
587                    continue
588                # Sanity check store, key and value
589                surl = p.get('store', None)
590                key = p.get('key', None)
591                value = p.get('value', None)
592                if surl is None:
593                    self.log.error("Export parameter without store: %s" % \
594                            p.get('name'))
595                    continue
596                if key is None:
597                    self.log.error("Export parameter without key: %s" % \
598                            p.get('name'))
599                    continue
600                if value is None:
601                    self.log.error("Unknown (unset) export parameter: %s" % \
602                            p.get('name'))
603                    continue
604                # Set the store value
605                req = { 'name': key, 'value': value }
606                self.call_SetValue(surl, req, cf)
607
608    def add_seer_node(self, topo, name, startup):
609        """
610        Add a seer node to the given topology, with the startup command passed
611        in.  Used by configure seer_services.
612        """
613        c_node = topdl.Computer(
614                name=name, 
615                os= topdl.OperatingSystem(
616                    attribute=[
617                    { 'attribute': 'osid', 
618                        'value': self.local_seer_image },
619                    ]),
620                attribute=[
621                    { 'attribute': 'startup', 'value': startup },
622                    ]
623                )
624        self.add_kit(c_node, self.local_seer_software)
625        topo.elements.append(c_node)
626
627    def configure_seer_services(self, services, topo, softdir):
628        """
629        Make changes to the topology required for the seer requests being made.
630        Specifically, add any control or master nodes required and set up the
631        start commands on the nodes to interconnect them.
632        """
633        local_seer = False      # True if we need to add a control node
634        collect_seer = False    # True if there is a seer-master node
635        seer_master= False      # True if we need to add the seer-master
636        for s in services:
637            s_name = s.get('name', '')
638            s_vis = s.get('visibility','')
639
640            if s_name  == 'local_seer_control' and s_vis == 'export':
641                local_seer = True
642            elif s_name == 'seer_master':
643                if s_vis == 'import':
644                    collect_seer = True
645                elif s_vis == 'export':
646                    seer_master = True
647       
648        # We've got the whole picture now, so add nodes if needed and configure
649        # them to interconnect properly.
650        if local_seer or seer_master:
651            # Copy local seer control node software to the tempdir
652            for l, f in self.local_seer_software:
653                base = os.path.basename(f)
654                copy_file(f, "%s/%s" % (softdir, base))
655        # If we're collecting seers somewhere the controllers need to talk to
656        # the master.  In testbeds that export the service, that will be a
657        # local node that we'll add below.  Elsewhere it will be the control
658        # portal that will port forward to the exporting master.
659        if local_seer:
660            if collect_seer:
661                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
662            else:
663                startup = self.local_seer_start
664            self.add_seer_node(topo, 'control', startup)
665        # If this is the seer master, add that node, too.
666        if seer_master:
667            self.add_seer_node(topo, 'seer-master', 
668                    "%s -R -n -R seer-master -R -A -R sink" % \
669                            self.seer_master_start)
670
671    def retrieve_software(self, topo, certfile, softdir):
672        """
673        Collect the software that nodes in the topology need loaded and stage
674        it locally.  This implies retrieving it from the experiment_controller
675        and placing it into softdir.  Certfile is used to prove that this node
676        has access to that data (it's the allocation/segment fedid).  Finally
677        local portal and federation software is also copied to the same staging
678        directory for simplicity - all software needed for experiment creation
679        is in softdir.
680        """
681        sw = set()
682        for e in topo.elements:
683            for s in getattr(e, 'software', []):
684                sw.add(s.location)
685        for s in sw:
686            self.log.debug("Retrieving %s" % s)
687            try:
688                get_url(s, certfile, softdir, log=self.log)
689            except:
690                t, v, st = sys.exc_info()
691                raise service_error(service_error.internal,
692                        "Error retrieving %s: %s" % (s, v))
693
694        # Copy local federation and portal node software to the tempdir
695        for s in (self.federation_software, self.portal_software):
696            for l, f in s:
697                base = os.path.basename(f)
698                copy_file(f, "%s/%s" % (softdir, base))
699
700
701    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
702        """
703        Gather common configuration files, retrieve or create an experiment
704        name and project name, and return the ssh_key filenames.  Create an
705        allocation log bound to the state log variable as well.
706        """
707        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
708                'seer_ca_pem', 'seer_node_pem', 'route.tgz')
709        ename = None
710        pubkey_base = None
711        secretkey_base = None
712        proj = None
713        user = None
714        alloc_log = None
715        nonce_experiment = False
716        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
717
718        self.state_lock.acquire()
719        if aid in self.allocation:
720            proj = self.allocation[aid].get('project', None)
721        self.state_lock.release()
722
723        if not proj:
724            raise service_error(service_error.internal, 
725                    "Can't find project for %s" %aid)
726
727        for a in attrs:
728            if a['attribute'] in configs:
729                try:
730                    self.log.debug("Retrieving %s from %s" % \
731                            (a['attribute'], a['value']))
732                    get_url(a['value'], certfile, tmpdir, log=self.log)
733                except:
734                    t, v, st = sys.exc_info()
735                    raise service_error(service_error.internal,
736                            "Error retrieving %s: %s" % (a.get('value', ""), v))
737            if a['attribute'] == 'ssh_pubkey':
738                pubkey_base = a['value'].rpartition('/')[2]
739            if a['attribute'] == 'ssh_secretkey':
740                secretkey_base = a['value'].rpartition('/')[2]
741            if a['attribute'] == 'experiment_name':
742                ename = a['value']
743
744        # Names longer than the emulab max are discarded
745        if ename and len(ename) <= self.max_name_len:
746            # Clean up the experiment name so that emulab will accept it.
747            ename = re.sub(vchars_re, '-', ename)
748
749        else:
750            ename = ""
751            for i in range(0,5):
752                ename += random.choice(string.ascii_letters)
753            nonce_experiment = True
754            self.log.warn("No experiment name or suggestion too long: " + \
755                    "picked one randomly: %s" % ename)
756
757        if not pubkey_base:
758            raise service_error(service_error.req, 
759                    "No public key attribute")
760
761        if not secretkey_base:
762            raise service_error(service_error.req, 
763                    "No secret key attribute")
764
765        self.state_lock.acquire()
766        if aid in self.allocation:
767            user = self.allocation[aid].get('user', None)
768            cert = self.allocation[aid].get('cert', None)
769            self.allocation[aid]['experiment'] = ename
770            self.allocation[aid]['nonce'] = nonce_experiment
771            self.allocation[aid]['log'] = [ ]
772            # Create a logger that logs to the experiment's state object as
773            # well as to the main log file.
774            alloc_log = logging.getLogger('fedd.access.%s' % ename)
775            h = logging.StreamHandler(
776                    list_log.list_log(self.allocation[aid]['log']))
777            # XXX: there should be a global one of these rather than
778            # repeating the code.
779            h.setFormatter(logging.Formatter(
780                "%(asctime)s %(name)s %(message)s",
781                        '%d %b %y %H:%M:%S'))
782            alloc_log.addHandler(h)
783            self.write_state()
784        self.state_lock.release()
785
786        if not user:
787            raise service_error(service_error.internal, 
788                    "Can't find creation user for %s" %aid)
789
790        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
791
792    def decorate_topology(self, info, t):
793        """
794        Copy the physical mapping and status onto the topology.  Used by
795        StartSegment and InfoSegment
796        """
797        def add_new(ann, attr):
798            for a in ann:
799                if a not in attr: attr.append(a)
800
801        def merge_os(os, e):
802            if len(e.os) == 0:
803                # No OS at all:
804                if os.get_attribute('emulab_access:image'):
805                    os.set_attribute('emulab_access:initial_image', 
806                            os.get_attribute('emulab_access:image'))
807                e.os = [ os ]
808            elif len(e.os) == 1:
809                # There's one OS, copy the initial image and replace
810                eos = e.os[0]
811                initial = eos.get_attribute('emulab_access:initial_image')
812                if initial:
813                    os.set_attribute('emulab_access:initial_image', initial)
814                e.os = [ os] 
815            else:
816                # Multiple OSes, replace or append
817                for eos in e.os:
818                    if os.name == eos.name:
819                        eos.version = os.version
820                        eos.version = os.distribution
821                        eos.version = os.distributionversion
822                        for a in os.attribute:
823                            if eos.get_attribute(a.attribute):
824                                eos.remove_attribute(a.attribute)
825                            eos.set_attribute(a.attribute, a.value)
826                        break
827                else:
828                    e.os.append(os)
829
830
831        if t is None: return
832        i = 0 # For fake debugging instantiation
833        # Copy the assigned names into the return topology
834        for e in t.elements:
835            if isinstance(e, topdl.Computer):
836                if not self.create_debug:
837                    if e.name in info.node:
838                        add_new(("%s%s" % 
839                            (info.node[e.name].pname, self.domain),),
840                            e.localname)
841                        add_new(("%s%s" % 
842                            (info.node[e.name].lname, self.domain),),
843                            e.localname)
844                        e.status = info.node[e.name].status
845                        os = info.node[e.name].getOS()
846                        if os: merge_os(os, e)
847                else:
848                    # Simple debugging assignment
849                    add_new(("node%d%s" % (i, self.domain),), e.localname)
850                    e.status = 'active'
851                    add_new(('testop1', 'testop2'), e.operation)
852                    i += 1
853
854        for s in t.substrates:
855            if s.name in info.subs:
856                sub = info.subs[s.name]
857                if sub.cap is not None:
858                    s.capacity = topdl.Capacity(sub.cap, 'max')
859                if sub.delay is not None:
860                    s.delay = topdl.Latency(sub.delay, 'max')
861        # XXX interfaces
862
863
864    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
865        """
866        Store key bits of experiment state in the global repository, including
867        the response that may need to be replayed, and return the response.
868        """
869        def get_localnames(t):
870            names = [ ]
871            for e in t.elements:
872                if isinstance(e, topdl.Computer):
873                    n = e.get_attribute('testbed')
874                    if n is not None and n not in names:
875                        names.append(n)
876            return names
877        i = 0
878        t = topo.clone()
879        self.decorate_topology(starter, t)
880        # Grab the log (this is some anal locking, but better safe than
881        # sorry)
882        self.state_lock.acquire()
883        # Put information about this testbed into the topdl
884        tb = topdl.Testbed(self.uri, "deter",
885                localname=get_localnames(t), 
886                attribute=[ 
887                    { 
888                        'attribute': 'project', 
889                        'value': self.allocation[aid]['project']
890                    },
891                    { 
892                        'attribute': 'experiment', 
893                        'value': self.allocation[aid]['experiment']
894                    }])
895        t.elements.append(tb)
896        logv = "".join(self.allocation[aid]['log'])
897        # It's possible that the StartSegment call gets retried (!).
898        # if the 'started' key is in the allocation, we'll return it rather
899        # than redo the setup.
900        self.allocation[aid]['started'] = { 
901                'allocID': alloc_id,
902                'allocationLog': logv,
903                'segmentdescription': { 
904                    'topdldescription': t.to_dict()
905                    },
906                'proof': proof.to_dict(),
907                }
908        self.allocation[aid]['topo'] = t
909        retval = copy.copy(self.allocation[aid]['started'])
910        self.write_state()
911        self.state_lock.release()
912        return retval
913   
914    # End of StartSegment support routines
915
916    def StartSegment(self, req, fid):
917        err = None  # Any service_error generated after tmpdir is created
918        rv = None   # Return value from segment creation
919
920        self.log.info("StartSegment called by %s" % fid)
921        try:
922            req = req['StartSegmentRequestBody']
923            auth_attr = req['allocID']['fedid']
924            topref = req['segmentdescription']['topdldescription']
925        except KeyError:
926            raise service_error(server_error.req, "Badly formed request")
927
928        connInfo = req.get('connection', [])
929        services = req.get('service', [])
930        aid = "%s" % auth_attr
931        attrs = req.get('fedAttr', [])
932
933        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
934                with_proof=True)
935        if not access_ok:
936            self.log.info("StartSegment for %s failed: access denied" % fid)
937            raise service_error(service_error.access, "Access denied")
938        else:
939            # See if this is a replay of an earlier succeeded StartSegment -
940            # sometimes SSL kills 'em.  If so, replay the response rather than
941            # redoing the allocation.
942            self.state_lock.acquire()
943            retval = self.allocation[aid].get('started', None)
944            self.state_lock.release()
945            if retval:
946                self.log.warning("Duplicate StartSegment for %s: " % aid + \
947                        "replaying response")
948                return retval
949
950        # A new request.  Do it.
951
952        if topref: topo = topdl.Topology(**topref)
953        else:
954            raise service_error(service_error.req, 
955                    "Request missing segmentdescription'")
956       
957        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
958        try:
959            tmpdir = tempfile.mkdtemp(prefix="access-")
960            softdir = "%s/software" % tmpdir
961            os.mkdir(softdir)
962        except EnvironmentError:
963            self.log.info("StartSegment for %s failed: internal error" % fid)
964            raise service_error(service_error.internal, "Cannot create tmp dir")
965
966        # Try block alllows us to clean up temporary files.
967        try:
968            self.retrieve_software(topo, certfile, softdir)
969            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
970                alloc_log =  self.initialize_experiment_info(attrs, aid, 
971                        certfile, tmpdir)
972            # A misconfigured cert in the ABAC map can be confusing...
973            if not os.access(xmlrpc_cert, os.R_OK):
974                self.log.error("Cannot open user's emulab SSL cert: %s" % \
975                        xmlrpc_cert)
976                raise service_error(service_error.internal,
977                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
978
979
980            if '/' in proj: proj, gid = proj.split('/')
981            else: gid = None
982
983
984            # Set up userconf and seer if needed
985            self.configure_userconf(services, tmpdir)
986            self.configure_seer_services(services, topo, softdir)
987            # Get and send synch store variables
988            exporter = self.exports_segment(keyfile=self.ssh_privkey_file,
989                    debug=self.create_debug, log=alloc_log, boss=self.boss,
990                    ops=self.ops, cert=xmlrpc_cert)
991            if not exporter(self, ename, proj, user, connInfo, tmpdir, gid):
992                raise service_error(service_error.internal,
993                        'Failed to gather export variables')
994            self.export_store_info(certfile, proj, ename, connInfo)
995            self.import_store_info(certfile, connInfo)
996
997            expfile = "%s/experiment.tcl" % tmpdir
998
999            self.generate_portal_configs(topo, pubkey_base, 
1000                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1001            self.generate_ns2(topo, expfile, 
1002                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1003
1004            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1005                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1006                    ops=self.ops, cert=xmlrpc_cert)
1007            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1008        except service_error, e:
1009            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
1010            err = e
1011        except:
1012            t, v, st = sys.exc_info()
1013            self.log.info("StartSegment for %s failed:unexpected error: %s" \
1014                    % (fid, traceback.extract_tb(st)))
1015            err = service_error(service_error.internal, "%s: %s" % \
1016                    (v, traceback.extract_tb(st)))
1017
1018        # Walk up tmpdir, deleting as we go
1019        if self.cleanup: self.remove_dirs(tmpdir)
1020        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1021
1022        if rv:
1023            self.log.info("StartSegment for %s succeeded" % fid)
1024            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1025                    proof)
1026        elif err:
1027            raise service_error(service_error.federant,
1028                    "Swapin failed: %s" % err)
1029        else:
1030            raise service_error(service_error.federant, "Swapin failed")
1031
1032    def TerminateSegment(self, req, fid):
1033        self.log.info("TerminateSegment called by %s" % fid)
1034        try:
1035            req = req['TerminateSegmentRequestBody']
1036        except KeyError:
1037            raise service_error(server_error.req, "Badly formed request")
1038
1039        auth_attr = req['allocID']['fedid']
1040        aid = "%s" % auth_attr
1041        attrs = req.get('fedAttr', [])
1042
1043        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1044                with_proof=True)
1045        if not access_ok:
1046            raise service_error(service_error.access, "Access denied")
1047
1048        self.state_lock.acquire()
1049        if aid in self.allocation:
1050            proj = self.allocation[aid].get('project', None)
1051            user = self.allocation[aid].get('user', None)
1052            xmlrpc_cert = self.allocation[aid].get('cert', None)
1053            ename = self.allocation[aid].get('experiment', None)
1054            nonce = self.allocation[aid].get('nonce', False)
1055        else:
1056            proj = None
1057            user = None
1058            ename = None
1059            nonce = False
1060            xmlrpc_cert = None
1061        self.state_lock.release()
1062
1063        if not proj:
1064            self.log.info("TerminateSegment failed for %s: cannot find project"\
1065                    % fid)
1066            raise service_error(service_error.internal, 
1067                    "Can't find project for %s" % aid)
1068        else:
1069            if '/' in proj: proj, gid = proj.split('/')
1070            else: gid = None
1071
1072        if not user:
1073            self.log.info("TerminateSegment failed for %s: cannot find user"\
1074                    % fid)
1075            raise service_error(service_error.internal, 
1076                    "Can't find creation user for %s" % aid)
1077        if not ename:
1078            self.log.info(
1079                    "TerminateSegment failed for %s: cannot find experiment"\
1080                    % fid)
1081            raise service_error(service_error.internal, 
1082                    "Can't find experiment name for %s" % aid)
1083       
1084        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1085                debug=self.create_debug, boss=self.boss, ops=self.ops,
1086                cert=xmlrpc_cert)
1087        stopper(self, user, proj, ename, gid, nonce)
1088        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1089                (fid, proj, ename))
1090        self.state_lock.acquire()
1091        # Remove the started flag/info - the segment is no longer started
1092        if aid in self.allocation:
1093            if 'started' in self.allocation[aid]:
1094                del self.allocation[aid]['started']
1095            self.write_state()
1096        self.state_lock.release()
1097        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1098
1099    def InfoSegment(self, req, fid):
1100        self.log.info("InfoSegment called by %s" % fid)
1101        try:
1102            req = req['InfoSegmentRequestBody']
1103        except KeyError:
1104            raise service_error(server_error.req, "Badly formed request")
1105
1106        auth_attr = req['allocID']['fedid']
1107        aid = "%s" % auth_attr
1108
1109        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1110                with_proof=True)
1111        if not access_ok:
1112            raise service_error(service_error.access, "Access denied")
1113
1114        self.state_lock.acquire()
1115        if aid in self.allocation:
1116            topo = self.allocation[aid].get('topo', None)
1117            if topo: topo = topo.clone()
1118            proj = self.allocation[aid].get('project', None)
1119            user = self.allocation[aid].get('user', None)
1120            xmlrpc_cert = self.allocation[aid].get('cert', None)
1121            ename = self.allocation[aid].get('experiment', None)
1122        else:
1123            proj = None
1124            user = None
1125            ename = None
1126            topo = None
1127            xmlrpc_cert = None
1128        self.state_lock.release()
1129
1130        if not proj:
1131            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1132            raise service_error(service_error.internal, 
1133                    "Can't find project for %s" % aid)
1134        else:
1135            if '/' in proj: proj, gid = proj.split('/')
1136            else: gid = None
1137
1138        if not user:
1139            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1140            raise service_error(service_error.internal, 
1141                    "Can't find creation user for %s" % aid)
1142        if not ename:
1143            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1144            raise service_error(service_error.internal, 
1145                    "Can't find experiment name for %s" % aid)
1146        info = self.info_segment(keyfile=self.ssh_privkey_file,
1147                debug=self.create_debug, boss=self.boss, ops=self.ops,
1148                cert=xmlrpc_cert)
1149        info(self, user, proj, ename)
1150        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1151                (fid, user, proj, ename))
1152        self.decorate_topology(info, topo)
1153        self.state_lock.acquire()
1154        if aid in self.allocation:
1155            self.allocation[aid]['topo'] = topo
1156            self.write_state()
1157        self.state_lock.release()
1158        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1159                (fid, user, proj, ename))
1160
1161        rv = { 
1162                'allocID': req['allocID'], 
1163                'proof': proof.to_dict(),
1164                }
1165        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1166                (fid, user, proj, ename))
1167        if topo:
1168            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1169        return rv
1170
1171    def OperationSegment(self, req, fid):
1172        def get_pname(e):
1173            """
1174            Get the physical name of a node
1175            """
1176            if e.localname:
1177                return re.sub('\..*','', e.localname[0])
1178            else:
1179                return None
1180
1181        self.log.info("OperationSegment called by %s" % fid)
1182        try:
1183            req = req['OperationSegmentRequestBody']
1184        except KeyError:
1185            raise service_error(server_error.req, "Badly formed request")
1186
1187        auth_attr = req['allocID']['fedid']
1188        aid = "%s" % auth_attr
1189
1190        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1191                with_proof=True)
1192        if not access_ok:
1193            self.log.info("OperationSegment failed for %s: access denied" % fid)
1194            raise service_error(service_error.access, "Access denied")
1195
1196        op = req.get('operation', None)
1197        targets = req.get('target', None)
1198        params = req.get('parameter', None)
1199
1200        if op is None :
1201            self.log.info("OperationSegment failed for %s: no operation" % fid)
1202            raise service_error(service_error.req, "missing operation")
1203        elif targets is None:
1204            self.log.info("OperationSegment failed for %s: no targets" % fid)
1205            raise service_error(service_error.req, "no targets")
1206
1207        self.state_lock.acquire()
1208        if aid in self.allocation:
1209            topo = self.allocation[aid].get('topo', None)
1210            if topo: topo = topo.clone()
1211            xmlrpc_cert = self.allocation[aid].get('cert', None)
1212        else:
1213            topo = None
1214            xmlrpc_cert = None
1215        self.state_lock.release()
1216
1217        targets = copy.copy(targets)
1218        ptargets = { }
1219        for e in topo.elements:
1220            if isinstance(e, topdl.Computer):
1221                if e.name in targets:
1222                    targets.remove(e.name)
1223                    pn = get_pname(e)
1224                    if pn: ptargets[e.name] = pn
1225
1226        status = [ operation_status(t, operation_status.no_target) \
1227                for t in targets]
1228
1229        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1230                debug=self.create_debug, boss=self.boss, ops=self.ops,
1231                cert=xmlrpc_cert)
1232        ops(self, op, ptargets, params, topo)
1233        self.log.info("OperationSegment operated for %s" % fid)
1234       
1235        status.extend(ops.status)
1236        self.log.info("OperationSegment succeed for %s" % fid)
1237
1238        return { 
1239                'allocID': req['allocID'], 
1240                'status': [s.to_dict() for s in status],
1241                'proof': proof.to_dict(),
1242                }
Note: See TracBrowser for help on using the repository browser.