source: fedd/federation/emulab_access.py @ f725eda

Last change on this file since f725eda was f725eda, checked in by Ted Faber <faber@…>, 11 years ago

Allow risky access - need to be made tighter

  • Property mode set to 100644
File size: 39.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    max_name_len = 19
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.max_name_len = access.max_name_len
61
62        self.allow_proxy = config.getboolean("access", "allow_proxy")
63
64        self.domain = config.get("access", "domain")
65        self.userconfdir = config.get("access","userconfdir")
66        self.userconfcmd = config.get("access","userconfcmd")
67        self.userconfurl = config.get("access","userconfurl")
68        self.federation_software = config.get("access", "federation_software")
69        self.portal_software = config.get("access", "portal_software")
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73        self.seer_master_start = config.get("access", "seer_master_start")
74        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
75        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
76        self.ssh_port = config.get("access","ssh_port") or "22"
77        self.boss = config.get("access", "boss")
78        self.ops = config.get("access", "ops")
79        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
80        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
81
82        self.dragon_endpoint = config.get("access", "dragon")
83        self.dragon_vlans = config.get("access", "dragon_vlans")
84        self.deter_internal = config.get("access", "deter_internal")
85
86        self.tunnel_config = config.getboolean("access", "tunnel_config")
87        self.portal_command = config.get("access", "portal_command")
88        self.portal_image = config.get("access", "portal_image")
89        self.portal_type = config.get("access", "portal_type") or "pc"
90        self.portal_startcommand = config.get("access", "portal_startcommand")
91        self.node_startcommand = config.get("access", "node_startcommand")
92        self.nat_portal = config.get("access", "nat_portal")
93
94        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
95                self.get_port(config.get("globals", "services", "23235")))
96
97        self.federation_software = self.software_list(self.federation_software)
98        self.portal_software = self.software_list(self.portal_software)
99        self.local_seer_software = self.software_list(self.local_seer_software)
100
101        self.access_type = self.access_type.lower()
102        self.start_segment = emulab_segment.start_segment
103        self.stop_segment = emulab_segment.stop_segment
104        self.info_segment = emulab_segment.info_segment
105        self.operation_segment = emulab_segment.operation_segment
106
107        self.restricted = [ ]
108        tb = config.get('access', 'testbed')
109        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
110        else: self.testbed = [ ]
111
112        # authorization information
113        self.auth_type = config.get('access', 'auth_type') \
114                or 'abac'
115        self.auth_dir = config.get('access', 'auth_dir')
116        accessdb = config.get("access", "accessdb")
117        # initialize the authorization system
118        if self.auth_type == 'abac':
119            self.auth = abac_authorizer(load=self.auth_dir)
120            self.access = [ ]
121            if accessdb:
122                self.read_access(accessdb, self.access_tuple)
123        else:
124            raise service_error(service_error.internal, 
125                    "Unknown auth_type: %s" % self.auth_type)
126
127        # read_state in the base_class
128        self.state_lock.acquire()
129        if 'allocation' not in self.state: self.state['allocation']= { }
130        self.allocation = self.state['allocation']
131        self.state_lock.release()
132        self.exports = {
133                'SMB': self.export_SMB,
134                'seer': self.export_seer,
135                'tmcd': self.export_tmcd,
136                'userconfig': self.export_userconfig,
137                'project_export': self.export_project_export,
138                'local_seer_control': self.export_local_seer,
139                'seer_master': self.export_seer_master,
140                'hide_hosts': self.export_hide_hosts,
141                }
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.local_seer_start:
145            if 'local_seer_control' in self.exports:
146                del self.exports['local_seer_control']
147
148        if not self.local_seer_image or not self.local_seer_software or \
149                not self.seer_master_start:
150            if 'seer_master' in self.exports:
151                del self.exports['seer_master']
152
153
154        self.soap_services = {\
155            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
156            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
157            'StartSegment': soap_handler("StartSegment", self.StartSegment),
158            'TerminateSegment': soap_handler("TerminateSegment",
159                self.TerminateSegment),
160            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
161            'OperationSegment': soap_handler("OperationSegment",
162                self.OperationSegment),
163            }
164        self.xmlrpc_services =  {\
165            'RequestAccess': xmlrpc_handler('RequestAccess',
166                self.RequestAccess),
167            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
168                self.ReleaseAccess),
169            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
170            'TerminateSegment': xmlrpc_handler('TerminateSegment',
171                self.TerminateSegment),
172            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
173            'OperationSegment': xmlrpc_handler("OperationSegment",
174                self.OperationSegment),
175            }
176
177        self.call_SetValue = service_caller('SetValue', log=self.log)
178        self.call_GetValue = service_caller('GetValue', log=self.log)
179
180    @staticmethod
181    def get_port(ps):
182        '''
183        Take a fedd service string and return the first port.  Used in
184        creating the testbed uri identifier.
185        '''
186        p = ps.split(',')
187        smallport = p[0].split(':')
188        try:
189            rv = int(smallport[0])
190        except ValueError:
191            rv = 23235
192        return rv
193
194    @staticmethod
195    def access_tuple(str):
196        """
197        Convert a string of the form (id, id, id) into an access_project.  This
198        is called by read_access to convert to local attributes.  It returns a
199        tuple of the form (project, user, certificate_file).
200        """
201
202        str = str.strip()
203        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
204            # The slice takes the parens off the string.
205            proj, user, cert = str[1:-1].split(',')
206            return (proj.strip(), user.strip(), cert.strip())
207        else:
208            raise self.parse_error(
209                    'Bad mapping (unbalanced parens or more than 2 commas)')
210
211    # RequestAccess support routines
212
213    def save_project_state(self, aid, pname, uname, certf, owners):
214        """
215        Save the project, user, and owners associated with this allocation.
216        This creates the allocation entry.
217        """
218        self.state_lock.acquire()
219        self.allocation[aid] = { }
220        self.allocation[aid]['project'] = pname
221        self.allocation[aid]['user'] = uname
222        self.allocation[aid]['cert'] = certf
223        self.allocation[aid]['owners'] = owners
224        self.write_state()
225        self.state_lock.release()
226        return (pname, uname)
227
228    # End of RequestAccess support routines
229
230    def RequestAccess(self, req, fid):
231        """
232        Handle the access request.  Proxy if not for us.
233
234        Parse out the fields and make the allocations or rejections if for us,
235        otherwise, assuming we're willing to proxy, proxy the request out.
236        """
237
238        def gateway_hardware(h):
239            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
240            else: return h
241
242        def get_export_project(svcs):
243            """
244            if the service requests includes one to export a project, return
245            that project.
246            """
247            rv = None
248            for s in svcs:
249                if s.get('name', '') == 'project_export' and \
250                        s.get('visibility', '') == 'export':
251                    if not rv: 
252                        for a in s.get('fedAttr', []):
253                            if a.get('attribute', '') == 'project' \
254                                    and 'value' in a:
255                                rv = a['value']
256                    else:
257                        raise service_error(service_error, access, 
258                                'Requesting multiple project exports is ' + \
259                                        'not supported');
260            return rv
261
262        self.log.info("RequestAccess called by %s" % fid)
263        # The dance to get into the request body
264        if req.has_key('RequestAccessRequestBody'):
265            req = req['RequestAccessRequestBody']
266        else:
267            raise service_error(service_error.req, "No request!?")
268
269        # if this includes a project export request, construct a filter such
270        # that only the ABAC attributes mapped to that project are checked for
271        # access.
272        if 'service' in req:
273            ep = get_export_project(req['service'])
274            if ep: pf = lambda(a): a.value[0] == ep
275            else: pf = None
276        else:
277            ep = None
278            pf = None
279
280        if self.auth.import_credentials(
281                data_list=req.get('abac_credential', [])):
282            self.auth.save()
283        else:
284            self.log.debug('failed to import incoming credentials')
285
286        if self.auth_type == 'abac':
287            found, owners, proof = self.lookup_access(req, fid, filter=pf)
288        else:
289            raise service_error(service_error.internal, 
290                    'Unknown auth_type: %s' % self.auth_type)
291        ap = None
292
293        # keep track of what's been added
294        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
295        aid = unicode(allocID)
296
297        pname, uname = self.save_project_state(aid, found[0], found[1], 
298                found[2], owners)
299
300        services, svc_state = self.export_services(req.get('service',[]),
301                pname, uname)
302        self.state_lock.acquire()
303        # Store services state in global state
304        for k, v in svc_state.items():
305            self.allocation[aid][k] = v
306        self.append_allocation_authorization(aid, 
307                set([(o, allocID) for o in owners]), state_attr='allocation')
308        self.write_state()
309        self.state_lock.release()
310        try:
311            f = open("%s/%s.pem" % (self.certdir, aid), "w")
312            print >>f, alloc_cert
313            f.close()
314        except EnvironmentError, e:
315            self.log.info("RequestAccess failed for by %s: internal error" \
316                    % fid)
317            raise service_error(service_error.internal, 
318                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
319        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
320        resp = self.build_access_response({ 'fedid': allocID } ,
321                pname, services, proof)
322        return resp
323
324    def ReleaseAccess(self, req, fid):
325        self.log.info("ReleaseAccess called by %s" % fid)
326        # The dance to get into the request body
327        if req.has_key('ReleaseAccessRequestBody'):
328            req = req['ReleaseAccessRequestBody']
329        else:
330            raise service_error(service_error.req, "No request!?")
331
332        try:
333            if req['allocID'].has_key('localname'):
334                auth_attr = aid = req['allocID']['localname']
335            elif req['allocID'].has_key('fedid'):
336                aid = unicode(req['allocID']['fedid'])
337                auth_attr = req['allocID']['fedid']
338            else:
339                raise service_error(service_error.req,
340                        "Only localnames and fedids are understood")
341        except KeyError:
342            raise service_error(service_error.req, "Badly formed request")
343
344        self.log.debug("[access] deallocation requested for %s by %s" % \
345                (aid, fid))
346        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
347                with_proof=True)
348        if not access_ok:
349            self.log.debug("[access] deallocation denied for %s", aid)
350            raise service_error(service_error.access, "Access Denied")
351
352        self.state_lock.acquire()
353        if aid in self.allocation:
354            self.log.debug("Found allocation for %s" %aid)
355            self.clear_allocation_authorization(aid, state_attr='allocation')
356            del self.allocation[aid]
357            self.write_state()
358            self.state_lock.release()
359            # Remove the access cert
360            cf = "%s/%s.pem" % (self.certdir, aid)
361            self.log.debug("Removing %s" % cf)
362            os.remove(cf)
363            self.log.info("ReleaseAccess succeeded for %s" % fid)
364            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    # These are subroutines for StartSegment
370    def generate_ns2(self, topo, expfn, softdir, connInfo):
371        """
372        Convert topo into an ns2 file, decorated with appropriate commands for
373        the particular testbed setup.  Convert all requests for software, etc
374        to point at the staged copies on this testbed and add the federation
375        startcommands.
376        """
377        class dragon_commands:
378            """
379            Functor to spit out approrpiate dragon commands for nodes listed in
380            the connectivity description.  The constructor makes a dict mapping
381            dragon nodes to their parameters and the __call__ checks each
382            element in turn for membership.
383            """
384            def __init__(self, map):
385                self.node_info = map
386
387            def __call__(self, e):
388                s = ""
389                if isinstance(e, topdl.Computer):
390                    if self.node_info.has_key(e.name):
391                        info = self.node_info[e.name]
392                        for ifname, vlan, type in info:
393                            for i in e.interface:
394                                if i.name == ifname:
395                                    addr = i.get_attribute('ip4_address')
396                                    subs = i.substrate[0]
397                                    break
398                            else:
399                                raise service_error(service_error.internal,
400                                        "No interface %s on element %s" % \
401                                                (ifname, e.name))
402                            # XXX: do netmask right
403                            if type =='link':
404                                s = ("tb-allow-external ${%s} " + \
405                                        "dragonportal ip %s vlan %s " + \
406                                        "netmask 255.255.255.0\n") % \
407                                        (topdl.to_tcl_name(e.name), addr, vlan)
408                            elif type =='lan':
409                                s = ("tb-allow-external ${%s} " + \
410                                        "dragonportal " + \
411                                        "ip %s vlan %s usurp %s\n") % \
412                                        (topdl.to_tcl_name(e.name), addr, 
413                                                vlan, subs)
414                            else:
415                                raise service_error(service_error_internal,
416                                        "Unknown DRAGON type %s" % type)
417                return s
418
419        class not_dragon:
420            """
421            Return true if a node is in the given map of dragon nodes.
422            """
423            def __init__(self, map):
424                self.nodes = set(map.keys())
425
426            def __call__(self, e):
427                return e.name not in self.nodes
428
429        def have_portals(top):
430            '''
431            Return true if the topology has a portal node
432            '''
433            # The else is on the for
434            for e in top.elements:
435                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
436                    return True
437            else:
438                return False
439
440
441        # Main line of generate_ns2
442        t = topo.clone()
443
444        # Create the map of nodes that need direct connections (dragon
445        # connections) from the connInfo
446        dragon_map = { }
447        for i in [ i for i in connInfo if i['type'] == 'transit']:
448            for a in i.get('fedAttr', []):
449                if a['attribute'] == 'vlan_id':
450                    vlan = a['value']
451                    break
452            else:
453                raise service_error(service_error.internal, "No vlan tag")
454            members = i.get('member', [])
455            if len(members) > 1: type = 'lan'
456            else: type = 'link'
457
458            try:
459                for m in members:
460                    if m['element'] in dragon_map:
461                        dragon_map[m['element']].append(( m['interface'], 
462                            vlan, type))
463                    else:
464                        dragon_map[m['element']] = [( m['interface'], 
465                            vlan, type),]
466            except KeyError:
467                raise service_error(service_error.req,
468                        "Missing connectivity info")
469
470        def output_fixed_filter(e):
471            if not isinstance(e, topdl.Computer): return ""
472            fn = e.get_attribute('fixed')
473            if fn is None:
474                return ""
475            else:
476                return 'tb-fix-node ${%s} %s\n' % (topdl.to_tcl_name(e.name), fn)
477
478        def output_external_access(e):
479            '''
480            Allow nodes that have explicit external access requests attached to
481            have external access.  This will be replaced by a real risky
482            experiment filter.
483            '''
484            if not isinstance(e, topdl.Computer): return ""
485            ex = e.get_attribute('containers:external')
486            if ex is None:
487                return ""
488            else:
489                return 'tb-allow-external ${%s}\n' % topdl.to_tcl_name(e.name)
490
491        # Weed out the things we aren't going to instantiate: Segments, portal
492        # substrates, and portal interfaces.  (The copy in the for loop allows
493        # us to delete from e.elements in side the for loop).  While we're
494        # touching all the elements, we also adjust paths from the original
495        # testbed to local testbed paths and put the federation commands into
496        # the start commands
497        local = len(dragon_map) == 0 and not have_portals(t)
498        if local: routing = 'Static'
499        else: routing = 'Manual'
500
501        if local:
502            self.log.debug("Local experiment.")
503        for e in [e for e in t.elements]:
504            if isinstance(e, topdl.Segment):
505                t.elements.remove(e)
506            if isinstance(e, topdl.Computer):
507                self.add_kit(e, self.federation_software)
508                if e.get_attribute('portal') and self.portal_startcommand:
509                    # Add local portal support software
510                    self.add_kit(e, self.portal_software)
511                    # Portals never have a user-specified start command
512                    e.set_attribute('startup', self.portal_startcommand)
513                elif not local and self.node_startcommand:
514                    if e.get_attribute('startup'):
515                        e.set_attribute('startup', "%s \\$USER '%s'" % \
516                                (self.node_startcommand, 
517                                    e.get_attribute('startup')))
518                    else:
519                        e.set_attribute('startup', self.node_startcommand)
520
521                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
522                # Remove portal interfaces that do not connect to DRAGON
523                e.interface = [i for i in e.interface \
524                        if not i.get_attribute('portal') or i.name in dinf ]
525            # Fix software paths
526            for s in getattr(e, 'software', []):
527                s.location = re.sub("^.*/", softdir, s.location)
528
529        t.substrates = [ s.clone() for s in t.substrates ]
530        t.incorporate_elements()
531
532        # Customize the ns2 output for local portal commands and images
533        filters = [output_fixed_filter, output_external_access]
534
535        if self.dragon_endpoint:
536            add_filter = not_dragon(dragon_map)
537            filters.append(dragon_commands(dragon_map))
538        else:
539            add_filter = None
540
541        if self.portal_command:
542            filters.append(topdl.generate_portal_command_filter(
543                self.portal_command, add_filter=add_filter))
544
545        if self.portal_image:
546            filters.append(topdl.generate_portal_image_filter(
547                self.portal_image))
548
549        if self.portal_type:
550            filters.append(topdl.generate_portal_hardware_filter(
551                self.portal_type))
552
553        # Convert to ns and write it out
554        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
555        try:
556            f = open(expfn, "w")
557            print >>f, expfile
558            f.close()
559        except EnvironmentError:
560            raise service_error(service_error.internal,
561                    "Cannot write experiment file %s: %s" % (expfn,e))
562
563    def export_store_info(self, cf, proj, ename, connInfo):
564        """
565        For the export requests in the connection info, install the peer names
566        at the experiment controller via SetValue calls.
567        """
568
569        for c in connInfo:
570            for p in [ p for p in c.get('parameter', []) \
571                    if p.get('type', '') == 'output']:
572
573                if p.get('name', '') == 'peer':
574                    k = p.get('key', None)
575                    surl = p.get('store', None)
576                    if surl :
577                        if self.nat_portal:
578                            value = self.nat_portal
579                        elif k and k.index('/') != -1:
580                            value = "%s.%s.%s%s" % \
581                                (k[k.index('/')+1:], ename, proj, self.domain)
582                        else: 
583                            self.log.error("Bad export request: %s" % p)
584                            continue
585                        self.log.debug("Setting %s to %s on %s" % \
586                                (k, value, surl))
587                        req = { 'name': k, 'value': value }
588                        self.call_SetValue(surl, req, cf)
589                    else:
590                        self.log.error("Bad export request: %s" % p)
591                elif p.get('name', '') == 'ssh_port':
592                    k = p.get('key', None)
593                    surl = p.get('store', None)
594                    if surl and k:
595                        req = { 'name': k, 'value': self.ssh_port }
596                        self.log.debug("Setting %s to %s on %s" % \
597                                (k, self.ssh_port, surl))
598                        self.call_SetValue(surl, req, cf)
599                    else:
600                        self.log.error("Bad export request: %s" % p)
601                else:
602                    self.log.error("Unknown export parameter: %s" % \
603                            p.get('name'))
604                    continue
605
606    def add_seer_node(self, topo, name, startup):
607        """
608        Add a seer node to the given topology, with the startup command passed
609        in.  Used by configure seer_services.
610        """
611        c_node = topdl.Computer(
612                name=name, 
613                os= topdl.OperatingSystem(
614                    attribute=[
615                    { 'attribute': 'osid', 
616                        'value': self.local_seer_image },
617                    ]),
618                attribute=[
619                    { 'attribute': 'startup', 'value': startup },
620                    ]
621                )
622        self.add_kit(c_node, self.local_seer_software)
623        topo.elements.append(c_node)
624
625    def configure_seer_services(self, services, topo, softdir):
626        """
627        Make changes to the topology required for the seer requests being made.
628        Specifically, add any control or master nodes required and set up the
629        start commands on the nodes to interconnect them.
630        """
631        local_seer = False      # True if we need to add a control node
632        collect_seer = False    # True if there is a seer-master node
633        seer_master= False      # True if we need to add the seer-master
634        for s in services:
635            s_name = s.get('name', '')
636            s_vis = s.get('visibility','')
637
638            if s_name  == 'local_seer_control' and s_vis == 'export':
639                local_seer = True
640            elif s_name == 'seer_master':
641                if s_vis == 'import':
642                    collect_seer = True
643                elif s_vis == 'export':
644                    seer_master = True
645       
646        # We've got the whole picture now, so add nodes if needed and configure
647        # them to interconnect properly.
648        if local_seer or seer_master:
649            # Copy local seer control node software to the tempdir
650            for l, f in self.local_seer_software:
651                base = os.path.basename(f)
652                copy_file(f, "%s/%s" % (softdir, base))
653        # If we're collecting seers somewhere the controllers need to talk to
654        # the master.  In testbeds that export the service, that will be a
655        # local node that we'll add below.  Elsewhere it will be the control
656        # portal that will port forward to the exporting master.
657        if local_seer:
658            if collect_seer:
659                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
660            else:
661                startup = self.local_seer_start
662            self.add_seer_node(topo, 'control', startup)
663        # If this is the seer master, add that node, too.
664        if seer_master:
665            self.add_seer_node(topo, 'seer-master', 
666                    "%s -R -n -R seer-master -R -A -R sink" % \
667                            self.seer_master_start)
668
669    def retrieve_software(self, topo, certfile, softdir):
670        """
671        Collect the software that nodes in the topology need loaded and stage
672        it locally.  This implies retrieving it from the experiment_controller
673        and placing it into softdir.  Certfile is used to prove that this node
674        has access to that data (it's the allocation/segment fedid).  Finally
675        local portal and federation software is also copied to the same staging
676        directory for simplicity - all software needed for experiment creation
677        is in softdir.
678        """
679        sw = set()
680        for e in topo.elements:
681            for s in getattr(e, 'software', []):
682                sw.add(s.location)
683        for s in sw:
684            self.log.debug("Retrieving %s" % s)
685            try:
686                get_url(s, certfile, softdir, log=self.log)
687            except:
688                t, v, st = sys.exc_info()
689                raise service_error(service_error.internal,
690                        "Error retrieving %s: %s" % (s, v))
691
692        # Copy local federation and portal node software to the tempdir
693        for s in (self.federation_software, self.portal_software):
694            for l, f in s:
695                base = os.path.basename(f)
696                copy_file(f, "%s/%s" % (softdir, base))
697
698
699    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
700        """
701        Gather common configuration files, retrieve or create an experiment
702        name and project name, and return the ssh_key filenames.  Create an
703        allocation log bound to the state log variable as well.
704        """
705        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
706                'seer_ca_pem', 'seer_node_pem', 'route.tgz')
707        ename = None
708        pubkey_base = None
709        secretkey_base = None
710        proj = None
711        user = None
712        alloc_log = None
713        nonce_experiment = False
714        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
715
716        self.state_lock.acquire()
717        if aid in self.allocation:
718            proj = self.allocation[aid].get('project', None)
719        self.state_lock.release()
720
721        if not proj:
722            raise service_error(service_error.internal, 
723                    "Can't find project for %s" %aid)
724
725        for a in attrs:
726            if a['attribute'] in configs:
727                try:
728                    self.log.debug("Retrieving %s from %s" % \
729                            (a['attribute'], a['value']))
730                    get_url(a['value'], certfile, tmpdir, log=self.log)
731                except:
732                    t, v, st = sys.exc_info()
733                    raise service_error(service_error.internal,
734                            "Error retrieving %s: %s" % (a.get('value', ""), v))
735            if a['attribute'] == 'ssh_pubkey':
736                pubkey_base = a['value'].rpartition('/')[2]
737            if a['attribute'] == 'ssh_secretkey':
738                secretkey_base = a['value'].rpartition('/')[2]
739            if a['attribute'] == 'experiment_name':
740                ename = a['value']
741
742        # Names longer than the emulab max are discarded
743        if ename and len(ename) <= self.max_name_len:
744            # Clean up the experiment name so that emulab will accept it.
745            ename = re.sub(vchars_re, '-', ename)
746
747        else:
748            ename = ""
749            for i in range(0,5):
750                ename += random.choice(string.ascii_letters)
751            nonce_experiment = True
752            self.log.warn("No experiment name or suggestion too long: " + \
753                    "picked one randomly: %s" % ename)
754
755        if not pubkey_base:
756            raise service_error(service_error.req, 
757                    "No public key attribute")
758
759        if not secretkey_base:
760            raise service_error(service_error.req, 
761                    "No secret key attribute")
762
763        self.state_lock.acquire()
764        if aid in self.allocation:
765            user = self.allocation[aid].get('user', None)
766            cert = self.allocation[aid].get('cert', None)
767            self.allocation[aid]['experiment'] = ename
768            self.allocation[aid]['nonce'] = nonce_experiment
769            self.allocation[aid]['log'] = [ ]
770            # Create a logger that logs to the experiment's state object as
771            # well as to the main log file.
772            alloc_log = logging.getLogger('fedd.access.%s' % ename)
773            h = logging.StreamHandler(
774                    list_log.list_log(self.allocation[aid]['log']))
775            # XXX: there should be a global one of these rather than
776            # repeating the code.
777            h.setFormatter(logging.Formatter(
778                "%(asctime)s %(name)s %(message)s",
779                        '%d %b %y %H:%M:%S'))
780            alloc_log.addHandler(h)
781            self.write_state()
782        self.state_lock.release()
783
784        if not user:
785            raise service_error(service_error.internal, 
786                    "Can't find creation user for %s" %aid)
787
788        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
789
790    def decorate_topology(self, info, t):
791        """
792        Copy the physical mapping and status onto the topology.  Used by
793        StartSegment and InfoSegment
794        """
795        def add_new(ann, attr):
796            for a in ann:
797                if a not in attr: attr.append(a)
798
799        def merge_os(os, e):
800            if len(e.os) == 0:
801                # No OS at all:
802                if os.get_attribute('emulab_access:image'):
803                    os.set_attribute('emulab_access:initial_image', 
804                            os.get_attribute('emulab_access:image'))
805                e.os = [ os ]
806            elif len(e.os) == 1:
807                # There's one OS, copy the initial image and replace
808                eos = e.os[0]
809                initial = eos.get_attribute('emulab_access:initial_image')
810                if initial:
811                    os.set_attribute('emulab_access:initial_image', initial)
812                e.os = [ os] 
813            else:
814                # Multiple OSes, replace or append
815                for eos in e.os:
816                    if os.name == eos.name:
817                        eos.version = os.version
818                        eos.version = os.distribution
819                        eos.version = os.distributionversion
820                        for a in os.attribute:
821                            if eos.get_attribute(a.attribute):
822                                eos.remove_attribute(a.attribute)
823                            eos.set_attribute(a.attribute, a.value)
824                        break
825                else:
826                    e.os.append(os)
827
828
829        if t is None: return
830        i = 0 # For fake debugging instantiation
831        # Copy the assigned names into the return topology
832        for e in t.elements:
833            if isinstance(e, topdl.Computer):
834                if not self.create_debug:
835                    if e.name in info.node:
836                        add_new(("%s%s" % 
837                            (info.node[e.name].pname, self.domain),),
838                            e.localname)
839                        add_new(("%s%s" % 
840                            (info.node[e.name].lname, self.domain),),
841                            e.localname)
842                        e.status = info.node[e.name].status
843                        os = info.node[e.name].getOS()
844                        if os: merge_os(os, e)
845                else:
846                    # Simple debugging assignment
847                    add_new(("node%d%s" % (i, self.domain),), e.localname)
848                    e.status = 'active'
849                    add_new(('testop1', 'testop2'), e.operation)
850                    i += 1
851
852        for s in t.substrates:
853            if s.name in info.subs:
854                sub = info.subs[s.name]
855                if sub.cap is not None:
856                    s.capacity = topdl.Capacity(sub.cap, 'max')
857                if sub.delay is not None:
858                    s.delay = topdl.Latency(sub.delay, 'max')
859        # XXX interfaces
860
861
862    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
863        """
864        Store key bits of experiment state in the global repository, including
865        the response that may need to be replayed, and return the response.
866        """
867        def get_localnames(t):
868            names = [ ]
869            for e in t.elements:
870                if isinstance(e, topdl.Computer):
871                    n = e.get_attribute('testbed')
872                    if n is not None and n not in names:
873                        names.append(n)
874            return names
875        i = 0
876        t = topo.clone()
877        self.decorate_topology(starter, t)
878        # Grab the log (this is some anal locking, but better safe than
879        # sorry)
880        self.state_lock.acquire()
881        # Put information about this testbed into the topdl
882        tb = topdl.Testbed(self.uri, "deter",
883                localname=get_localnames(t), 
884                attribute=[ 
885                    { 
886                        'attribute': 'project', 
887                        'value': self.allocation[aid]['project']
888                    },
889                    { 
890                        'attribute': 'experiment', 
891                        'value': self.allocation[aid]['experiment']
892                    }])
893        t.elements.append(tb)
894        logv = "".join(self.allocation[aid]['log'])
895        # It's possible that the StartSegment call gets retried (!).
896        # if the 'started' key is in the allocation, we'll return it rather
897        # than redo the setup.
898        self.allocation[aid]['started'] = { 
899                'allocID': alloc_id,
900                'allocationLog': logv,
901                'segmentdescription': { 
902                    'topdldescription': t.to_dict()
903                    },
904                'proof': proof.to_dict(),
905                }
906        self.allocation[aid]['topo'] = t
907        retval = copy.copy(self.allocation[aid]['started'])
908        self.write_state()
909        self.state_lock.release()
910        return retval
911   
912    # End of StartSegment support routines
913
914    def StartSegment(self, req, fid):
915        err = None  # Any service_error generated after tmpdir is created
916        rv = None   # Return value from segment creation
917
918        self.log.info("StartSegment called by %s" % fid)
919        try:
920            req = req['StartSegmentRequestBody']
921            auth_attr = req['allocID']['fedid']
922            topref = req['segmentdescription']['topdldescription']
923        except KeyError:
924            raise service_error(server_error.req, "Badly formed request")
925
926        connInfo = req.get('connection', [])
927        services = req.get('service', [])
928        aid = "%s" % auth_attr
929        attrs = req.get('fedAttr', [])
930
931        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
932                with_proof=True)
933        if not access_ok:
934            self.log.info("StartSegment for %s failed: access denied" % fid)
935            raise service_error(service_error.access, "Access denied")
936        else:
937            # See if this is a replay of an earlier succeeded StartSegment -
938            # sometimes SSL kills 'em.  If so, replay the response rather than
939            # redoing the allocation.
940            self.state_lock.acquire()
941            retval = self.allocation[aid].get('started', None)
942            self.state_lock.release()
943            if retval:
944                self.log.warning("Duplicate StartSegment for %s: " % aid + \
945                        "replaying response")
946                return retval
947
948        # A new request.  Do it.
949
950        if topref: topo = topdl.Topology(**topref)
951        else:
952            raise service_error(service_error.req, 
953                    "Request missing segmentdescription'")
954       
955        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
956        try:
957            tmpdir = tempfile.mkdtemp(prefix="access-")
958            softdir = "%s/software" % tmpdir
959            os.mkdir(softdir)
960        except EnvironmentError:
961            self.log.info("StartSegment for %s failed: internal error" % fid)
962            raise service_error(service_error.internal, "Cannot create tmp dir")
963
964        # Try block alllows us to clean up temporary files.
965        try:
966            self.retrieve_software(topo, certfile, softdir)
967            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
968                alloc_log =  self.initialize_experiment_info(attrs, aid, 
969                        certfile, tmpdir)
970            # A misconfigured cert in the ABAC map can be confusing...
971            if not os.access(xmlrpc_cert, os.R_OK):
972                self.log.error("Cannot open user's emulab SSL cert: %s" % \
973                        xmlrpc_cert)
974                raise service_error(service_error.internal,
975                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
976
977
978            if '/' in proj: proj, gid = proj.split('/')
979            else: gid = None
980
981
982            # Set up userconf and seer if needed
983            self.configure_userconf(services, tmpdir)
984            self.configure_seer_services(services, topo, softdir)
985            # Get and send synch store variables
986            self.export_store_info(certfile, proj, ename, connInfo)
987            self.import_store_info(certfile, connInfo)
988
989            expfile = "%s/experiment.tcl" % tmpdir
990
991            self.generate_portal_configs(topo, pubkey_base, 
992                    secretkey_base, tmpdir, proj, ename, connInfo, services)
993            self.generate_ns2(topo, expfile, 
994                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
995
996            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
997                    debug=self.create_debug, log=alloc_log, boss=self.boss,
998                    ops=self.ops, cert=xmlrpc_cert)
999            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1000        except service_error, e:
1001            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
1002            err = e
1003        except:
1004            t, v, st = sys.exc_info()
1005            self.log.info("StartSegment for %s failed:unexpected error: %s" \
1006                    % (fid, traceback.extract_tb(st)))
1007            err = service_error(service_error.internal, "%s: %s" % \
1008                    (v, traceback.extract_tb(st)))
1009
1010        # Walk up tmpdir, deleting as we go
1011        if self.cleanup: self.remove_dirs(tmpdir)
1012        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1013
1014        if rv:
1015            self.log.info("StartSegment for %s succeeded" % fid)
1016            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1017                    proof)
1018        elif err:
1019            raise service_error(service_error.federant,
1020                    "Swapin failed: %s" % err)
1021        else:
1022            raise service_error(service_error.federant, "Swapin failed")
1023
1024    def TerminateSegment(self, req, fid):
1025        self.log.info("TerminateSegment called by %s" % fid)
1026        try:
1027            req = req['TerminateSegmentRequestBody']
1028        except KeyError:
1029            raise service_error(server_error.req, "Badly formed request")
1030
1031        auth_attr = req['allocID']['fedid']
1032        aid = "%s" % auth_attr
1033        attrs = req.get('fedAttr', [])
1034
1035        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1036                with_proof=True)
1037        if not access_ok:
1038            raise service_error(service_error.access, "Access denied")
1039
1040        self.state_lock.acquire()
1041        if aid in self.allocation:
1042            proj = self.allocation[aid].get('project', None)
1043            user = self.allocation[aid].get('user', None)
1044            xmlrpc_cert = self.allocation[aid].get('cert', None)
1045            ename = self.allocation[aid].get('experiment', None)
1046            nonce = self.allocation[aid].get('nonce', False)
1047        else:
1048            proj = None
1049            user = None
1050            ename = None
1051            nonce = False
1052            xmlrpc_cert = None
1053        self.state_lock.release()
1054
1055        if not proj:
1056            self.log.info("TerminateSegment failed for %s: cannot find project"\
1057                    % fid)
1058            raise service_error(service_error.internal, 
1059                    "Can't find project for %s" % aid)
1060        else:
1061            if '/' in proj: proj, gid = proj.split('/')
1062            else: gid = None
1063
1064        if not user:
1065            self.log.info("TerminateSegment failed for %s: cannot find user"\
1066                    % fid)
1067            raise service_error(service_error.internal, 
1068                    "Can't find creation user for %s" % aid)
1069        if not ename:
1070            self.log.info(
1071                    "TerminateSegment failed for %s: cannot find experiment"\
1072                    % fid)
1073            raise service_error(service_error.internal, 
1074                    "Can't find experiment name for %s" % aid)
1075       
1076        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1077                debug=self.create_debug, boss=self.boss, ops=self.ops,
1078                cert=xmlrpc_cert)
1079        stopper(self, user, proj, ename, gid, nonce)
1080        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1081                (fid, proj, ename))
1082        self.state_lock.acquire()
1083        # Remove the started flag/info - the segment is no longer started
1084        if aid in self.allocation:
1085            if 'started' in self.allocation[aid]:
1086                del self.allocation[aid]['started']
1087            self.write_state()
1088        self.state_lock.release()
1089        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1090
1091    def InfoSegment(self, req, fid):
1092        self.log.info("InfoSegment called by %s" % fid)
1093        try:
1094            req = req['InfoSegmentRequestBody']
1095        except KeyError:
1096            raise service_error(server_error.req, "Badly formed request")
1097
1098        auth_attr = req['allocID']['fedid']
1099        aid = "%s" % auth_attr
1100
1101        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1102                with_proof=True)
1103        if not access_ok:
1104            raise service_error(service_error.access, "Access denied")
1105
1106        self.state_lock.acquire()
1107        if aid in self.allocation:
1108            topo = self.allocation[aid].get('topo', None)
1109            if topo: topo = topo.clone()
1110            proj = self.allocation[aid].get('project', None)
1111            user = self.allocation[aid].get('user', None)
1112            xmlrpc_cert = self.allocation[aid].get('cert', None)
1113            ename = self.allocation[aid].get('experiment', None)
1114        else:
1115            proj = None
1116            user = None
1117            ename = None
1118            topo = None
1119            xmlrpc_cert = None
1120        self.state_lock.release()
1121
1122        if not proj:
1123            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1124            raise service_error(service_error.internal, 
1125                    "Can't find project for %s" % aid)
1126        else:
1127            if '/' in proj: proj, gid = proj.split('/')
1128            else: gid = None
1129
1130        if not user:
1131            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1132            raise service_error(service_error.internal, 
1133                    "Can't find creation user for %s" % aid)
1134        if not ename:
1135            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1136            raise service_error(service_error.internal, 
1137                    "Can't find experiment name for %s" % aid)
1138        info = self.info_segment(keyfile=self.ssh_privkey_file,
1139                debug=self.create_debug, boss=self.boss, ops=self.ops,
1140                cert=xmlrpc_cert)
1141        info(self, user, proj, ename)
1142        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1143                (fid, user, proj, ename))
1144        self.decorate_topology(info, topo)
1145        self.state_lock.acquire()
1146        if aid in self.allocation:
1147            self.allocation[aid]['topo'] = topo
1148            self.write_state()
1149        self.state_lock.release()
1150        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1151                (fid, user, proj, ename))
1152
1153        rv = { 
1154                'allocID': req['allocID'], 
1155                'proof': proof.to_dict(),
1156                }
1157        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1158                (fid, user, proj, ename))
1159        if topo:
1160            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1161        return rv
1162
1163    def OperationSegment(self, req, fid):
1164        def get_pname(e):
1165            """
1166            Get the physical name of a node
1167            """
1168            if e.localname:
1169                return re.sub('\..*','', e.localname[0])
1170            else:
1171                return None
1172
1173        self.log.info("OperationSegment called by %s" % fid)
1174        try:
1175            req = req['OperationSegmentRequestBody']
1176        except KeyError:
1177            raise service_error(server_error.req, "Badly formed request")
1178
1179        auth_attr = req['allocID']['fedid']
1180        aid = "%s" % auth_attr
1181
1182        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1183                with_proof=True)
1184        if not access_ok:
1185            self.log.info("OperationSegment failed for %s: access denied" % fid)
1186            raise service_error(service_error.access, "Access denied")
1187
1188        op = req.get('operation', None)
1189        targets = req.get('target', None)
1190        params = req.get('parameter', None)
1191
1192        if op is None :
1193            self.log.info("OperationSegment failed for %s: no operation" % fid)
1194            raise service_error(service_error.req, "missing operation")
1195        elif targets is None:
1196            self.log.info("OperationSegment failed for %s: no targets" % fid)
1197            raise service_error(service_error.req, "no targets")
1198
1199        self.state_lock.acquire()
1200        if aid in self.allocation:
1201            topo = self.allocation[aid].get('topo', None)
1202            if topo: topo = topo.clone()
1203            xmlrpc_cert = self.allocation[aid].get('cert', None)
1204        else:
1205            topo = None
1206            xmlrpc_cert = None
1207        self.state_lock.release()
1208
1209        targets = copy.copy(targets)
1210        ptargets = { }
1211        for e in topo.elements:
1212            if isinstance(e, topdl.Computer):
1213                if e.name in targets:
1214                    targets.remove(e.name)
1215                    pn = get_pname(e)
1216                    if pn: ptargets[e.name] = pn
1217
1218        status = [ operation_status(t, operation_status.no_target) \
1219                for t in targets]
1220
1221        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1222                debug=self.create_debug, boss=self.boss, ops=self.ops,
1223                cert=xmlrpc_cert)
1224        ops(self, op, ptargets, params, topo)
1225        self.log.info("OperationSegment operated for %s" % fid)
1226       
1227        status.extend(ops.status)
1228        self.log.info("OperationSegment succeed for %s" % fid)
1229
1230        return { 
1231                'allocID': req['allocID'], 
1232                'status': [s.to_dict() for s in status],
1233                'proof': proof.to_dict(),
1234                }
Note: See TracBrowser for help on using the repository browser.