source: fedd/federation/emulab_access.py @ 4ffa6f8

compt_changes
Last change on this file since 4ffa6f8 was 4ffa6f8, checked in by Ted Faber <faber@…>, 12 years ago

Add support for nat_portal parameter. Remove old half-assed active
endpoints

  • Property mode set to 100644
File size: 38.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    max_name_len = 19
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.max_name_len = access.max_name_len
61
62        self.allow_proxy = config.getboolean("access", "allow_proxy")
63
64        self.domain = config.get("access", "domain")
65        self.userconfdir = config.get("access","userconfdir")
66        self.userconfcmd = config.get("access","userconfcmd")
67        self.userconfurl = config.get("access","userconfurl")
68        self.federation_software = config.get("access", "federation_software")
69        self.portal_software = config.get("access", "portal_software")
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73        self.seer_master_start = config.get("access", "seer_master_start")
74        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
75        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
76        self.ssh_port = config.get("access","ssh_port") or "22"
77        self.boss = config.get("access", "boss")
78        self.ops = config.get("access", "ops")
79        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
80        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
81
82        self.dragon_endpoint = config.get("access", "dragon")
83        self.dragon_vlans = config.get("access", "dragon_vlans")
84        self.deter_internal = config.get("access", "deter_internal")
85
86        self.tunnel_config = config.getboolean("access", "tunnel_config")
87        self.portal_command = config.get("access", "portal_command")
88        self.portal_image = config.get("access", "portal_image")
89        self.portal_type = config.get("access", "portal_type") or "pc"
90        self.portal_startcommand = config.get("access", "portal_startcommand")
91        self.node_startcommand = config.get("access", "node_startcommand")
92        self.nat_portal = config.get("access", "nat_portal")
93
94        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
95                self.get_port(config.get("globals", "services", "23235")))
96
97        self.federation_software = self.software_list(self.federation_software)
98        self.portal_software = self.software_list(self.portal_software)
99        self.local_seer_software = self.software_list(self.local_seer_software)
100
101        self.access_type = self.access_type.lower()
102        self.start_segment = emulab_segment.start_segment
103        self.stop_segment = emulab_segment.stop_segment
104        self.info_segment = emulab_segment.info_segment
105        self.operation_segment = emulab_segment.operation_segment
106
107        self.restricted = [ ]
108        tb = config.get('access', 'testbed')
109        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
110        else: self.testbed = [ ]
111
112        # authorization information
113        self.auth_type = config.get('access', 'auth_type') \
114                or 'abac'
115        self.auth_dir = config.get('access', 'auth_dir')
116        accessdb = config.get("access", "accessdb")
117        # initialize the authorization system
118        if self.auth_type == 'abac':
119            self.auth = abac_authorizer(load=self.auth_dir)
120            self.access = [ ]
121            if accessdb:
122                self.read_access(accessdb, self.access_tuple)
123        else:
124            raise service_error(service_error.internal, 
125                    "Unknown auth_type: %s" % self.auth_type)
126
127        # read_state in the base_class
128        self.state_lock.acquire()
129        if 'allocation' not in self.state: self.state['allocation']= { }
130        self.allocation = self.state['allocation']
131        self.state_lock.release()
132        self.exports = {
133                'SMB': self.export_SMB,
134                'seer': self.export_seer,
135                'tmcd': self.export_tmcd,
136                'userconfig': self.export_userconfig,
137                'project_export': self.export_project_export,
138                'local_seer_control': self.export_local_seer,
139                'seer_master': self.export_seer_master,
140                'hide_hosts': self.export_hide_hosts,
141                }
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.local_seer_start:
145            if 'local_seer_control' in self.exports:
146                del self.exports['local_seer_control']
147
148        if not self.local_seer_image or not self.local_seer_software or \
149                not self.seer_master_start:
150            if 'seer_master' in self.exports:
151                del self.exports['seer_master']
152
153
154        self.soap_services = {\
155            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
156            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
157            'StartSegment': soap_handler("StartSegment", self.StartSegment),
158            'TerminateSegment': soap_handler("TerminateSegment",
159                self.TerminateSegment),
160            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
161            'OperationSegment': soap_handler("OperationSegment",
162                self.OperationSegment),
163            }
164        self.xmlrpc_services =  {\
165            'RequestAccess': xmlrpc_handler('RequestAccess',
166                self.RequestAccess),
167            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
168                self.ReleaseAccess),
169            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
170            'TerminateSegment': xmlrpc_handler('TerminateSegment',
171                self.TerminateSegment),
172            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
173            'OperationSegment': xmlrpc_handler("OperationSegment",
174                self.OperationSegment),
175            }
176
177        self.call_SetValue = service_caller('SetValue', log=self.log)
178        self.call_GetValue = service_caller('GetValue', log=self.log)
179
180    @staticmethod
181    def get_port(ps):
182        '''
183        Take a fedd service string and return the first port.  Used in
184        creating the testbed uri identifier.
185        '''
186        p = ps.split(',')
187        smallport = p[0].split(':')
188        try:
189            rv = int(smallport[0])
190        except ValueError:
191            rv = 23235
192        return rv
193
194    @staticmethod
195    def access_tuple(str):
196        """
197        Convert a string of the form (id, id, id) into an access_project.  This
198        is called by read_access to convert to local attributes.  It returns a
199        tuple of the form (project, user, certificate_file).
200        """
201
202        str = str.strip()
203        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
204            # The slice takes the parens off the string.
205            proj, user, cert = str[1:-1].split(',')
206            return (proj.strip(), user.strip(), cert.strip())
207        else:
208            raise self.parse_error(
209                    'Bad mapping (unbalanced parens or more than 2 commas)')
210
211    # RequestAccess support routines
212
213    def save_project_state(self, aid, pname, uname, certf, owners):
214        """
215        Save the project, user, and owners associated with this allocation.
216        This creates the allocation entry.
217        """
218        self.state_lock.acquire()
219        self.allocation[aid] = { }
220        self.allocation[aid]['project'] = pname
221        self.allocation[aid]['user'] = uname
222        self.allocation[aid]['cert'] = certf
223        self.allocation[aid]['owners'] = owners
224        self.write_state()
225        self.state_lock.release()
226        return (pname, uname)
227
228    # End of RequestAccess support routines
229
230    def RequestAccess(self, req, fid):
231        """
232        Handle the access request.  Proxy if not for us.
233
234        Parse out the fields and make the allocations or rejections if for us,
235        otherwise, assuming we're willing to proxy, proxy the request out.
236        """
237
238        def gateway_hardware(h):
239            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
240            else: return h
241
242        def get_export_project(svcs):
243            """
244            if the service requests includes one to export a project, return
245            that project.
246            """
247            rv = None
248            for s in svcs:
249                if s.get('name', '') == 'project_export' and \
250                        s.get('visibility', '') == 'export':
251                    if not rv: 
252                        for a in s.get('fedAttr', []):
253                            if a.get('attribute', '') == 'project' \
254                                    and 'value' in a:
255                                rv = a['value']
256                    else:
257                        raise service_error(service_error, access, 
258                                'Requesting multiple project exports is ' + \
259                                        'not supported');
260            return rv
261
262        self.log.info("RequestAccess called by %s" % fid)
263        # The dance to get into the request body
264        if req.has_key('RequestAccessRequestBody'):
265            req = req['RequestAccessRequestBody']
266        else:
267            raise service_error(service_error.req, "No request!?")
268
269        # if this includes a project export request, construct a filter such
270        # that only the ABAC attributes mapped to that project are checked for
271        # access.
272        if 'service' in req:
273            ep = get_export_project(req['service'])
274            if ep: pf = lambda(a): a.value[0] == ep
275            else: pf = None
276        else:
277            ep = None
278            pf = None
279
280        if self.auth.import_credentials(
281                data_list=req.get('abac_credential', [])):
282            self.auth.save()
283        else:
284            self.log.debug('failed to import incoming credentials')
285
286        if self.auth_type == 'abac':
287            found, owners, proof = self.lookup_access(req, fid, filter=pf)
288        else:
289            raise service_error(service_error.internal, 
290                    'Unknown auth_type: %s' % self.auth_type)
291        ap = None
292
293        # keep track of what's been added
294        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
295        aid = unicode(allocID)
296
297        pname, uname = self.save_project_state(aid, found[0], found[1], 
298                found[2], owners)
299
300        services, svc_state = self.export_services(req.get('service',[]),
301                pname, uname)
302        self.state_lock.acquire()
303        # Store services state in global state
304        for k, v in svc_state.items():
305            self.allocation[aid][k] = v
306        self.append_allocation_authorization(aid, 
307                set([(o, allocID) for o in owners]), state_attr='allocation')
308        self.write_state()
309        self.state_lock.release()
310        try:
311            f = open("%s/%s.pem" % (self.certdir, aid), "w")
312            print >>f, alloc_cert
313            f.close()
314        except EnvironmentError, e:
315            self.log.info("RequestAccess failed for by %s: internal error" \
316                    % fid)
317            raise service_error(service_error.internal, 
318                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
319        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
320        resp = self.build_access_response({ 'fedid': allocID } ,
321                pname, services, proof)
322        return resp
323
324    def ReleaseAccess(self, req, fid):
325        self.log.info("ReleaseAccess called by %s" % fid)
326        # The dance to get into the request body
327        if req.has_key('ReleaseAccessRequestBody'):
328            req = req['ReleaseAccessRequestBody']
329        else:
330            raise service_error(service_error.req, "No request!?")
331
332        try:
333            if req['allocID'].has_key('localname'):
334                auth_attr = aid = req['allocID']['localname']
335            elif req['allocID'].has_key('fedid'):
336                aid = unicode(req['allocID']['fedid'])
337                auth_attr = req['allocID']['fedid']
338            else:
339                raise service_error(service_error.req,
340                        "Only localnames and fedids are understood")
341        except KeyError:
342            raise service_error(service_error.req, "Badly formed request")
343
344        self.log.debug("[access] deallocation requested for %s by %s" % \
345                (aid, fid))
346        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
347                with_proof=True)
348        if not access_ok:
349            self.log.debug("[access] deallocation denied for %s", aid)
350            raise service_error(service_error.access, "Access Denied")
351
352        self.state_lock.acquire()
353        if aid in self.allocation:
354            self.log.debug("Found allocation for %s" %aid)
355            self.clear_allocation_authorization(aid, state_attr='allocation')
356            del self.allocation[aid]
357            self.write_state()
358            self.state_lock.release()
359            # Remove the access cert
360            cf = "%s/%s.pem" % (self.certdir, aid)
361            self.log.debug("Removing %s" % cf)
362            os.remove(cf)
363            self.log.info("ReleaseAccess succeeded for %s" % fid)
364            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    # These are subroutines for StartSegment
370    def generate_ns2(self, topo, expfn, softdir, connInfo):
371        """
372        Convert topo into an ns2 file, decorated with appropriate commands for
373        the particular testbed setup.  Convert all requests for software, etc
374        to point at the staged copies on this testbed and add the federation
375        startcommands.
376        """
377        class dragon_commands:
378            """
379            Functor to spit out approrpiate dragon commands for nodes listed in
380            the connectivity description.  The constructor makes a dict mapping
381            dragon nodes to their parameters and the __call__ checks each
382            element in turn for membership.
383            """
384            def __init__(self, map):
385                self.node_info = map
386
387            def __call__(self, e):
388                s = ""
389                if isinstance(e, topdl.Computer):
390                    if self.node_info.has_key(e.name):
391                        info = self.node_info[e.name]
392                        for ifname, vlan, type in info:
393                            for i in e.interface:
394                                if i.name == ifname:
395                                    addr = i.get_attribute('ip4_address')
396                                    subs = i.substrate[0]
397                                    break
398                            else:
399                                raise service_error(service_error.internal,
400                                        "No interface %s on element %s" % \
401                                                (ifname, e.name))
402                            # XXX: do netmask right
403                            if type =='link':
404                                s = ("tb-allow-external ${%s} " + \
405                                        "dragonportal ip %s vlan %s " + \
406                                        "netmask 255.255.255.0\n") % \
407                                        (topdl.to_tcl_name(e.name), addr, vlan)
408                            elif type =='lan':
409                                s = ("tb-allow-external ${%s} " + \
410                                        "dragonportal " + \
411                                        "ip %s vlan %s usurp %s\n") % \
412                                        (topdl.to_tcl_name(e.name), addr, 
413                                                vlan, subs)
414                            else:
415                                raise service_error(service_error_internal,
416                                        "Unknown DRAGON type %s" % type)
417                return s
418
419        class not_dragon:
420            """
421            Return true if a node is in the given map of dragon nodes.
422            """
423            def __init__(self, map):
424                self.nodes = set(map.keys())
425
426            def __call__(self, e):
427                return e.name not in self.nodes
428
429        def have_portals(top):
430            '''
431            Return true if the topology has a portal node
432            '''
433            # The else is on the for
434            for e in top.elements:
435                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
436                    return True
437            else:
438                return False
439
440
441        # Main line of generate_ns2
442        t = topo.clone()
443
444        # Create the map of nodes that need direct connections (dragon
445        # connections) from the connInfo
446        dragon_map = { }
447        for i in [ i for i in connInfo if i['type'] == 'transit']:
448            for a in i.get('fedAttr', []):
449                if a['attribute'] == 'vlan_id':
450                    vlan = a['value']
451                    break
452            else:
453                raise service_error(service_error.internal, "No vlan tag")
454            members = i.get('member', [])
455            if len(members) > 1: type = 'lan'
456            else: type = 'link'
457
458            try:
459                for m in members:
460                    if m['element'] in dragon_map:
461                        dragon_map[m['element']].append(( m['interface'], 
462                            vlan, type))
463                    else:
464                        dragon_map[m['element']] = [( m['interface'], 
465                            vlan, type),]
466            except KeyError:
467                raise service_error(service_error.req,
468                        "Missing connectivity info")
469
470        # Weed out the things we aren't going to instantiate: Segments, portal
471        # substrates, and portal interfaces.  (The copy in the for loop allows
472        # us to delete from e.elements in side the for loop).  While we're
473        # touching all the elements, we also adjust paths from the original
474        # testbed to local testbed paths and put the federation commands into
475        # the start commands
476        local = len(dragon_map) == 0 and not have_portals(t)
477        if local: routing = 'Static'
478        else: routing = 'Manual'
479
480        if local:
481            self.log.debug("Local experiment.")
482        for e in [e for e in t.elements]:
483            if isinstance(e, topdl.Segment):
484                t.elements.remove(e)
485            if isinstance(e, topdl.Computer):
486                self.add_kit(e, self.federation_software)
487                if e.get_attribute('portal') and self.portal_startcommand:
488                    # Add local portal support software
489                    self.add_kit(e, self.portal_software)
490                    # Portals never have a user-specified start command
491                    e.set_attribute('startup', self.portal_startcommand)
492                elif not local and self.node_startcommand:
493                    if e.get_attribute('startup'):
494                        e.set_attribute('startup', "%s \\$USER '%s'" % \
495                                (self.node_startcommand, 
496                                    e.get_attribute('startup')))
497                    else:
498                        e.set_attribute('startup', self.node_startcommand)
499
500                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
501                # Remove portal interfaces that do not connect to DRAGON
502                e.interface = [i for i in e.interface \
503                        if not i.get_attribute('portal') or i.name in dinf ]
504            # Fix software paths
505            for s in getattr(e, 'software', []):
506                s.location = re.sub("^.*/", softdir, s.location)
507
508        t.substrates = [ s.clone() for s in t.substrates ]
509        t.incorporate_elements()
510
511        # Customize the ns2 output for local portal commands and images
512        filters = []
513
514        if self.dragon_endpoint:
515            add_filter = not_dragon(dragon_map)
516            filters.append(dragon_commands(dragon_map))
517        else:
518            add_filter = None
519
520        if self.portal_command:
521            filters.append(topdl.generate_portal_command_filter(
522                self.portal_command, add_filter=add_filter))
523
524        if self.portal_image:
525            filters.append(topdl.generate_portal_image_filter(
526                self.portal_image))
527
528        if self.portal_type:
529            filters.append(topdl.generate_portal_hardware_filter(
530                self.portal_type))
531
532        # Convert to ns and write it out
533        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
534        try:
535            f = open(expfn, "w")
536            print >>f, expfile
537            f.close()
538        except EnvironmentError:
539            raise service_error(service_error.internal,
540                    "Cannot write experiment file %s: %s" % (expfn,e))
541
542    def export_store_info(self, cf, proj, ename, connInfo):
543        """
544        For the export requests in the connection info, install the peer names
545        at the experiment controller via SetValue calls.
546        """
547
548        for c in connInfo:
549            for p in [ p for p in c.get('parameter', []) \
550                    if p.get('type', '') == 'output']:
551
552                if p.get('name', '') == 'peer':
553                    k = p.get('key', None)
554                    surl = p.get('store', None)
555                    if surl :
556                        if self.nat_portal:
557                            value = self.nat_portal
558                        elif k and k.index('/') != -1:
559                            value = "%s.%s.%s%s" % \
560                                (k[k.index('/')+1:], ename, proj, self.domain)
561                        else: 
562                            self.log.error("Bad export request: %s" % p)
563                            continue
564                        self.log.debug("Setting %s to %s on %s" % \
565                                (k, value, surl))
566                        req = { 'name': k, 'value': value }
567                        self.call_SetValue(surl, req, cf)
568                    else:
569                        self.log.error("Bad export request: %s" % p)
570                elif p.get('name', '') == 'ssh_port':
571                    k = p.get('key', None)
572                    surl = p.get('store', None)
573                    if surl and k:
574                        req = { 'name': k, 'value': self.ssh_port }
575                        self.log.debug("Setting %s to %s on %s" % \
576                                (k, self.ssh_port, surl))
577                        self.call_SetValue(surl, req, cf)
578                    else:
579                        self.log.error("Bad export request: %s" % p)
580                else:
581                    self.log.error("Unknown export parameter: %s" % \
582                            p.get('name'))
583                    continue
584
585    def add_seer_node(self, topo, name, startup):
586        """
587        Add a seer node to the given topology, with the startup command passed
588        in.  Used by configure seer_services.
589        """
590        c_node = topdl.Computer(
591                name=name, 
592                os= topdl.OperatingSystem(
593                    attribute=[
594                    { 'attribute': 'osid', 
595                        'value': self.local_seer_image },
596                    ]),
597                attribute=[
598                    { 'attribute': 'startup', 'value': startup },
599                    ]
600                )
601        self.add_kit(c_node, self.local_seer_software)
602        topo.elements.append(c_node)
603
604    def configure_seer_services(self, services, topo, softdir):
605        """
606        Make changes to the topology required for the seer requests being made.
607        Specifically, add any control or master nodes required and set up the
608        start commands on the nodes to interconnect them.
609        """
610        local_seer = False      # True if we need to add a control node
611        collect_seer = False    # True if there is a seer-master node
612        seer_master= False      # True if we need to add the seer-master
613        for s in services:
614            s_name = s.get('name', '')
615            s_vis = s.get('visibility','')
616
617            if s_name  == 'local_seer_control' and s_vis == 'export':
618                local_seer = True
619            elif s_name == 'seer_master':
620                if s_vis == 'import':
621                    collect_seer = True
622                elif s_vis == 'export':
623                    seer_master = True
624       
625        # We've got the whole picture now, so add nodes if needed and configure
626        # them to interconnect properly.
627        if local_seer or seer_master:
628            # Copy local seer control node software to the tempdir
629            for l, f in self.local_seer_software:
630                base = os.path.basename(f)
631                copy_file(f, "%s/%s" % (softdir, base))
632        # If we're collecting seers somewhere the controllers need to talk to
633        # the master.  In testbeds that export the service, that will be a
634        # local node that we'll add below.  Elsewhere it will be the control
635        # portal that will port forward to the exporting master.
636        if local_seer:
637            if collect_seer:
638                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
639            else:
640                startup = self.local_seer_start
641            self.add_seer_node(topo, 'control', startup)
642        # If this is the seer master, add that node, too.
643        if seer_master:
644            self.add_seer_node(topo, 'seer-master', 
645                    "%s -R -n -R seer-master -R -A -R sink" % \
646                            self.seer_master_start)
647
648    def retrieve_software(self, topo, certfile, softdir):
649        """
650        Collect the software that nodes in the topology need loaded and stage
651        it locally.  This implies retrieving it from the experiment_controller
652        and placing it into softdir.  Certfile is used to prove that this node
653        has access to that data (it's the allocation/segment fedid).  Finally
654        local portal and federation software is also copied to the same staging
655        directory for simplicity - all software needed for experiment creation
656        is in softdir.
657        """
658        sw = set()
659        for e in topo.elements:
660            for s in getattr(e, 'software', []):
661                sw.add(s.location)
662        for s in sw:
663            self.log.debug("Retrieving %s" % s)
664            try:
665                get_url(s, certfile, softdir, log=self.log)
666            except:
667                t, v, st = sys.exc_info()
668                raise service_error(service_error.internal,
669                        "Error retrieving %s: %s" % (s, v))
670
671        # Copy local federation and portal node software to the tempdir
672        for s in (self.federation_software, self.portal_software):
673            for l, f in s:
674                base = os.path.basename(f)
675                copy_file(f, "%s/%s" % (softdir, base))
676
677
678    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
679        """
680        Gather common configuration files, retrieve or create an experiment
681        name and project name, and return the ssh_key filenames.  Create an
682        allocation log bound to the state log variable as well.
683        """
684        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
685                'seer_ca_pem', 'seer_node_pem')
686        ename = None
687        pubkey_base = None
688        secretkey_base = None
689        proj = None
690        user = None
691        alloc_log = None
692        nonce_experiment = False
693        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
694
695        self.state_lock.acquire()
696        if aid in self.allocation:
697            proj = self.allocation[aid].get('project', None)
698        self.state_lock.release()
699
700        if not proj:
701            raise service_error(service_error.internal, 
702                    "Can't find project for %s" %aid)
703
704        for a in attrs:
705            if a['attribute'] in configs:
706                try:
707                    self.log.debug("Retrieving %s from %s" % \
708                            (a['attribute'], a['value']))
709                    get_url(a['value'], certfile, tmpdir, log=self.log)
710                except:
711                    t, v, st = sys.exc_info()
712                    raise service_error(service_error.internal,
713                            "Error retrieving %s: %s" % (a.get('value', ""), v))
714            if a['attribute'] == 'ssh_pubkey':
715                pubkey_base = a['value'].rpartition('/')[2]
716            if a['attribute'] == 'ssh_secretkey':
717                secretkey_base = a['value'].rpartition('/')[2]
718            if a['attribute'] == 'experiment_name':
719                ename = a['value']
720
721        # Names longer than the emulab max are discarded
722        if ename and len(ename) <= self.max_name_len:
723            # Clean up the experiment name so that emulab will accept it.
724            ename = re.sub(vchars_re, '-', ename)
725
726        else:
727            ename = ""
728            for i in range(0,5):
729                ename += random.choice(string.ascii_letters)
730            nonce_experiment = True
731            self.log.warn("No experiment name or suggestion too long: " + \
732                    "picked one randomly: %s" % ename)
733
734        if not pubkey_base:
735            raise service_error(service_error.req, 
736                    "No public key attribute")
737
738        if not secretkey_base:
739            raise service_error(service_error.req, 
740                    "No secret key attribute")
741
742        self.state_lock.acquire()
743        if aid in self.allocation:
744            user = self.allocation[aid].get('user', None)
745            cert = self.allocation[aid].get('cert', None)
746            self.allocation[aid]['experiment'] = ename
747            self.allocation[aid]['nonce'] = nonce_experiment
748            self.allocation[aid]['log'] = [ ]
749            # Create a logger that logs to the experiment's state object as
750            # well as to the main log file.
751            alloc_log = logging.getLogger('fedd.access.%s' % ename)
752            h = logging.StreamHandler(
753                    list_log.list_log(self.allocation[aid]['log']))
754            # XXX: there should be a global one of these rather than
755            # repeating the code.
756            h.setFormatter(logging.Formatter(
757                "%(asctime)s %(name)s %(message)s",
758                        '%d %b %y %H:%M:%S'))
759            alloc_log.addHandler(h)
760            self.write_state()
761        self.state_lock.release()
762
763        if not user:
764            raise service_error(service_error.internal, 
765                    "Can't find creation user for %s" %aid)
766
767        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
768
769    def decorate_topology(self, info, t):
770        """
771        Copy the physical mapping and status onto the topology.  Used by
772        StartSegment and InfoSegment
773        """
774        def add_new(ann, attr):
775            for a in ann:
776                if a not in attr: attr.append(a)
777
778        def merge_os(os, e):
779            if len(e.os) == 0:
780                # No OS at all:
781                if os.get_attribute('emulab_access:image'):
782                    os.set_attribute('emulab_access:initial_image', 
783                            os.get_attribute('emulab_access:image'))
784                e.os = [ os ]
785            elif len(e.os) == 1:
786                # There's one OS, copy the initial image and replace
787                eos = e.os[0]
788                initial = eos.get_attribute('emulab_access:initial_image')
789                if initial:
790                    os.set_attribute('emulab_access:initial_image', initial)
791                e.os = [ os] 
792            else:
793                # Multiple OSes, replace or append
794                for eos in e.os:
795                    if os.name == eos.name:
796                        eos.version = os.version
797                        eos.version = os.distribution
798                        eos.version = os.distributionversion
799                        for a in os.attribute:
800                            if eos.get_attribute(a.attribute):
801                                eos.remove_attribute(a.attribute)
802                            eos.set_attribute(a.attribute, a.value)
803                        break
804                else:
805                    e.os.append(os)
806
807
808        if t is None: return
809        i = 0 # For fake debugging instantiation
810        # Copy the assigned names into the return topology
811        for e in t.elements:
812            if isinstance(e, topdl.Computer):
813                if not self.create_debug:
814                    if e.name in info.node:
815                        add_new(("%s%s" % 
816                            (info.node[e.name].pname, self.domain),),
817                            e.localname)
818                        add_new(("%s%s" % 
819                            (info.node[e.name].lname, self.domain),),
820                            e.localname)
821                        e.status = info.node[e.name].status
822                        os = info.node[e.name].getOS()
823                        if os: merge_os(os, e)
824                else:
825                    # Simple debugging assignment
826                    add_new(("node%d%s" % (i, self.domain),), e.localname)
827                    e.status = 'active'
828                    add_new(('testop1', 'testop2'), e.operation)
829                    i += 1
830
831        for s in t.substrates:
832            if s.name in info.subs:
833                sub = info.subs[s.name]
834                if sub.cap is not None:
835                    s.capacity = topdl.Capacity(sub.cap, 'max')
836                if sub.delay is not None:
837                    s.delay = topdl.Latency(sub.delay, 'max')
838        # XXX interfaces
839
840
841    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
842        """
843        Store key bits of experiment state in the global repository, including
844        the response that may need to be replayed, and return the response.
845        """
846        def get_localnames(t):
847            names = [ ]
848            for e in t.elements:
849                if isinstance(e, topdl.Computer):
850                    n = e.get_attribute('testbed')
851                    if n is not None and n not in names:
852                        names.append(n)
853            return names
854        i = 0
855        t = topo.clone()
856        self.decorate_topology(starter, t)
857        # Grab the log (this is some anal locking, but better safe than
858        # sorry)
859        self.state_lock.acquire()
860        # Put information about this testbed into the topdl
861        tb = topdl.Testbed(self.uri, "deter",
862                localname=get_localnames(t), 
863                attribute=[ 
864                    { 
865                        'attribute': 'project', 
866                        'value': self.allocation[aid]['project']
867                    },
868                    { 
869                        'attribute': 'experiment', 
870                        'value': self.allocation[aid]['experiment']
871                    }])
872        t.elements.append(tb)
873        logv = "".join(self.allocation[aid]['log'])
874        # It's possible that the StartSegment call gets retried (!).
875        # if the 'started' key is in the allocation, we'll return it rather
876        # than redo the setup.
877        self.allocation[aid]['started'] = { 
878                'allocID': alloc_id,
879                'allocationLog': logv,
880                'segmentdescription': { 
881                    'topdldescription': t.to_dict()
882                    },
883                'proof': proof.to_dict(),
884                }
885        self.allocation[aid]['topo'] = t
886        retval = copy.copy(self.allocation[aid]['started'])
887        self.write_state()
888        self.state_lock.release()
889        return retval
890   
891    # End of StartSegment support routines
892
893    def StartSegment(self, req, fid):
894        err = None  # Any service_error generated after tmpdir is created
895        rv = None   # Return value from segment creation
896
897        self.log.info("StartSegment called by %s" % fid)
898        try:
899            req = req['StartSegmentRequestBody']
900            auth_attr = req['allocID']['fedid']
901            topref = req['segmentdescription']['topdldescription']
902        except KeyError:
903            raise service_error(server_error.req, "Badly formed request")
904
905        connInfo = req.get('connection', [])
906        services = req.get('service', [])
907        aid = "%s" % auth_attr
908        attrs = req.get('fedAttr', [])
909
910        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
911                with_proof=True)
912        if not access_ok:
913            self.log.info("StartSegment for %s failed: access denied" % fid)
914            raise service_error(service_error.access, "Access denied")
915        else:
916            # See if this is a replay of an earlier succeeded StartSegment -
917            # sometimes SSL kills 'em.  If so, replay the response rather than
918            # redoing the allocation.
919            self.state_lock.acquire()
920            retval = self.allocation[aid].get('started', None)
921            self.state_lock.release()
922            if retval:
923                self.log.warning("Duplicate StartSegment for %s: " % aid + \
924                        "replaying response")
925                return retval
926
927        # A new request.  Do it.
928
929        if topref: topo = topdl.Topology(**topref)
930        else:
931            raise service_error(service_error.req, 
932                    "Request missing segmentdescription'")
933       
934        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
935        try:
936            tmpdir = tempfile.mkdtemp(prefix="access-")
937            softdir = "%s/software" % tmpdir
938            os.mkdir(softdir)
939        except EnvironmentError:
940            self.log.info("StartSegment for %s failed: internal error" % fid)
941            raise service_error(service_error.internal, "Cannot create tmp dir")
942
943        # Try block alllows us to clean up temporary files.
944        try:
945            self.retrieve_software(topo, certfile, softdir)
946            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
947                alloc_log =  self.initialize_experiment_info(attrs, aid, 
948                        certfile, tmpdir)
949            # A misconfigured cert in the ABAC map can be confusing...
950            if not os.access(xmlrpc_cert, os.R_OK):
951                self.log.error("Cannot open user's emulab SSL cert: %s" % \
952                        xmlrpc_cert)
953                raise service_error(service_error.internal,
954                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
955
956
957            if '/' in proj: proj, gid = proj.split('/')
958            else: gid = None
959
960
961            # Set up userconf and seer if needed
962            self.configure_userconf(services, tmpdir)
963            self.configure_seer_services(services, topo, softdir)
964            # Get and send synch store variables
965            self.export_store_info(certfile, proj, ename, connInfo)
966            self.import_store_info(certfile, connInfo)
967
968            expfile = "%s/experiment.tcl" % tmpdir
969
970            self.generate_portal_configs(topo, pubkey_base, 
971                    secretkey_base, tmpdir, proj, ename, connInfo, services)
972            self.generate_ns2(topo, expfile, 
973                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
974
975            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
976                    debug=self.create_debug, log=alloc_log, boss=self.boss,
977                    ops=self.ops, cert=xmlrpc_cert)
978            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
979        except service_error, e:
980            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
981            err = e
982        except:
983            t, v, st = sys.exc_info()
984            self.log.info("StartSegment for %s failed:unexpected error: %s" \
985                    % (fid, traceback.extract_tb(st)))
986            err = service_error(service_error.internal, "%s: %s" % \
987                    (v, traceback.extract_tb(st)))
988
989        # Walk up tmpdir, deleting as we go
990        if self.cleanup: self.remove_dirs(tmpdir)
991        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
992
993        if rv:
994            self.log.info("StartSegment for %s succeeded" % fid)
995            return self.finalize_experiment(starter, topo, aid, req['allocID'],
996                    proof)
997        elif err:
998            raise service_error(service_error.federant,
999                    "Swapin failed: %s" % err)
1000        else:
1001            raise service_error(service_error.federant, "Swapin failed")
1002
1003    def TerminateSegment(self, req, fid):
1004        self.log.info("TerminateSegment called by %s" % fid)
1005        try:
1006            req = req['TerminateSegmentRequestBody']
1007        except KeyError:
1008            raise service_error(server_error.req, "Badly formed request")
1009
1010        auth_attr = req['allocID']['fedid']
1011        aid = "%s" % auth_attr
1012        attrs = req.get('fedAttr', [])
1013
1014        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1015                with_proof=True)
1016        if not access_ok:
1017            raise service_error(service_error.access, "Access denied")
1018
1019        self.state_lock.acquire()
1020        if aid in self.allocation:
1021            proj = self.allocation[aid].get('project', None)
1022            user = self.allocation[aid].get('user', None)
1023            xmlrpc_cert = self.allocation[aid].get('cert', None)
1024            ename = self.allocation[aid].get('experiment', None)
1025            nonce = self.allocation[aid].get('nonce', False)
1026        else:
1027            proj = None
1028            user = None
1029            ename = None
1030            nonce = False
1031            xmlrpc_cert = None
1032        self.state_lock.release()
1033
1034        if not proj:
1035            self.log.info("TerminateSegment failed for %s: cannot find project"\
1036                    % fid)
1037            raise service_error(service_error.internal, 
1038                    "Can't find project for %s" % aid)
1039        else:
1040            if '/' in proj: proj, gid = proj.split('/')
1041            else: gid = None
1042
1043        if not user:
1044            self.log.info("TerminateSegment failed for %s: cannot find user"\
1045                    % fid)
1046            raise service_error(service_error.internal, 
1047                    "Can't find creation user for %s" % aid)
1048        if not ename:
1049            self.log.info(
1050                    "TerminateSegment failed for %s: cannot find experiment"\
1051                    % fid)
1052            raise service_error(service_error.internal, 
1053                    "Can't find experiment name for %s" % aid)
1054        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1055                debug=self.create_debug, boss=self.boss, ops=self.ops,
1056                cert=xmlrpc_cert)
1057        stopper(self, user, proj, ename, gid, nonce)
1058        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1059                (fid, proj, ename))
1060        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1061
1062    def InfoSegment(self, req, fid):
1063        self.log.info("InfoSegment called by %s" % fid)
1064        try:
1065            req = req['InfoSegmentRequestBody']
1066        except KeyError:
1067            raise service_error(server_error.req, "Badly formed request")
1068
1069        auth_attr = req['allocID']['fedid']
1070        aid = "%s" % auth_attr
1071
1072        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1073                with_proof=True)
1074        if not access_ok:
1075            raise service_error(service_error.access, "Access denied")
1076
1077        self.state_lock.acquire()
1078        if aid in self.allocation:
1079            topo = self.allocation[aid].get('topo', None)
1080            if topo: topo = topo.clone()
1081            proj = self.allocation[aid].get('project', None)
1082            user = self.allocation[aid].get('user', None)
1083            xmlrpc_cert = self.allocation[aid].get('cert', None)
1084            ename = self.allocation[aid].get('experiment', None)
1085        else:
1086            proj = None
1087            user = None
1088            ename = None
1089            topo = None
1090            xmlrpc_cert = None
1091        self.state_lock.release()
1092
1093        if not proj:
1094            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1095            raise service_error(service_error.internal, 
1096                    "Can't find project for %s" % aid)
1097        else:
1098            if '/' in proj: proj, gid = proj.split('/')
1099            else: gid = None
1100
1101        if not user:
1102            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1103            raise service_error(service_error.internal, 
1104                    "Can't find creation user for %s" % aid)
1105        if not ename:
1106            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1107            raise service_error(service_error.internal, 
1108                    "Can't find experiment name for %s" % aid)
1109        info = self.info_segment(keyfile=self.ssh_privkey_file,
1110                debug=self.create_debug, boss=self.boss, ops=self.ops,
1111                cert=xmlrpc_cert)
1112        info(self, user, proj, ename)
1113        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1114                (fid, user, proj, ename))
1115        self.decorate_topology(info, topo)
1116        self.state_lock.acquire()
1117        if aid in self.allocation:
1118            self.allocation[aid]['topo'] = topo
1119            self.write_state()
1120        self.state_lock.release()
1121        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1122                (fid, user, proj, ename))
1123
1124        rv = { 
1125                'allocID': req['allocID'], 
1126                'proof': proof.to_dict(),
1127                }
1128        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1129                (fid, user, proj, ename))
1130        if topo:
1131            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1132        return rv
1133
1134    def OperationSegment(self, req, fid):
1135        def get_pname(e):
1136            """
1137            Get the physical name of a node
1138            """
1139            if e.localname:
1140                return re.sub('\..*','', e.localname[0])
1141            else:
1142                return None
1143
1144        self.log.info("OperationSegment called by %s" % fid)
1145        try:
1146            req = req['OperationSegmentRequestBody']
1147        except KeyError:
1148            raise service_error(server_error.req, "Badly formed request")
1149
1150        auth_attr = req['allocID']['fedid']
1151        aid = "%s" % auth_attr
1152
1153        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1154                with_proof=True)
1155        if not access_ok:
1156            self.log.info("OperationSegment failed for %s: access denied" % fid)
1157            raise service_error(service_error.access, "Access denied")
1158
1159        op = req.get('operation', None)
1160        targets = req.get('target', None)
1161        params = req.get('parameter', None)
1162
1163        if op is None :
1164            self.log.info("OperationSegment failed for %s: no operation" % fid)
1165            raise service_error(service_error.req, "missing operation")
1166        elif targets is None:
1167            self.log.info("OperationSegment failed for %s: no targets" % fid)
1168            raise service_error(service_error.req, "no targets")
1169
1170        self.state_lock.acquire()
1171        if aid in self.allocation:
1172            topo = self.allocation[aid].get('topo', None)
1173            if topo: topo = topo.clone()
1174            xmlrpc_cert = self.allocation[aid].get('cert', None)
1175        else:
1176            topo = None
1177            xmlrpc_cert = None
1178        self.state_lock.release()
1179
1180        targets = copy.copy(targets)
1181        ptargets = { }
1182        for e in topo.elements:
1183            if isinstance(e, topdl.Computer):
1184                if e.name in targets:
1185                    targets.remove(e.name)
1186                    pn = get_pname(e)
1187                    if pn: ptargets[e.name] = pn
1188
1189        status = [ operation_status(t, operation_status.no_target) \
1190                for t in targets]
1191
1192        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1193                debug=self.create_debug, boss=self.boss, ops=self.ops,
1194                cert=xmlrpc_cert)
1195        ops(self, op, ptargets, params, topo)
1196        self.log.info("OperationSegment operated for %s" % fid)
1197       
1198        status.extend(ops.status)
1199        self.log.info("OperationSegment succeed for %s" % fid)
1200
1201        return { 
1202                'allocID': req['allocID'], 
1203                'status': [s.to_dict() for s in status],
1204                'proof': proof.to_dict(),
1205                }
Note: See TracBrowser for help on using the repository browser.