source: fedd/federation/emulab_access.py @ b035f90

compt_changes
Last change on this file since b035f90 was 9c3e77f, checked in by Ted Faber <faber@…>, 13 years ago

Add information about the project and experiment into the experiment
description

  • Property mode set to 100644
File size: 38.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    max_name_len = 19
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.max_name_len = access.max_name_len
61
62        self.allow_proxy = config.getboolean("access", "allow_proxy")
63
64        self.domain = config.get("access", "domain")
65        self.userconfdir = config.get("access","userconfdir")
66        self.userconfcmd = config.get("access","userconfcmd")
67        self.userconfurl = config.get("access","userconfurl")
68        self.federation_software = config.get("access", "federation_software")
69        self.portal_software = config.get("access", "portal_software")
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73        self.seer_master_start = config.get("access", "seer_master_start")
74        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
75        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
76        self.ssh_port = config.get("access","ssh_port") or "22"
77        self.boss = config.get("access", "boss")
78        self.ops = config.get("access", "ops")
79        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
80        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
81
82        self.dragon_endpoint = config.get("access", "dragon")
83        self.dragon_vlans = config.get("access", "dragon_vlans")
84        self.deter_internal = config.get("access", "deter_internal")
85
86        self.tunnel_config = config.getboolean("access", "tunnel_config")
87        self.portal_command = config.get("access", "portal_command")
88        self.portal_image = config.get("access", "portal_image")
89        self.portal_type = config.get("access", "portal_type") or "pc"
90        self.portal_startcommand = config.get("access", "portal_startcommand")
91        self.node_startcommand = config.get("access", "node_startcommand")
92
93        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
94                self.get_port(config.get("globals", "services", "23235")))
95
96        self.federation_software = self.software_list(self.federation_software)
97        self.portal_software = self.software_list(self.portal_software)
98        self.local_seer_software = self.software_list(self.local_seer_software)
99
100        self.access_type = self.access_type.lower()
101        self.start_segment = emulab_segment.start_segment
102        self.stop_segment = emulab_segment.stop_segment
103        self.info_segment = emulab_segment.info_segment
104        self.operation_segment = emulab_segment.operation_segment
105
106        self.restricted = [ ]
107        tb = config.get('access', 'testbed')
108        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
109        else: self.testbed = [ ]
110
111        # authorization information
112        self.auth_type = config.get('access', 'auth_type') \
113                or 'abac'
114        self.auth_dir = config.get('access', 'auth_dir')
115        accessdb = config.get("access", "accessdb")
116        # initialize the authorization system
117        if self.auth_type == 'abac':
118            self.auth = abac_authorizer(load=self.auth_dir)
119            self.access = [ ]
120            if accessdb:
121                self.read_access(accessdb, self.access_tuple)
122        else:
123            raise service_error(service_error.internal, 
124                    "Unknown auth_type: %s" % self.auth_type)
125
126        # read_state in the base_class
127        self.state_lock.acquire()
128        if 'allocation' not in self.state: self.state['allocation']= { }
129        self.allocation = self.state['allocation']
130        self.state_lock.release()
131        self.exports = {
132                'SMB': self.export_SMB,
133                'seer': self.export_seer,
134                'tmcd': self.export_tmcd,
135                'userconfig': self.export_userconfig,
136                'project_export': self.export_project_export,
137                'local_seer_control': self.export_local_seer,
138                'seer_master': self.export_seer_master,
139                'hide_hosts': self.export_hide_hosts,
140                }
141
142        if not self.local_seer_image or not self.local_seer_software or \
143                not self.local_seer_start:
144            if 'local_seer_control' in self.exports:
145                del self.exports['local_seer_control']
146
147        if not self.local_seer_image or not self.local_seer_software or \
148                not self.seer_master_start:
149            if 'seer_master' in self.exports:
150                del self.exports['seer_master']
151
152
153        self.soap_services = {\
154            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
155            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
156            'StartSegment': soap_handler("StartSegment", self.StartSegment),
157            'TerminateSegment': soap_handler("TerminateSegment",
158                self.TerminateSegment),
159            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
160            'OperationSegment': soap_handler("OperationSegment",
161                self.OperationSegment),
162            }
163        self.xmlrpc_services =  {\
164            'RequestAccess': xmlrpc_handler('RequestAccess',
165                self.RequestAccess),
166            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
167                self.ReleaseAccess),
168            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
169            'TerminateSegment': xmlrpc_handler('TerminateSegment',
170                self.TerminateSegment),
171            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
172            'OperationSegment': xmlrpc_handler("OperationSegment",
173                self.OperationSegment),
174            }
175
176        self.call_SetValue = service_caller('SetValue')
177        self.call_GetValue = service_caller('GetValue', log=self.log)
178
179    @staticmethod
180    def get_port(ps):
181        '''
182        Take a fedd service string and return the first port.  Used in
183        creating the testbed uri identifier.
184        '''
185        p = ps.split(',')
186        smallport = p[0].split(':')
187        try:
188            rv = int(smallport[0])
189        except ValueError:
190            rv = 23235
191        return rv
192
193    @staticmethod
194    def access_tuple(str):
195        """
196        Convert a string of the form (id, id, id) into an access_project.  This
197        is called by read_access to convert to local attributes.  It returns a
198        tuple of the form (project, user, certificate_file).
199        """
200
201        str = str.strip()
202        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
203            # The slice takes the parens off the string.
204            proj, user, cert = str[1:-1].split(',')
205            return (proj.strip(), user.strip(), cert.strip())
206        else:
207            raise self.parse_error(
208                    'Bad mapping (unbalanced parens or more than 2 commas)')
209
210    # RequestAccess support routines
211
212    def save_project_state(self, aid, pname, uname, certf, owners):
213        """
214        Save the project, user, and owners associated with this allocation.
215        This creates the allocation entry.
216        """
217        self.state_lock.acquire()
218        self.allocation[aid] = { }
219        self.allocation[aid]['project'] = pname
220        self.allocation[aid]['user'] = uname
221        self.allocation[aid]['cert'] = certf
222        self.allocation[aid]['owners'] = owners
223        self.write_state()
224        self.state_lock.release()
225        return (pname, uname)
226
227    # End of RequestAccess support routines
228
229    def RequestAccess(self, req, fid):
230        """
231        Handle the access request.  Proxy if not for us.
232
233        Parse out the fields and make the allocations or rejections if for us,
234        otherwise, assuming we're willing to proxy, proxy the request out.
235        """
236
237        def gateway_hardware(h):
238            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
239            else: return h
240
241        def get_export_project(svcs):
242            """
243            if the service requests includes one to export a project, return
244            that project.
245            """
246            rv = None
247            for s in svcs:
248                if s.get('name', '') == 'project_export' and \
249                        s.get('visibility', '') == 'export':
250                    if not rv: 
251                        for a in s.get('fedAttr', []):
252                            if a.get('attribute', '') == 'project' \
253                                    and 'value' in a:
254                                rv = a['value']
255                    else:
256                        raise service_error(service_error, access, 
257                                'Requesting multiple project exports is ' + \
258                                        'not supported');
259            return rv
260
261        self.log.info("RequestAccess called by %s" % fid)
262        # The dance to get into the request body
263        if req.has_key('RequestAccessRequestBody'):
264            req = req['RequestAccessRequestBody']
265        else:
266            raise service_error(service_error.req, "No request!?")
267
268        # if this includes a project export request, construct a filter such
269        # that only the ABAC attributes mapped to that project are checked for
270        # access.
271        if 'service' in req:
272            ep = get_export_project(req['service'])
273            if ep: pf = lambda(a): a.value[0] == ep
274            else: pf = None
275        else:
276            ep = None
277            pf = None
278
279        if self.auth.import_credentials(
280                data_list=req.get('abac_credential', [])):
281            self.auth.save()
282        else:
283            self.log.debug('failed to import incoming credentials')
284
285        if self.auth_type == 'abac':
286            found, owners, proof = self.lookup_access(req, fid, filter=pf)
287        else:
288            raise service_error(service_error.internal, 
289                    'Unknown auth_type: %s' % self.auth_type)
290        ap = None
291
292        # keep track of what's been added
293        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
294        aid = unicode(allocID)
295
296        pname, uname = self.save_project_state(aid, found[0], found[1], 
297                found[2], owners)
298
299        services, svc_state = self.export_services(req.get('service',[]),
300                pname, uname)
301        self.state_lock.acquire()
302        # Store services state in global state
303        for k, v in svc_state.items():
304            self.allocation[aid][k] = v
305        self.append_allocation_authorization(aid, 
306                set([(o, allocID) for o in owners]), state_attr='allocation')
307        self.write_state()
308        self.state_lock.release()
309        try:
310            f = open("%s/%s.pem" % (self.certdir, aid), "w")
311            print >>f, alloc_cert
312            f.close()
313        except EnvironmentError, e:
314            self.log.info("RequestAccess failed for by %s: internal error" \
315                    % fid)
316            raise service_error(service_error.internal, 
317                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
318        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
319        resp = self.build_access_response({ 'fedid': allocID } ,
320                pname, services, proof)
321        return resp
322
323    def ReleaseAccess(self, req, fid):
324        self.log.info("ReleaseAccess called by %s" % fid)
325        # The dance to get into the request body
326        if req.has_key('ReleaseAccessRequestBody'):
327            req = req['ReleaseAccessRequestBody']
328        else:
329            raise service_error(service_error.req, "No request!?")
330
331        try:
332            if req['allocID'].has_key('localname'):
333                auth_attr = aid = req['allocID']['localname']
334            elif req['allocID'].has_key('fedid'):
335                aid = unicode(req['allocID']['fedid'])
336                auth_attr = req['allocID']['fedid']
337            else:
338                raise service_error(service_error.req,
339                        "Only localnames and fedids are understood")
340        except KeyError:
341            raise service_error(service_error.req, "Badly formed request")
342
343        self.log.debug("[access] deallocation requested for %s by %s" % \
344                (aid, fid))
345        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
346                with_proof=True)
347        if not access_ok:
348            self.log.debug("[access] deallocation denied for %s", aid)
349            raise service_error(service_error.access, "Access Denied")
350
351        self.state_lock.acquire()
352        if aid in self.allocation:
353            self.log.debug("Found allocation for %s" %aid)
354            self.clear_allocation_authorization(aid, state_attr='allocation')
355            del self.allocation[aid]
356            self.write_state()
357            self.state_lock.release()
358            # Remove the access cert
359            cf = "%s/%s.pem" % (self.certdir, aid)
360            self.log.debug("Removing %s" % cf)
361            os.remove(cf)
362            self.log.info("ReleaseAccess succeeded for %s" % fid)
363            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
364        else:
365            self.state_lock.release()
366            raise service_error(service_error.req, "No such allocation")
367
368    # These are subroutines for StartSegment
369    def generate_ns2(self, topo, expfn, softdir, connInfo):
370        """
371        Convert topo into an ns2 file, decorated with appropriate commands for
372        the particular testbed setup.  Convert all requests for software, etc
373        to point at the staged copies on this testbed and add the federation
374        startcommands.
375        """
376        class dragon_commands:
377            """
378            Functor to spit out approrpiate dragon commands for nodes listed in
379            the connectivity description.  The constructor makes a dict mapping
380            dragon nodes to their parameters and the __call__ checks each
381            element in turn for membership.
382            """
383            def __init__(self, map):
384                self.node_info = map
385
386            def __call__(self, e):
387                s = ""
388                if isinstance(e, topdl.Computer):
389                    if self.node_info.has_key(e.name):
390                        info = self.node_info[e.name]
391                        for ifname, vlan, type in info:
392                            for i in e.interface:
393                                if i.name == ifname:
394                                    addr = i.get_attribute('ip4_address')
395                                    subs = i.substrate[0]
396                                    break
397                            else:
398                                raise service_error(service_error.internal,
399                                        "No interface %s on element %s" % \
400                                                (ifname, e.name))
401                            # XXX: do netmask right
402                            if type =='link':
403                                s = ("tb-allow-external ${%s} " + \
404                                        "dragonportal ip %s vlan %s " + \
405                                        "netmask 255.255.255.0\n") % \
406                                        (topdl.to_tcl_name(e.name), addr, vlan)
407                            elif type =='lan':
408                                s = ("tb-allow-external ${%s} " + \
409                                        "dragonportal " + \
410                                        "ip %s vlan %s usurp %s\n") % \
411                                        (topdl.to_tcl_name(e.name), addr, 
412                                                vlan, subs)
413                            else:
414                                raise service_error(service_error_internal,
415                                        "Unknown DRAGON type %s" % type)
416                return s
417
418        class not_dragon:
419            """
420            Return true if a node is in the given map of dragon nodes.
421            """
422            def __init__(self, map):
423                self.nodes = set(map.keys())
424
425            def __call__(self, e):
426                return e.name not in self.nodes
427
428        def have_portals(top):
429            '''
430            Return true if the topology has a portal node
431            '''
432            # The else is on the for
433            for e in top.elements:
434                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
435                    return True
436            else:
437                return False
438
439
440        # Main line of generate_ns2
441        t = topo.clone()
442
443        # Create the map of nodes that need direct connections (dragon
444        # connections) from the connInfo
445        dragon_map = { }
446        for i in [ i for i in connInfo if i['type'] == 'transit']:
447            for a in i.get('fedAttr', []):
448                if a['attribute'] == 'vlan_id':
449                    vlan = a['value']
450                    break
451            else:
452                raise service_error(service_error.internal, "No vlan tag")
453            members = i.get('member', [])
454            if len(members) > 1: type = 'lan'
455            else: type = 'link'
456
457            try:
458                for m in members:
459                    if m['element'] in dragon_map:
460                        dragon_map[m['element']].append(( m['interface'], 
461                            vlan, type))
462                    else:
463                        dragon_map[m['element']] = [( m['interface'], 
464                            vlan, type),]
465            except KeyError:
466                raise service_error(service_error.req,
467                        "Missing connectivity info")
468
469        # Weed out the things we aren't going to instantiate: Segments, portal
470        # substrates, and portal interfaces.  (The copy in the for loop allows
471        # us to delete from e.elements in side the for loop).  While we're
472        # touching all the elements, we also adjust paths from the original
473        # testbed to local testbed paths and put the federation commands into
474        # the start commands
475        local = len(dragon_map) == 0 and not have_portals(t)
476        if local: routing = 'Static'
477        else: routing = 'Manual'
478
479        self.log.debug("Local: %s", local)
480        for e in [e for e in t.elements]:
481            if isinstance(e, topdl.Segment):
482                t.elements.remove(e)
483            if isinstance(e, topdl.Computer):
484                self.add_kit(e, self.federation_software)
485                if e.get_attribute('portal') and self.portal_startcommand:
486                    # Add local portal support software
487                    self.add_kit(e, self.portal_software)
488                    # Portals never have a user-specified start command
489                    e.set_attribute('startup', self.portal_startcommand)
490                elif not local and self.node_startcommand:
491                    if e.get_attribute('startup'):
492                        e.set_attribute('startup', "%s \\$USER '%s'" % \
493                                (self.node_startcommand, 
494                                    e.get_attribute('startup')))
495                    else:
496                        e.set_attribute('startup', self.node_startcommand)
497
498                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
499                # Remove portal interfaces that do not connect to DRAGON
500                e.interface = [i for i in e.interface \
501                        if not i.get_attribute('portal') or i.name in dinf ]
502            # Fix software paths
503            for s in getattr(e, 'software', []):
504                s.location = re.sub("^.*/", softdir, s.location)
505
506        t.substrates = [ s.clone() for s in t.substrates ]
507        t.incorporate_elements()
508
509        # Customize the ns2 output for local portal commands and images
510        filters = []
511
512        if self.dragon_endpoint:
513            add_filter = not_dragon(dragon_map)
514            filters.append(dragon_commands(dragon_map))
515        else:
516            add_filter = None
517
518        if self.portal_command:
519            filters.append(topdl.generate_portal_command_filter(
520                self.portal_command, add_filter=add_filter))
521
522        if self.portal_image:
523            filters.append(topdl.generate_portal_image_filter(
524                self.portal_image))
525
526        if self.portal_type:
527            filters.append(topdl.generate_portal_hardware_filter(
528                self.portal_type))
529
530        # Convert to ns and write it out
531        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
532        try:
533            f = open(expfn, "w")
534            print >>f, expfile
535            f.close()
536        except EnvironmentError:
537            raise service_error(service_error.internal,
538                    "Cannot write experiment file %s: %s" % (expfn,e))
539
540    def export_store_info(self, cf, proj, ename, connInfo):
541        """
542        For the export requests in the connection info, install the peer names
543        at the experiment controller via SetValue calls.
544        """
545
546        for c in connInfo:
547            for p in [ p for p in c.get('parameter', []) \
548                    if p.get('type', '') == 'output']:
549
550                if p.get('name', '') == 'peer':
551                    k = p.get('key', None)
552                    surl = p.get('store', None)
553                    if surl and k and k.index('/') != -1:
554                        value = "%s.%s.%s%s" % \
555                                (k[k.index('/')+1:], ename, proj, self.domain)
556                        req = { 'name': k, 'value': value }
557                        self.log.debug("Setting %s to %s on %s" % \
558                                (k, value, surl))
559                        self.call_SetValue(surl, req, cf)
560                    else:
561                        self.log.error("Bad export request: %s" % p)
562                elif p.get('name', '') == 'ssh_port':
563                    k = p.get('key', None)
564                    surl = p.get('store', None)
565                    if surl and k:
566                        req = { 'name': k, 'value': self.ssh_port }
567                        self.log.debug("Setting %s to %s on %s" % \
568                                (k, self.ssh_port, surl))
569                        self.call_SetValue(surl, req, cf)
570                    else:
571                        self.log.error("Bad export request: %s" % p)
572                else:
573                    self.log.error("Unknown export parameter: %s" % \
574                            p.get('name'))
575                    continue
576
577    def add_seer_node(self, topo, name, startup):
578        """
579        Add a seer node to the given topology, with the startup command passed
580        in.  Used by configure seer_services.
581        """
582        c_node = topdl.Computer(
583                name=name, 
584                os= topdl.OperatingSystem(
585                    attribute=[
586                    { 'attribute': 'osid', 
587                        'value': self.local_seer_image },
588                    ]),
589                attribute=[
590                    { 'attribute': 'startup', 'value': startup },
591                    ]
592                )
593        self.add_kit(c_node, self.local_seer_software)
594        topo.elements.append(c_node)
595
596    def configure_seer_services(self, services, topo, softdir):
597        """
598        Make changes to the topology required for the seer requests being made.
599        Specifically, add any control or master nodes required and set up the
600        start commands on the nodes to interconnect them.
601        """
602        local_seer = False      # True if we need to add a control node
603        collect_seer = False    # True if there is a seer-master node
604        seer_master= False      # True if we need to add the seer-master
605        for s in services:
606            s_name = s.get('name', '')
607            s_vis = s.get('visibility','')
608
609            if s_name  == 'local_seer_control' and s_vis == 'export':
610                local_seer = True
611            elif s_name == 'seer_master':
612                if s_vis == 'import':
613                    collect_seer = True
614                elif s_vis == 'export':
615                    seer_master = True
616       
617        # We've got the whole picture now, so add nodes if needed and configure
618        # them to interconnect properly.
619        if local_seer or seer_master:
620            # Copy local seer control node software to the tempdir
621            for l, f in self.local_seer_software:
622                base = os.path.basename(f)
623                copy_file(f, "%s/%s" % (softdir, base))
624        # If we're collecting seers somewhere the controllers need to talk to
625        # the master.  In testbeds that export the service, that will be a
626        # local node that we'll add below.  Elsewhere it will be the control
627        # portal that will port forward to the exporting master.
628        if local_seer:
629            if collect_seer:
630                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
631            else:
632                startup = self.local_seer_start
633            self.add_seer_node(topo, 'control', startup)
634        # If this is the seer master, add that node, too.
635        if seer_master:
636            self.add_seer_node(topo, 'seer-master', 
637                    "%s -R -n -R seer-master -R -A -R sink" % \
638                            self.seer_master_start)
639
640    def retrieve_software(self, topo, certfile, softdir):
641        """
642        Collect the software that nodes in the topology need loaded and stage
643        it locally.  This implies retrieving it from the experiment_controller
644        and placing it into softdir.  Certfile is used to prove that this node
645        has access to that data (it's the allocation/segment fedid).  Finally
646        local portal and federation software is also copied to the same staging
647        directory for simplicity - all software needed for experiment creation
648        is in softdir.
649        """
650        sw = set()
651        for e in topo.elements:
652            for s in getattr(e, 'software', []):
653                sw.add(s.location)
654        for s in sw:
655            self.log.debug("Retrieving %s" % s)
656            try:
657                get_url(s, certfile, softdir)
658            except:
659                t, v, st = sys.exc_info()
660                raise service_error(service_error.internal,
661                        "Error retrieving %s: %s" % (s, v))
662
663        # Copy local federation and portal node software to the tempdir
664        for s in (self.federation_software, self.portal_software):
665            for l, f in s:
666                base = os.path.basename(f)
667                copy_file(f, "%s/%s" % (softdir, base))
668
669
670    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
671        """
672        Gather common configuration files, retrieve or create an experiment
673        name and project name, and return the ssh_key filenames.  Create an
674        allocation log bound to the state log variable as well.
675        """
676        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
677                'seer_ca_pem', 'seer_node_pem')
678        ename = None
679        pubkey_base = None
680        secretkey_base = None
681        proj = None
682        user = None
683        alloc_log = None
684        nonce_experiment = False
685        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
686
687        self.state_lock.acquire()
688        if aid in self.allocation:
689            proj = self.allocation[aid].get('project', None)
690        self.state_lock.release()
691
692        if not proj:
693            raise service_error(service_error.internal, 
694                    "Can't find project for %s" %aid)
695
696        for a in attrs:
697            if a['attribute'] in configs:
698                try:
699                    self.log.debug("Retrieving %s from %s" % \
700                            (a['attribute'], a['value']))
701                    get_url(a['value'], certfile, tmpdir)
702                except:
703                    t, v, st = sys.exc_info()
704                    raise service_error(service_error.internal,
705                            "Error retrieving %s: %s" % (a.get('value', ""), v))
706            if a['attribute'] == 'ssh_pubkey':
707                pubkey_base = a['value'].rpartition('/')[2]
708            if a['attribute'] == 'ssh_secretkey':
709                secretkey_base = a['value'].rpartition('/')[2]
710            if a['attribute'] == 'experiment_name':
711                ename = a['value']
712
713        # Names longer than the emulab max are discarded
714        if ename and len(ename) <= self.max_name_len:
715            # Clean up the experiment name so that emulab will accept it.
716            ename = re.sub(vchars_re, '-', ename)
717
718        else:
719            ename = ""
720            for i in range(0,5):
721                ename += random.choice(string.ascii_letters)
722            nonce_experiment = True
723            self.log.warn("No experiment name or suggestion too long: " + \
724                    "picked one randomly: %s" % ename)
725
726        if not pubkey_base:
727            raise service_error(service_error.req, 
728                    "No public key attribute")
729
730        if not secretkey_base:
731            raise service_error(service_error.req, 
732                    "No secret key attribute")
733
734        self.state_lock.acquire()
735        if aid in self.allocation:
736            user = self.allocation[aid].get('user', None)
737            cert = self.allocation[aid].get('cert', None)
738            self.allocation[aid]['experiment'] = ename
739            self.allocation[aid]['nonce'] = nonce_experiment
740            self.allocation[aid]['log'] = [ ]
741            # Create a logger that logs to the experiment's state object as
742            # well as to the main log file.
743            alloc_log = logging.getLogger('fedd.access.%s' % ename)
744            h = logging.StreamHandler(
745                    list_log.list_log(self.allocation[aid]['log']))
746            # XXX: there should be a global one of these rather than
747            # repeating the code.
748            h.setFormatter(logging.Formatter(
749                "%(asctime)s %(name)s %(message)s",
750                        '%d %b %y %H:%M:%S'))
751            alloc_log.addHandler(h)
752            self.write_state()
753        self.state_lock.release()
754
755        if not user:
756            raise service_error(service_error.internal, 
757                    "Can't find creation user for %s" %aid)
758
759        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
760
761    def decorate_topology(self, info, t):
762        """
763        Copy the physical mapping and status onto the topology.  Used by
764        StartSegment and InfoSegment
765        """
766        def add_new(ann, attr):
767            for a in ann:
768                if a not in attr: attr.append(a)
769
770        def merge_os(os, e):
771            if len(e.os) == 0:
772                # No OS at all:
773                if os.get_attribute('emulab_access:image'):
774                    os.set_attribute('emulab_access:initial_image', 
775                            os.get_attribute('emulab_access:image'))
776                e.os = [ os ]
777            elif len(e.os) == 1:
778                # There's one OS, copy the initial image and replace
779                eos = e.os[0]
780                initial = eos.get_attribute('emulab_access:initial_image')
781                if initial:
782                    os.set_attribute('emulab_access:initial_image', initial)
783                e.os = [ os] 
784            else:
785                # Multiple OSes, replace or append
786                for eos in e.os:
787                    if os.name == eos.name:
788                        eos.version = os.version
789                        eos.version = os.distribution
790                        eos.version = os.distributionversion
791                        for a in os.attribute:
792                            if eos.get_attribute(a.attribute):
793                                eos.remove_attribute(a.attribute)
794                            eos.set_attribute(a.attribute, a.value)
795                        break
796                else:
797                    e.os.append(os)
798
799
800        if t is None: return
801        i = 0 # For fake debugging instantiation
802        # Copy the assigned names into the return topology
803        for e in t.elements:
804            if isinstance(e, topdl.Computer):
805                if not self.create_debug:
806                    if e.name in info.node:
807                        add_new(("%s%s" % 
808                            (info.node[e.name].pname, self.domain),),
809                            e.localname)
810                        add_new(("%s%s" % 
811                            (info.node[e.name].lname, self.domain),),
812                            e.localname)
813                        e.status = info.node[e.name].status
814                        os = info.node[e.name].getOS()
815                        if os: merge_os(os, e)
816                else:
817                    # Simple debugging assignment
818                    add_new(("node%d%s" % (i, self.domain),), e.localname)
819                    e.status = 'active'
820                    add_new(('testop1', 'testop2'), e.operation)
821                    i += 1
822
823        for s in t.substrates:
824            if s.name in info.subs:
825                sub = info.subs[s.name]
826                if sub.cap is not None:
827                    s.capacity = topdl.Capacity(sub.cap, 'max')
828                if sub.delay is not None:
829                    s.delay = topdl.Latency(sub.delay, 'max')
830        # XXX interfaces
831
832
833    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
834        """
835        Store key bits of experiment state in the global repository, including
836        the response that may need to be replayed, and return the response.
837        """
838        def get_localnames(t):
839            names = [ ]
840            for e in t.elements:
841                if isinstance(e, topdl.Computer):
842                    n = e.get_attribute('testbed')
843                    if n is not None and n not in names:
844                        names.append(n)
845            return names
846        i = 0
847        t = topo.clone()
848        self.decorate_topology(starter, t)
849        # Grab the log (this is some anal locking, but better safe than
850        # sorry)
851        self.state_lock.acquire()
852        # Put information about this testbed into the topdl
853        tb = topdl.Testbed(self.uri, "deter",
854                localname=get_localnames(t), 
855                attribute=[ 
856                    { 
857                        'attribute': 'project', 
858                        'value': self.allocation[aid]['project']
859                    },
860                    { 
861                        'attribute': 'experiment', 
862                        'value': self.allocation[aid]['experiment']
863                    }])
864        t.elements.append(tb)
865        logv = "".join(self.allocation[aid]['log'])
866        # It's possible that the StartSegment call gets retried (!).
867        # if the 'started' key is in the allocation, we'll return it rather
868        # than redo the setup.
869        self.allocation[aid]['started'] = { 
870                'allocID': alloc_id,
871                'allocationLog': logv,
872                'segmentdescription': { 
873                    'topdldescription': t.to_dict()
874                    },
875                'proof': proof.to_dict(),
876                }
877        self.allocation[aid]['topo'] = t
878        retval = copy.copy(self.allocation[aid]['started'])
879        self.write_state()
880        self.state_lock.release()
881        return retval
882   
883    # End of StartSegment support routines
884
885    def StartSegment(self, req, fid):
886        err = None  # Any service_error generated after tmpdir is created
887        rv = None   # Return value from segment creation
888
889        self.log.info("StartSegment called by %s" % fid)
890        try:
891            req = req['StartSegmentRequestBody']
892            auth_attr = req['allocID']['fedid']
893            topref = req['segmentdescription']['topdldescription']
894        except KeyError:
895            raise service_error(server_error.req, "Badly formed request")
896
897        connInfo = req.get('connection', [])
898        services = req.get('service', [])
899        aid = "%s" % auth_attr
900        attrs = req.get('fedAttr', [])
901
902        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
903                with_proof=True)
904        if not access_ok:
905            self.log.info("StartSegment for %s failed: access denied" % fid)
906            raise service_error(service_error.access, "Access denied")
907        else:
908            # See if this is a replay of an earlier succeeded StartSegment -
909            # sometimes SSL kills 'em.  If so, replay the response rather than
910            # redoing the allocation.
911            self.state_lock.acquire()
912            retval = self.allocation[aid].get('started', None)
913            self.state_lock.release()
914            if retval:
915                self.log.warning("Duplicate StartSegment for %s: " % aid + \
916                        "replaying response")
917                return retval
918
919        # A new request.  Do it.
920
921        if topref: topo = topdl.Topology(**topref)
922        else:
923            raise service_error(service_error.req, 
924                    "Request missing segmentdescription'")
925       
926        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
927        try:
928            tmpdir = tempfile.mkdtemp(prefix="access-")
929            softdir = "%s/software" % tmpdir
930            os.mkdir(softdir)
931        except EnvironmentError:
932            self.log.info("StartSegment for %s failed: internal error" % fid)
933            raise service_error(service_error.internal, "Cannot create tmp dir")
934
935        # Try block alllows us to clean up temporary files.
936        try:
937            self.retrieve_software(topo, certfile, softdir)
938            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
939                alloc_log =  self.initialize_experiment_info(attrs, aid, 
940                        certfile, tmpdir)
941            # A misconfigured cert in the ABAC map can be confusing...
942            if not os.access(xmlrpc_cert, os.R_OK):
943                self.log.error("Cannot open user's emulab SSL cert: %s" % \
944                        xmlrpc_cert)
945                raise service_error(service_error.internal,
946                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
947
948
949            if '/' in proj: proj, gid = proj.split('/')
950            else: gid = None
951
952
953            # Set up userconf and seer if needed
954            self.configure_userconf(services, tmpdir)
955            self.configure_seer_services(services, topo, softdir)
956            # Get and send synch store variables
957            self.export_store_info(certfile, proj, ename, connInfo)
958            self.import_store_info(certfile, connInfo)
959
960            expfile = "%s/experiment.tcl" % tmpdir
961
962            self.generate_portal_configs(topo, pubkey_base, 
963                    secretkey_base, tmpdir, proj, ename, connInfo, services)
964            self.generate_ns2(topo, expfile, 
965                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
966
967            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
968                    debug=self.create_debug, log=alloc_log, boss=self.boss,
969                    ops=self.ops, cert=xmlrpc_cert)
970            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
971        except service_error, e:
972            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
973            err = e
974        except:
975            t, v, st = sys.exc_info()
976            self.log.info("StartSegment for %s failed:unexpected error: %s" \
977                    % (fid, traceback.extract_tb(st)))
978            err = service_error(service_error.internal, "%s: %s" % \
979                    (v, traceback.extract_tb(st)))
980
981        # Walk up tmpdir, deleting as we go
982        if self.cleanup: self.remove_dirs(tmpdir)
983        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
984
985        if rv:
986            self.log.info("StartSegment for %s succeeded" % fid)
987            return self.finalize_experiment(starter, topo, aid, req['allocID'],
988                    proof)
989        elif err:
990            raise service_error(service_error.federant,
991                    "Swapin failed: %s" % err)
992        else:
993            raise service_error(service_error.federant, "Swapin failed")
994
995    def TerminateSegment(self, req, fid):
996        self.log.info("TerminateSegment called by %s" % fid)
997        try:
998            req = req['TerminateSegmentRequestBody']
999        except KeyError:
1000            raise service_error(server_error.req, "Badly formed request")
1001
1002        auth_attr = req['allocID']['fedid']
1003        aid = "%s" % auth_attr
1004        attrs = req.get('fedAttr', [])
1005
1006        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1007                with_proof=True)
1008        if not access_ok:
1009            raise service_error(service_error.access, "Access denied")
1010
1011        self.state_lock.acquire()
1012        if aid in self.allocation:
1013            proj = self.allocation[aid].get('project', None)
1014            user = self.allocation[aid].get('user', None)
1015            xmlrpc_cert = self.allocation[aid].get('cert', None)
1016            ename = self.allocation[aid].get('experiment', None)
1017            nonce = self.allocation[aid].get('nonce', False)
1018        else:
1019            proj = None
1020            user = None
1021            ename = None
1022            nonce = False
1023            xmlrpc_cert = None
1024        self.state_lock.release()
1025
1026        if not proj:
1027            self.log.info("TerminateSegment failed for %s: cannot find project"\
1028                    % fid)
1029            raise service_error(service_error.internal, 
1030                    "Can't find project for %s" % aid)
1031        else:
1032            if '/' in proj: proj, gid = proj.split('/')
1033            else: gid = None
1034
1035        if not user:
1036            self.log.info("TerminateSegment failed for %s: cannot find user"\
1037                    % fid)
1038            raise service_error(service_error.internal, 
1039                    "Can't find creation user for %s" % aid)
1040        if not ename:
1041            self.log.info(
1042                    "TerminateSegment failed for %s: cannot find experiment"\
1043                    % fid)
1044            raise service_error(service_error.internal, 
1045                    "Can't find experiment name for %s" % aid)
1046        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1047                debug=self.create_debug, boss=self.boss, ops=self.ops,
1048                cert=xmlrpc_cert)
1049        stopper(self, user, proj, ename, gid, nonce)
1050        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1051                (fid, proj, ename))
1052        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1053
1054    def InfoSegment(self, req, fid):
1055        self.log.info("InfoSegment called by %s" % fid)
1056        try:
1057            req = req['InfoSegmentRequestBody']
1058        except KeyError:
1059            raise service_error(server_error.req, "Badly formed request")
1060
1061        auth_attr = req['allocID']['fedid']
1062        aid = "%s" % auth_attr
1063
1064        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1065                with_proof=True)
1066        if not access_ok:
1067            raise service_error(service_error.access, "Access denied")
1068
1069        self.state_lock.acquire()
1070        if aid in self.allocation:
1071            topo = self.allocation[aid].get('topo', None)
1072            if topo: topo = topo.clone()
1073            proj = self.allocation[aid].get('project', None)
1074            user = self.allocation[aid].get('user', None)
1075            xmlrpc_cert = self.allocation[aid].get('cert', None)
1076            ename = self.allocation[aid].get('experiment', None)
1077        else:
1078            proj = None
1079            user = None
1080            ename = None
1081            topo = None
1082            xmlrpc_cert = None
1083        self.state_lock.release()
1084
1085        if not proj:
1086            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1087            raise service_error(service_error.internal, 
1088                    "Can't find project for %s" % aid)
1089        else:
1090            if '/' in proj: proj, gid = proj.split('/')
1091            else: gid = None
1092
1093        if not user:
1094            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1095            raise service_error(service_error.internal, 
1096                    "Can't find creation user for %s" % aid)
1097        if not ename:
1098            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1099            raise service_error(service_error.internal, 
1100                    "Can't find experiment name for %s" % aid)
1101        info = self.info_segment(keyfile=self.ssh_privkey_file,
1102                debug=self.create_debug, boss=self.boss, ops=self.ops,
1103                cert=xmlrpc_cert)
1104        info(self, user, proj, ename)
1105        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1106                (fid, user, proj, ename))
1107        self.decorate_topology(info, topo)
1108        self.state_lock.acquire()
1109        if aid in self.allocation:
1110            self.allocation[aid]['topo'] = topo
1111            self.write_state()
1112        self.state_lock.release()
1113        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1114                (fid, user, proj, ename))
1115
1116        rv = { 
1117                'allocID': req['allocID'], 
1118                'proof': proof.to_dict(),
1119                }
1120        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1121                (fid, user, proj, ename))
1122        if topo:
1123            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1124        return rv
1125
1126    def OperationSegment(self, req, fid):
1127        def get_pname(e):
1128            """
1129            Get the physical name of a node
1130            """
1131            if e.localname:
1132                return re.sub('\..*','', e.localname[0])
1133            else:
1134                return None
1135
1136        self.log.info("OperationSegment called by %s" % fid)
1137        try:
1138            req = req['OperationSegmentRequestBody']
1139        except KeyError:
1140            raise service_error(server_error.req, "Badly formed request")
1141
1142        auth_attr = req['allocID']['fedid']
1143        aid = "%s" % auth_attr
1144
1145        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1146                with_proof=True)
1147        if not access_ok:
1148            self.log.info("OperationSegment failed for %s: access denied" % fid)
1149            raise service_error(service_error.access, "Access denied")
1150
1151        op = req.get('operation', None)
1152        targets = req.get('target', None)
1153        params = req.get('parameter', None)
1154
1155        if op is None :
1156            self.log.info("OperationSegment failed for %s: no operation" % fid)
1157            raise service_error(service_error.req, "missing operation")
1158        elif targets is None:
1159            self.log.info("OperationSegment failed for %s: no targets" % fid)
1160            raise service_error(service_error.req, "no targets")
1161
1162        self.state_lock.acquire()
1163        if aid in self.allocation:
1164            topo = self.allocation[aid].get('topo', None)
1165            if topo: topo = topo.clone()
1166            xmlrpc_cert = self.allocation[aid].get('cert', None)
1167        else:
1168            topo = None
1169            xmlrpc_cert = None
1170        self.state_lock.release()
1171
1172        targets = copy.copy(targets)
1173        ptargets = { }
1174        for e in topo.elements:
1175            if isinstance(e, topdl.Computer):
1176                if e.name in targets:
1177                    targets.remove(e.name)
1178                    pn = get_pname(e)
1179                    if pn: ptargets[e.name] = pn
1180
1181        status = [ operation_status(t, operation_status.no_target) \
1182                for t in targets]
1183
1184        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1185                debug=self.create_debug, boss=self.boss, ops=self.ops,
1186                cert=xmlrpc_cert)
1187        ops(self, op, ptargets, params, topo)
1188        self.log.info("OperationSegment operated for %s" % fid)
1189       
1190        status.extend(ops.status)
1191        self.log.info("OperationSegment succeed for %s" % fid)
1192
1193        return { 
1194                'allocID': req['allocID'], 
1195                'status': [s.to_dict() for s in status],
1196                'proof': proof.to_dict(),
1197                }
Note: See TracBrowser for help on using the repository browser.