source: fedd/federation/emulab_access.py @ 26821ac

Last change on this file since 26821ac was 26821ac, checked in by Ted Faber <faber@…>, 11 years ago

Hooks for shared NAT

  • Property mode set to 100644
File size: 39.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    max_name_len = 19
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.max_name_len = access.max_name_len
61
62        self.allow_proxy = config.getboolean("access", "allow_proxy")
63
64        self.domain = config.get("access", "domain")
65        self.userconfdir = config.get("access","userconfdir")
66        self.userconfcmd = config.get("access","userconfcmd")
67        self.userconfurl = config.get("access","userconfurl")
68        self.federation_software = config.get("access", "federation_software")
69        self.portal_software = config.get("access", "portal_software")
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73        self.seer_master_start = config.get("access", "seer_master_start")
74        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
75        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
76        self.ssh_port = config.get("access","ssh_port") or "22"
77        self.boss = config.get("access", "boss")
78        self.ops = config.get("access", "ops")
79        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
80        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
81
82        self.dragon_endpoint = config.get("access", "dragon")
83        self.dragon_vlans = config.get("access", "dragon_vlans")
84        self.deter_internal = config.get("access", "deter_internal")
85
86        self.tunnel_config = config.getboolean("access", "tunnel_config")
87        self.portal_command = config.get("access", "portal_command")
88        self.portal_image = config.get("access", "portal_image")
89        self.portal_type = config.get("access", "portal_type") or "pc"
90        self.portal_startcommand = config.get("access", "portal_startcommand")
91        self.node_startcommand = config.get("access", "node_startcommand")
92        self.nat_portal = config.get("access", "nat_portal")
93
94        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
95                self.get_port(config.get("globals", "services", "23235")))
96
97        self.federation_software = self.software_list(self.federation_software)
98        self.portal_software = self.software_list(self.portal_software)
99        self.local_seer_software = self.software_list(self.local_seer_software)
100
101        self.access_type = self.access_type.lower()
102        self.start_segment = emulab_segment.start_segment
103        self.stop_segment = emulab_segment.stop_segment
104        self.info_segment = emulab_segment.info_segment
105        self.operation_segment = emulab_segment.operation_segment
106        self.exports_segment = emulab_segment.exports_segment
107
108        self.restricted = [ ]
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'abac'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'abac':
120            self.auth = abac_authorizer(load=self.auth_dir)
121            self.access = [ ]
122            if accessdb:
123                self.read_access(accessdb, self.access_tuple)
124        else:
125            raise service_error(service_error.internal, 
126                    "Unknown auth_type: %s" % self.auth_type)
127
128        # read_state in the base_class
129        self.state_lock.acquire()
130        if 'allocation' not in self.state: self.state['allocation']= { }
131        self.allocation = self.state['allocation']
132        self.state_lock.release()
133        self.exports = {
134                'SMB': self.export_SMB,
135                'seer': self.export_seer,
136                'tmcd': self.export_tmcd,
137                'userconfig': self.export_userconfig,
138                'project_export': self.export_project_export,
139                'local_seer_control': self.export_local_seer,
140                'seer_master': self.export_seer_master,
141                'hide_hosts': self.export_hide_hosts,
142                }
143
144        if not self.local_seer_image or not self.local_seer_software or \
145                not self.local_seer_start:
146            if 'local_seer_control' in self.exports:
147                del self.exports['local_seer_control']
148
149        if not self.local_seer_image or not self.local_seer_software or \
150                not self.seer_master_start:
151            if 'seer_master' in self.exports:
152                del self.exports['seer_master']
153
154
155        self.soap_services = {\
156            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
157            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
158            'StartSegment': soap_handler("StartSegment", self.StartSegment),
159            'TerminateSegment': soap_handler("TerminateSegment",
160                self.TerminateSegment),
161            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
162            'OperationSegment': soap_handler("OperationSegment",
163                self.OperationSegment),
164            }
165        self.xmlrpc_services =  {\
166            'RequestAccess': xmlrpc_handler('RequestAccess',
167                self.RequestAccess),
168            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
169                self.ReleaseAccess),
170            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
171            'TerminateSegment': xmlrpc_handler('TerminateSegment',
172                self.TerminateSegment),
173            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
174            'OperationSegment': xmlrpc_handler("OperationSegment",
175                self.OperationSegment),
176            }
177
178        self.call_SetValue = service_caller('SetValue', log=self.log)
179        self.call_GetValue = service_caller('GetValue', log=self.log)
180
181    @staticmethod
182    def get_port(ps):
183        '''
184        Take a fedd service string and return the first port.  Used in
185        creating the testbed uri identifier.
186        '''
187        p = ps.split(',')
188        smallport = p[0].split(':')
189        try:
190            rv = int(smallport[0])
191        except ValueError:
192            rv = 23235
193        return rv
194
195    @staticmethod
196    def access_tuple(str):
197        """
198        Convert a string of the form (id, id, id) into an access_project.  This
199        is called by read_access to convert to local attributes.  It returns a
200        tuple of the form (project, user, certificate_file).
201        """
202
203        str = str.strip()
204        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
205            # The slice takes the parens off the string.
206            proj, user, cert = str[1:-1].split(',')
207            return (proj.strip(), user.strip(), cert.strip())
208        else:
209            raise self.parse_error(
210                    'Bad mapping (unbalanced parens or more than 2 commas)')
211
212    # RequestAccess support routines
213
214    def save_project_state(self, aid, pname, uname, certf, owners):
215        """
216        Save the project, user, and owners associated with this allocation.
217        This creates the allocation entry.
218        """
219        self.state_lock.acquire()
220        self.allocation[aid] = { }
221        self.allocation[aid]['project'] = pname
222        self.allocation[aid]['user'] = uname
223        self.allocation[aid]['cert'] = certf
224        self.allocation[aid]['owners'] = owners
225        self.write_state()
226        self.state_lock.release()
227        return (pname, uname)
228
229    # End of RequestAccess support routines
230
231    def RequestAccess(self, req, fid):
232        """
233        Handle the access request.  Proxy if not for us.
234
235        Parse out the fields and make the allocations or rejections if for us,
236        otherwise, assuming we're willing to proxy, proxy the request out.
237        """
238
239        def gateway_hardware(h):
240            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
241            else: return h
242
243        def get_export_project(svcs):
244            """
245            if the service requests includes one to export a project, return
246            that project.
247            """
248            rv = None
249            for s in svcs:
250                if s.get('name', '') == 'project_export' and \
251                        s.get('visibility', '') == 'export':
252                    if not rv: 
253                        for a in s.get('fedAttr', []):
254                            if a.get('attribute', '') == 'project' \
255                                    and 'value' in a:
256                                rv = a['value']
257                    else:
258                        raise service_error(service_error, access, 
259                                'Requesting multiple project exports is ' + \
260                                        'not supported');
261            return rv
262
263        self.log.info("RequestAccess called by %s" % fid)
264        # The dance to get into the request body
265        if req.has_key('RequestAccessRequestBody'):
266            req = req['RequestAccessRequestBody']
267        else:
268            raise service_error(service_error.req, "No request!?")
269
270        # if this includes a project export request, construct a filter such
271        # that only the ABAC attributes mapped to that project are checked for
272        # access.
273        if 'service' in req:
274            ep = get_export_project(req['service'])
275            if ep: pf = lambda(a): a.value[0] == ep
276            else: pf = None
277        else:
278            ep = None
279            pf = None
280
281        if self.auth.import_credentials(
282                data_list=req.get('abac_credential', [])):
283            self.auth.save()
284        else:
285            self.log.debug('failed to import incoming credentials')
286
287        if self.auth_type == 'abac':
288            found, owners, proof = self.lookup_access(req, fid, filter=pf)
289        else:
290            raise service_error(service_error.internal, 
291                    'Unknown auth_type: %s' % self.auth_type)
292        ap = None
293
294        # keep track of what's been added
295        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
296        aid = unicode(allocID)
297
298        pname, uname = self.save_project_state(aid, found[0], found[1], 
299                found[2], owners)
300
301        services, svc_state = self.export_services(req.get('service',[]),
302                pname, uname)
303        self.state_lock.acquire()
304        # Store services state in global state
305        for k, v in svc_state.items():
306            self.allocation[aid][k] = v
307        self.append_allocation_authorization(aid, 
308                set([(o, allocID) for o in owners]), state_attr='allocation')
309        self.write_state()
310        self.state_lock.release()
311        try:
312            f = open("%s/%s.pem" % (self.certdir, aid), "w")
313            print >>f, alloc_cert
314            f.close()
315        except EnvironmentError, e:
316            self.log.info("RequestAccess failed for by %s: internal error" \
317                    % fid)
318            raise service_error(service_error.internal, 
319                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
320        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
321        resp = self.build_access_response({ 'fedid': allocID } ,
322                pname, services, proof)
323        return resp
324
325    def ReleaseAccess(self, req, fid):
326        self.log.info("ReleaseAccess called by %s" % fid)
327        # The dance to get into the request body
328        if req.has_key('ReleaseAccessRequestBody'):
329            req = req['ReleaseAccessRequestBody']
330        else:
331            raise service_error(service_error.req, "No request!?")
332
333        try:
334            if req['allocID'].has_key('localname'):
335                auth_attr = aid = req['allocID']['localname']
336            elif req['allocID'].has_key('fedid'):
337                aid = unicode(req['allocID']['fedid'])
338                auth_attr = req['allocID']['fedid']
339            else:
340                raise service_error(service_error.req,
341                        "Only localnames and fedids are understood")
342        except KeyError:
343            raise service_error(service_error.req, "Badly formed request")
344
345        self.log.debug("[access] deallocation requested for %s by %s" % \
346                (aid, fid))
347        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
348                with_proof=True)
349        if not access_ok:
350            self.log.debug("[access] deallocation denied for %s", aid)
351            raise service_error(service_error.access, "Access Denied")
352
353        self.state_lock.acquire()
354        if aid in self.allocation:
355            self.log.debug("Found allocation for %s" %aid)
356            self.clear_allocation_authorization(aid, state_attr='allocation')
357            del self.allocation[aid]
358            self.write_state()
359            self.state_lock.release()
360            # Remove the access cert
361            cf = "%s/%s.pem" % (self.certdir, aid)
362            self.log.debug("Removing %s" % cf)
363            os.remove(cf)
364            self.log.info("ReleaseAccess succeeded for %s" % fid)
365            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
366        else:
367            self.state_lock.release()
368            raise service_error(service_error.req, "No such allocation")
369
370    # These are subroutines for StartSegment
371    def generate_ns2(self, topo, expfn, softdir, connInfo):
372        """
373        Convert topo into an ns2 file, decorated with appropriate commands for
374        the particular testbed setup.  Convert all requests for software, etc
375        to point at the staged copies on this testbed and add the federation
376        startcommands.
377        """
378        class dragon_commands:
379            """
380            Functor to spit out approrpiate dragon commands for nodes listed in
381            the connectivity description.  The constructor makes a dict mapping
382            dragon nodes to their parameters and the __call__ checks each
383            element in turn for membership.
384            """
385            def __init__(self, map):
386                self.node_info = map
387
388            def __call__(self, e):
389                s = ""
390                if isinstance(e, topdl.Computer):
391                    if self.node_info.has_key(e.name):
392                        info = self.node_info[e.name]
393                        for ifname, vlan, type in info:
394                            for i in e.interface:
395                                if i.name == ifname:
396                                    addr = i.get_attribute('ip4_address')
397                                    subs = i.substrate[0]
398                                    break
399                            else:
400                                raise service_error(service_error.internal,
401                                        "No interface %s on element %s" % \
402                                                (ifname, e.name))
403                            # XXX: do netmask right
404                            if type =='link':
405                                s = ("tb-allow-external ${%s} " + \
406                                        "dragonportal ip %s vlan %s " + \
407                                        "netmask 255.255.255.0\n") % \
408                                        (topdl.to_tcl_name(e.name), addr, vlan)
409                            elif type =='lan':
410                                s = ("tb-allow-external ${%s} " + \
411                                        "dragonportal " + \
412                                        "ip %s vlan %s usurp %s\n") % \
413                                        (topdl.to_tcl_name(e.name), addr, 
414                                                vlan, subs)
415                            else:
416                                raise service_error(service_error_internal,
417                                        "Unknown DRAGON type %s" % type)
418                return s
419
420        class not_dragon:
421            """
422            Return true if a node is in the given map of dragon nodes.
423            """
424            def __init__(self, map):
425                self.nodes = set(map.keys())
426
427            def __call__(self, e):
428                return e.name not in self.nodes
429
430        def have_portals(top):
431            '''
432            Return true if the topology has a portal node
433            '''
434            # The else is on the for
435            for e in top.elements:
436                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
437                    return True
438            else:
439                return False
440
441
442        # Main line of generate_ns2
443        t = topo.clone()
444
445        # Create the map of nodes that need direct connections (dragon
446        # connections) from the connInfo
447        dragon_map = { }
448        for i in [ i for i in connInfo if i['type'] == 'transit']:
449            for a in i.get('fedAttr', []):
450                if a['attribute'] == 'vlan_id':
451                    vlan = a['value']
452                    break
453            else:
454                raise service_error(service_error.internal, "No vlan tag")
455            members = i.get('member', [])
456            if len(members) > 1: type = 'lan'
457            else: type = 'link'
458
459            try:
460                for m in members:
461                    if m['element'] in dragon_map:
462                        dragon_map[m['element']].append(( m['interface'], 
463                            vlan, type))
464                    else:
465                        dragon_map[m['element']] = [( m['interface'], 
466                            vlan, type),]
467            except KeyError:
468                raise service_error(service_error.req,
469                        "Missing connectivity info")
470
471        def output_fixed_filter(e):
472            if not isinstance(e, topdl.Computer): return ""
473            fn = e.get_attribute('fixed')
474            if fn is None:
475                return ""
476            else:
477                return 'tb-fix-node ${%s} %s\n' % (topdl.to_tcl_name(e.name), fn)
478
479        def output_external_access(e):
480            '''
481            Allow nodes that have explicit external access requests attached to
482            have external access.  This will be replaced by a real risky
483            experiment filter.
484            '''
485            if not isinstance(e, topdl.Computer): return ""
486            ex = e.get_attribute('containers:external')
487            if ex is None:
488                return ""
489            else:
490                return 'tb-allow-external ${%s}\n' % topdl.to_tcl_name(e.name)
491
492        # Weed out the things we aren't going to instantiate: Segments, portal
493        # substrates, and portal interfaces.  (The copy in the for loop allows
494        # us to delete from e.elements in side the for loop).  While we're
495        # touching all the elements, we also adjust paths from the original
496        # testbed to local testbed paths and put the federation commands into
497        # the start commands
498        local = len(dragon_map) == 0 and not have_portals(t)
499        if local: routing = 'Static'
500        else: routing = 'Manual'
501
502        if local:
503            self.log.debug("Local experiment.")
504        for e in [e for e in t.elements]:
505            if isinstance(e, topdl.Segment):
506                t.elements.remove(e)
507            if isinstance(e, topdl.Computer):
508                self.add_kit(e, self.federation_software)
509                if e.get_attribute('portal') and self.portal_startcommand:
510                    # Add local portal support software
511                    self.add_kit(e, self.portal_software)
512                    # Portals never have a user-specified start command
513                    e.set_attribute('startup', self.portal_startcommand)
514                elif not local and self.node_startcommand:
515                    if e.get_attribute('startup'):
516                        e.set_attribute('startup', "%s \\$USER '%s'" % \
517                                (self.node_startcommand, 
518                                    e.get_attribute('startup')))
519                    else:
520                        e.set_attribute('startup', self.node_startcommand)
521
522                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
523                # Remove portal interfaces that do not connect to DRAGON
524                e.interface = [i for i in e.interface \
525                        if not i.get_attribute('portal') or i.name in dinf ]
526            # Fix software paths
527            for s in getattr(e, 'software', []):
528                s.location = re.sub("^.*/", softdir, s.location)
529
530        t.substrates = [ s.clone() for s in t.substrates ]
531        t.incorporate_elements()
532
533        # Customize the ns2 output for local portal commands and images
534        filters = [output_fixed_filter, output_external_access]
535
536        if self.dragon_endpoint:
537            add_filter = not_dragon(dragon_map)
538            filters.append(dragon_commands(dragon_map))
539        else:
540            add_filter = None
541
542        if self.portal_command:
543            filters.append(topdl.generate_portal_command_filter(
544                self.portal_command, add_filter=add_filter))
545
546        if self.portal_image:
547            filters.append(topdl.generate_portal_image_filter(
548                self.portal_image))
549
550        if self.portal_type:
551            filters.append(topdl.generate_portal_hardware_filter(
552                self.portal_type))
553
554        # Convert to ns and write it out
555        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
556        try:
557            f = open(expfn, "w")
558            print >>f, expfile
559            f.close()
560        except EnvironmentError:
561            raise service_error(service_error.internal,
562                    "Cannot write experiment file %s: %s" % (expfn,e))
563
564    def export_store_info(self, cf, proj, ename, connInfo):
565        """
566        For the export requests in the connection info, send out teh SetValue
567        calls for parameters initialized by the segment's export_segment
568        object. Values are all in the connInfo parameter's value fields.
569        """
570
571        for c in connInfo:
572            for p in c.get('parameter', []):
573                # Ignore non-output parameters
574                if p.get('type', '') != 'output':
575                    continue
576                # Sanity check store, key and value
577                surl = p.get('store', None)
578                key = p.get('key', None)
579                value = p.get('value', None)
580                if surl is None:
581                    self.log.error("Export parameter without store: %s" % \
582                            p.get('name'))
583                    continue
584                if key is None:
585                    self.log.error("Export parameter without key: %s" % \
586                            p.get('name'))
587                    continue
588                if value is None:
589                    self.log.error("Unknown (unset) export parameter: %s" % \
590                            p.get('name'))
591                    continue
592                # Set the store value
593                req = { 'name': key, 'value': value }
594                self.call_SetValue(surl, req, cf)
595
596    def add_seer_node(self, topo, name, startup):
597        """
598        Add a seer node to the given topology, with the startup command passed
599        in.  Used by configure seer_services.
600        """
601        c_node = topdl.Computer(
602                name=name, 
603                os= topdl.OperatingSystem(
604                    attribute=[
605                    { 'attribute': 'osid', 
606                        'value': self.local_seer_image },
607                    ]),
608                attribute=[
609                    { 'attribute': 'startup', 'value': startup },
610                    ]
611                )
612        self.add_kit(c_node, self.local_seer_software)
613        topo.elements.append(c_node)
614
615    def configure_seer_services(self, services, topo, softdir):
616        """
617        Make changes to the topology required for the seer requests being made.
618        Specifically, add any control or master nodes required and set up the
619        start commands on the nodes to interconnect them.
620        """
621        local_seer = False      # True if we need to add a control node
622        collect_seer = False    # True if there is a seer-master node
623        seer_master= False      # True if we need to add the seer-master
624        for s in services:
625            s_name = s.get('name', '')
626            s_vis = s.get('visibility','')
627
628            if s_name  == 'local_seer_control' and s_vis == 'export':
629                local_seer = True
630            elif s_name == 'seer_master':
631                if s_vis == 'import':
632                    collect_seer = True
633                elif s_vis == 'export':
634                    seer_master = True
635       
636        # We've got the whole picture now, so add nodes if needed and configure
637        # them to interconnect properly.
638        if local_seer or seer_master:
639            # Copy local seer control node software to the tempdir
640            for l, f in self.local_seer_software:
641                base = os.path.basename(f)
642                copy_file(f, "%s/%s" % (softdir, base))
643        # If we're collecting seers somewhere the controllers need to talk to
644        # the master.  In testbeds that export the service, that will be a
645        # local node that we'll add below.  Elsewhere it will be the control
646        # portal that will port forward to the exporting master.
647        if local_seer:
648            if collect_seer:
649                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
650            else:
651                startup = self.local_seer_start
652            self.add_seer_node(topo, 'control', startup)
653        # If this is the seer master, add that node, too.
654        if seer_master:
655            self.add_seer_node(topo, 'seer-master', 
656                    "%s -R -n -R seer-master -R -A -R sink" % \
657                            self.seer_master_start)
658
659    def retrieve_software(self, topo, certfile, softdir):
660        """
661        Collect the software that nodes in the topology need loaded and stage
662        it locally.  This implies retrieving it from the experiment_controller
663        and placing it into softdir.  Certfile is used to prove that this node
664        has access to that data (it's the allocation/segment fedid).  Finally
665        local portal and federation software is also copied to the same staging
666        directory for simplicity - all software needed for experiment creation
667        is in softdir.
668        """
669        sw = set()
670        for e in topo.elements:
671            for s in getattr(e, 'software', []):
672                sw.add(s.location)
673        for s in sw:
674            self.log.debug("Retrieving %s" % s)
675            try:
676                get_url(s, certfile, softdir, log=self.log)
677            except:
678                t, v, st = sys.exc_info()
679                raise service_error(service_error.internal,
680                        "Error retrieving %s: %s" % (s, v))
681
682        # Copy local federation and portal node software to the tempdir
683        for s in (self.federation_software, self.portal_software):
684            for l, f in s:
685                base = os.path.basename(f)
686                copy_file(f, "%s/%s" % (softdir, base))
687
688
689    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
690        """
691        Gather common configuration files, retrieve or create an experiment
692        name and project name, and return the ssh_key filenames.  Create an
693        allocation log bound to the state log variable as well.
694        """
695        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
696                'seer_ca_pem', 'seer_node_pem', 'route.tgz')
697        ename = None
698        pubkey_base = None
699        secretkey_base = None
700        proj = None
701        user = None
702        alloc_log = None
703        nonce_experiment = False
704        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
705
706        self.state_lock.acquire()
707        if aid in self.allocation:
708            proj = self.allocation[aid].get('project', None)
709        self.state_lock.release()
710
711        if not proj:
712            raise service_error(service_error.internal, 
713                    "Can't find project for %s" %aid)
714
715        for a in attrs:
716            if a['attribute'] in configs:
717                try:
718                    self.log.debug("Retrieving %s from %s" % \
719                            (a['attribute'], a['value']))
720                    get_url(a['value'], certfile, tmpdir, log=self.log)
721                except:
722                    t, v, st = sys.exc_info()
723                    raise service_error(service_error.internal,
724                            "Error retrieving %s: %s" % (a.get('value', ""), v))
725            if a['attribute'] == 'ssh_pubkey':
726                pubkey_base = a['value'].rpartition('/')[2]
727            if a['attribute'] == 'ssh_secretkey':
728                secretkey_base = a['value'].rpartition('/')[2]
729            if a['attribute'] == 'experiment_name':
730                ename = a['value']
731
732        # Names longer than the emulab max are discarded
733        if ename and len(ename) <= self.max_name_len:
734            # Clean up the experiment name so that emulab will accept it.
735            ename = re.sub(vchars_re, '-', ename)
736
737        else:
738            ename = ""
739            for i in range(0,5):
740                ename += random.choice(string.ascii_letters)
741            nonce_experiment = True
742            self.log.warn("No experiment name or suggestion too long: " + \
743                    "picked one randomly: %s" % ename)
744
745        if not pubkey_base:
746            raise service_error(service_error.req, 
747                    "No public key attribute")
748
749        if not secretkey_base:
750            raise service_error(service_error.req, 
751                    "No secret key attribute")
752
753        self.state_lock.acquire()
754        if aid in self.allocation:
755            user = self.allocation[aid].get('user', None)
756            cert = self.allocation[aid].get('cert', None)
757            self.allocation[aid]['experiment'] = ename
758            self.allocation[aid]['nonce'] = nonce_experiment
759            self.allocation[aid]['log'] = [ ]
760            # Create a logger that logs to the experiment's state object as
761            # well as to the main log file.
762            alloc_log = logging.getLogger('fedd.access.%s' % ename)
763            h = logging.StreamHandler(
764                    list_log.list_log(self.allocation[aid]['log']))
765            # XXX: there should be a global one of these rather than
766            # repeating the code.
767            h.setFormatter(logging.Formatter(
768                "%(asctime)s %(name)s %(message)s",
769                        '%d %b %y %H:%M:%S'))
770            alloc_log.addHandler(h)
771            self.write_state()
772        self.state_lock.release()
773
774        if not user:
775            raise service_error(service_error.internal, 
776                    "Can't find creation user for %s" %aid)
777
778        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
779
780    def decorate_topology(self, info, t):
781        """
782        Copy the physical mapping and status onto the topology.  Used by
783        StartSegment and InfoSegment
784        """
785        def add_new(ann, attr):
786            for a in ann:
787                if a not in attr: attr.append(a)
788
789        def merge_os(os, e):
790            if len(e.os) == 0:
791                # No OS at all:
792                if os.get_attribute('emulab_access:image'):
793                    os.set_attribute('emulab_access:initial_image', 
794                            os.get_attribute('emulab_access:image'))
795                e.os = [ os ]
796            elif len(e.os) == 1:
797                # There's one OS, copy the initial image and replace
798                eos = e.os[0]
799                initial = eos.get_attribute('emulab_access:initial_image')
800                if initial:
801                    os.set_attribute('emulab_access:initial_image', initial)
802                e.os = [ os] 
803            else:
804                # Multiple OSes, replace or append
805                for eos in e.os:
806                    if os.name == eos.name:
807                        eos.version = os.version
808                        eos.version = os.distribution
809                        eos.version = os.distributionversion
810                        for a in os.attribute:
811                            if eos.get_attribute(a.attribute):
812                                eos.remove_attribute(a.attribute)
813                            eos.set_attribute(a.attribute, a.value)
814                        break
815                else:
816                    e.os.append(os)
817
818
819        if t is None: return
820        i = 0 # For fake debugging instantiation
821        # Copy the assigned names into the return topology
822        for e in t.elements:
823            if isinstance(e, topdl.Computer):
824                if not self.create_debug:
825                    if e.name in info.node:
826                        add_new(("%s%s" % 
827                            (info.node[e.name].pname, self.domain),),
828                            e.localname)
829                        add_new(("%s%s" % 
830                            (info.node[e.name].lname, self.domain),),
831                            e.localname)
832                        e.status = info.node[e.name].status
833                        os = info.node[e.name].getOS()
834                        if os: merge_os(os, e)
835                else:
836                    # Simple debugging assignment
837                    add_new(("node%d%s" % (i, self.domain),), e.localname)
838                    e.status = 'active'
839                    add_new(('testop1', 'testop2'), e.operation)
840                    i += 1
841
842        for s in t.substrates:
843            if s.name in info.subs:
844                sub = info.subs[s.name]
845                if sub.cap is not None:
846                    s.capacity = topdl.Capacity(sub.cap, 'max')
847                if sub.delay is not None:
848                    s.delay = topdl.Latency(sub.delay, 'max')
849        # XXX interfaces
850
851
852    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
853        """
854        Store key bits of experiment state in the global repository, including
855        the response that may need to be replayed, and return the response.
856        """
857        def get_localnames(t):
858            names = [ ]
859            for e in t.elements:
860                if isinstance(e, topdl.Computer):
861                    n = e.get_attribute('testbed')
862                    if n is not None and n not in names:
863                        names.append(n)
864            return names
865        i = 0
866        t = topo.clone()
867        self.decorate_topology(starter, t)
868        # Grab the log (this is some anal locking, but better safe than
869        # sorry)
870        self.state_lock.acquire()
871        # Put information about this testbed into the topdl
872        tb = topdl.Testbed(self.uri, "deter",
873                localname=get_localnames(t), 
874                attribute=[ 
875                    { 
876                        'attribute': 'project', 
877                        'value': self.allocation[aid]['project']
878                    },
879                    { 
880                        'attribute': 'experiment', 
881                        'value': self.allocation[aid]['experiment']
882                    }])
883        t.elements.append(tb)
884        logv = "".join(self.allocation[aid]['log'])
885        # It's possible that the StartSegment call gets retried (!).
886        # if the 'started' key is in the allocation, we'll return it rather
887        # than redo the setup.
888        self.allocation[aid]['started'] = { 
889                'allocID': alloc_id,
890                'allocationLog': logv,
891                'segmentdescription': { 
892                    'topdldescription': t.to_dict()
893                    },
894                'proof': proof.to_dict(),
895                }
896        self.allocation[aid]['topo'] = t
897        retval = copy.copy(self.allocation[aid]['started'])
898        self.write_state()
899        self.state_lock.release()
900        return retval
901   
902    # End of StartSegment support routines
903
904    def StartSegment(self, req, fid):
905        err = None  # Any service_error generated after tmpdir is created
906        rv = None   # Return value from segment creation
907
908        self.log.info("StartSegment called by %s" % fid)
909        try:
910            req = req['StartSegmentRequestBody']
911            auth_attr = req['allocID']['fedid']
912            topref = req['segmentdescription']['topdldescription']
913        except KeyError:
914            raise service_error(server_error.req, "Badly formed request")
915
916        connInfo = req.get('connection', [])
917        services = req.get('service', [])
918        aid = "%s" % auth_attr
919        attrs = req.get('fedAttr', [])
920
921        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
922                with_proof=True)
923        if not access_ok:
924            self.log.info("StartSegment for %s failed: access denied" % fid)
925            raise service_error(service_error.access, "Access denied")
926        else:
927            # See if this is a replay of an earlier succeeded StartSegment -
928            # sometimes SSL kills 'em.  If so, replay the response rather than
929            # redoing the allocation.
930            self.state_lock.acquire()
931            retval = self.allocation[aid].get('started', None)
932            self.state_lock.release()
933            if retval:
934                self.log.warning("Duplicate StartSegment for %s: " % aid + \
935                        "replaying response")
936                return retval
937
938        # A new request.  Do it.
939
940        if topref: topo = topdl.Topology(**topref)
941        else:
942            raise service_error(service_error.req, 
943                    "Request missing segmentdescription'")
944       
945        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
946        try:
947            tmpdir = tempfile.mkdtemp(prefix="access-")
948            softdir = "%s/software" % tmpdir
949            os.mkdir(softdir)
950        except EnvironmentError:
951            self.log.info("StartSegment for %s failed: internal error" % fid)
952            raise service_error(service_error.internal, "Cannot create tmp dir")
953
954        # Try block alllows us to clean up temporary files.
955        try:
956            self.retrieve_software(topo, certfile, softdir)
957            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
958                alloc_log =  self.initialize_experiment_info(attrs, aid, 
959                        certfile, tmpdir)
960            # A misconfigured cert in the ABAC map can be confusing...
961            if not os.access(xmlrpc_cert, os.R_OK):
962                self.log.error("Cannot open user's emulab SSL cert: %s" % \
963                        xmlrpc_cert)
964                raise service_error(service_error.internal,
965                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
966
967
968            if '/' in proj: proj, gid = proj.split('/')
969            else: gid = None
970
971
972            # Set up userconf and seer if needed
973            self.configure_userconf(services, tmpdir)
974            self.configure_seer_services(services, topo, softdir)
975            # Get and send synch store variables
976            exporter = self.exports_segment(keyfile=self.ssh_privkey_file,
977                    debug=self.create_debug, log=alloc_log, boss=self.boss,
978                    ops=self.ops, cert=xmlrpc_cert)
979            exporter(self, ename, proj, user, connInfo, tmpdir, gid)
980            self.export_store_info(certfile, proj, ename, connInfo)
981            self.import_store_info(certfile, connInfo)
982
983            expfile = "%s/experiment.tcl" % tmpdir
984
985            self.generate_portal_configs(topo, pubkey_base, 
986                    secretkey_base, tmpdir, proj, ename, connInfo, services)
987            self.generate_ns2(topo, expfile, 
988                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
989
990            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
991                    debug=self.create_debug, log=alloc_log, boss=self.boss,
992                    ops=self.ops, cert=xmlrpc_cert)
993            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
994        except service_error, e:
995            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
996            err = e
997        except:
998            t, v, st = sys.exc_info()
999            self.log.info("StartSegment for %s failed:unexpected error: %s" \
1000                    % (fid, traceback.extract_tb(st)))
1001            err = service_error(service_error.internal, "%s: %s" % \
1002                    (v, traceback.extract_tb(st)))
1003
1004        # Walk up tmpdir, deleting as we go
1005        if self.cleanup: self.remove_dirs(tmpdir)
1006        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1007
1008        if rv:
1009            self.log.info("StartSegment for %s succeeded" % fid)
1010            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1011                    proof)
1012        elif err:
1013            raise service_error(service_error.federant,
1014                    "Swapin failed: %s" % err)
1015        else:
1016            raise service_error(service_error.federant, "Swapin failed")
1017
1018    def TerminateSegment(self, req, fid):
1019        self.log.info("TerminateSegment called by %s" % fid)
1020        try:
1021            req = req['TerminateSegmentRequestBody']
1022        except KeyError:
1023            raise service_error(server_error.req, "Badly formed request")
1024
1025        auth_attr = req['allocID']['fedid']
1026        aid = "%s" % auth_attr
1027        attrs = req.get('fedAttr', [])
1028
1029        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1030                with_proof=True)
1031        if not access_ok:
1032            raise service_error(service_error.access, "Access denied")
1033
1034        self.state_lock.acquire()
1035        if aid in self.allocation:
1036            proj = self.allocation[aid].get('project', None)
1037            user = self.allocation[aid].get('user', None)
1038            xmlrpc_cert = self.allocation[aid].get('cert', None)
1039            ename = self.allocation[aid].get('experiment', None)
1040            nonce = self.allocation[aid].get('nonce', False)
1041        else:
1042            proj = None
1043            user = None
1044            ename = None
1045            nonce = False
1046            xmlrpc_cert = None
1047        self.state_lock.release()
1048
1049        if not proj:
1050            self.log.info("TerminateSegment failed for %s: cannot find project"\
1051                    % fid)
1052            raise service_error(service_error.internal, 
1053                    "Can't find project for %s" % aid)
1054        else:
1055            if '/' in proj: proj, gid = proj.split('/')
1056            else: gid = None
1057
1058        if not user:
1059            self.log.info("TerminateSegment failed for %s: cannot find user"\
1060                    % fid)
1061            raise service_error(service_error.internal, 
1062                    "Can't find creation user for %s" % aid)
1063        if not ename:
1064            self.log.info(
1065                    "TerminateSegment failed for %s: cannot find experiment"\
1066                    % fid)
1067            raise service_error(service_error.internal, 
1068                    "Can't find experiment name for %s" % aid)
1069       
1070        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1071                debug=self.create_debug, boss=self.boss, ops=self.ops,
1072                cert=xmlrpc_cert)
1073        stopper(self, user, proj, ename, gid, nonce)
1074        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1075                (fid, proj, ename))
1076        self.state_lock.acquire()
1077        # Remove the started flag/info - the segment is no longer started
1078        if aid in self.allocation:
1079            if 'started' in self.allocation[aid]:
1080                del self.allocation[aid]['started']
1081            self.write_state()
1082        self.state_lock.release()
1083        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1084
1085    def InfoSegment(self, req, fid):
1086        self.log.info("InfoSegment called by %s" % fid)
1087        try:
1088            req = req['InfoSegmentRequestBody']
1089        except KeyError:
1090            raise service_error(server_error.req, "Badly formed request")
1091
1092        auth_attr = req['allocID']['fedid']
1093        aid = "%s" % auth_attr
1094
1095        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1096                with_proof=True)
1097        if not access_ok:
1098            raise service_error(service_error.access, "Access denied")
1099
1100        self.state_lock.acquire()
1101        if aid in self.allocation:
1102            topo = self.allocation[aid].get('topo', None)
1103            if topo: topo = topo.clone()
1104            proj = self.allocation[aid].get('project', None)
1105            user = self.allocation[aid].get('user', None)
1106            xmlrpc_cert = self.allocation[aid].get('cert', None)
1107            ename = self.allocation[aid].get('experiment', None)
1108        else:
1109            proj = None
1110            user = None
1111            ename = None
1112            topo = None
1113            xmlrpc_cert = None
1114        self.state_lock.release()
1115
1116        if not proj:
1117            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1118            raise service_error(service_error.internal, 
1119                    "Can't find project for %s" % aid)
1120        else:
1121            if '/' in proj: proj, gid = proj.split('/')
1122            else: gid = None
1123
1124        if not user:
1125            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1126            raise service_error(service_error.internal, 
1127                    "Can't find creation user for %s" % aid)
1128        if not ename:
1129            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1130            raise service_error(service_error.internal, 
1131                    "Can't find experiment name for %s" % aid)
1132        info = self.info_segment(keyfile=self.ssh_privkey_file,
1133                debug=self.create_debug, boss=self.boss, ops=self.ops,
1134                cert=xmlrpc_cert)
1135        info(self, user, proj, ename)
1136        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1137                (fid, user, proj, ename))
1138        self.decorate_topology(info, topo)
1139        self.state_lock.acquire()
1140        if aid in self.allocation:
1141            self.allocation[aid]['topo'] = topo
1142            self.write_state()
1143        self.state_lock.release()
1144        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1145                (fid, user, proj, ename))
1146
1147        rv = { 
1148                'allocID': req['allocID'], 
1149                'proof': proof.to_dict(),
1150                }
1151        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1152                (fid, user, proj, ename))
1153        if topo:
1154            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1155        return rv
1156
1157    def OperationSegment(self, req, fid):
1158        def get_pname(e):
1159            """
1160            Get the physical name of a node
1161            """
1162            if e.localname:
1163                return re.sub('\..*','', e.localname[0])
1164            else:
1165                return None
1166
1167        self.log.info("OperationSegment called by %s" % fid)
1168        try:
1169            req = req['OperationSegmentRequestBody']
1170        except KeyError:
1171            raise service_error(server_error.req, "Badly formed request")
1172
1173        auth_attr = req['allocID']['fedid']
1174        aid = "%s" % auth_attr
1175
1176        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1177                with_proof=True)
1178        if not access_ok:
1179            self.log.info("OperationSegment failed for %s: access denied" % fid)
1180            raise service_error(service_error.access, "Access denied")
1181
1182        op = req.get('operation', None)
1183        targets = req.get('target', None)
1184        params = req.get('parameter', None)
1185
1186        if op is None :
1187            self.log.info("OperationSegment failed for %s: no operation" % fid)
1188            raise service_error(service_error.req, "missing operation")
1189        elif targets is None:
1190            self.log.info("OperationSegment failed for %s: no targets" % fid)
1191            raise service_error(service_error.req, "no targets")
1192
1193        self.state_lock.acquire()
1194        if aid in self.allocation:
1195            topo = self.allocation[aid].get('topo', None)
1196            if topo: topo = topo.clone()
1197            xmlrpc_cert = self.allocation[aid].get('cert', None)
1198        else:
1199            topo = None
1200            xmlrpc_cert = None
1201        self.state_lock.release()
1202
1203        targets = copy.copy(targets)
1204        ptargets = { }
1205        for e in topo.elements:
1206            if isinstance(e, topdl.Computer):
1207                if e.name in targets:
1208                    targets.remove(e.name)
1209                    pn = get_pname(e)
1210                    if pn: ptargets[e.name] = pn
1211
1212        status = [ operation_status(t, operation_status.no_target) \
1213                for t in targets]
1214
1215        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1216                debug=self.create_debug, boss=self.boss, ops=self.ops,
1217                cert=xmlrpc_cert)
1218        ops(self, op, ptargets, params, topo)
1219        self.log.info("OperationSegment operated for %s" % fid)
1220       
1221        status.extend(ops.status)
1222        self.log.info("OperationSegment succeed for %s" % fid)
1223
1224        return { 
1225                'allocID': req['allocID'], 
1226                'status': [s.to_dict() for s in status],
1227                'proof': proof.to_dict(),
1228                }
Note: See TracBrowser for help on using the repository browser.