source: fedd/federation/emulab_access.py @ 17c2f7b

compt_changes
Last change on this file since 17c2f7b was 815cd26, checked in by Ted Faber <faber@…>, 13 years ago

Add timeout and extra debug logging.

  • Property mode set to 100644
File size: 38.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    max_name_len = 19
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.max_name_len = access.max_name_len
61
62        self.allow_proxy = config.getboolean("access", "allow_proxy")
63
64        self.domain = config.get("access", "domain")
65        self.userconfdir = config.get("access","userconfdir")
66        self.userconfcmd = config.get("access","userconfcmd")
67        self.userconfurl = config.get("access","userconfurl")
68        self.federation_software = config.get("access", "federation_software")
69        self.portal_software = config.get("access", "portal_software")
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73        self.seer_master_start = config.get("access", "seer_master_start")
74        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
75        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
76        self.ssh_port = config.get("access","ssh_port") or "22"
77        self.boss = config.get("access", "boss")
78        self.ops = config.get("access", "ops")
79        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
80        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
81
82        self.dragon_endpoint = config.get("access", "dragon")
83        self.dragon_vlans = config.get("access", "dragon_vlans")
84        self.deter_internal = config.get("access", "deter_internal")
85
86        self.tunnel_config = config.getboolean("access", "tunnel_config")
87        self.portal_command = config.get("access", "portal_command")
88        self.portal_image = config.get("access", "portal_image")
89        self.portal_type = config.get("access", "portal_type") or "pc"
90        self.portal_startcommand = config.get("access", "portal_startcommand")
91        self.node_startcommand = config.get("access", "node_startcommand")
92
93        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
94                self.get_port(config.get("globals", "services", "23235")))
95
96        self.federation_software = self.software_list(self.federation_software)
97        self.portal_software = self.software_list(self.portal_software)
98        self.local_seer_software = self.software_list(self.local_seer_software)
99
100        self.access_type = self.access_type.lower()
101        self.start_segment = emulab_segment.start_segment
102        self.stop_segment = emulab_segment.stop_segment
103        self.info_segment = emulab_segment.info_segment
104        self.operation_segment = emulab_segment.operation_segment
105
106        self.restricted = [ ]
107        tb = config.get('access', 'testbed')
108        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
109        else: self.testbed = [ ]
110
111        # authorization information
112        self.auth_type = config.get('access', 'auth_type') \
113                or 'abac'
114        self.auth_dir = config.get('access', 'auth_dir')
115        accessdb = config.get("access", "accessdb")
116        # initialize the authorization system
117        if self.auth_type == 'abac':
118            self.auth = abac_authorizer(load=self.auth_dir)
119            self.access = [ ]
120            if accessdb:
121                self.read_access(accessdb, self.access_tuple)
122        else:
123            raise service_error(service_error.internal, 
124                    "Unknown auth_type: %s" % self.auth_type)
125
126        # read_state in the base_class
127        self.state_lock.acquire()
128        if 'allocation' not in self.state: self.state['allocation']= { }
129        self.allocation = self.state['allocation']
130        self.state_lock.release()
131        self.exports = {
132                'SMB': self.export_SMB,
133                'seer': self.export_seer,
134                'tmcd': self.export_tmcd,
135                'userconfig': self.export_userconfig,
136                'project_export': self.export_project_export,
137                'local_seer_control': self.export_local_seer,
138                'seer_master': self.export_seer_master,
139                'hide_hosts': self.export_hide_hosts,
140                }
141
142        if not self.local_seer_image or not self.local_seer_software or \
143                not self.local_seer_start:
144            if 'local_seer_control' in self.exports:
145                del self.exports['local_seer_control']
146
147        if not self.local_seer_image or not self.local_seer_software or \
148                not self.seer_master_start:
149            if 'seer_master' in self.exports:
150                del self.exports['seer_master']
151
152
153        self.soap_services = {\
154            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
155            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
156            'StartSegment': soap_handler("StartSegment", self.StartSegment),
157            'TerminateSegment': soap_handler("TerminateSegment",
158                self.TerminateSegment),
159            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
160            'OperationSegment': soap_handler("OperationSegment",
161                self.OperationSegment),
162            }
163        self.xmlrpc_services =  {\
164            'RequestAccess': xmlrpc_handler('RequestAccess',
165                self.RequestAccess),
166            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
167                self.ReleaseAccess),
168            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
169            'TerminateSegment': xmlrpc_handler('TerminateSegment',
170                self.TerminateSegment),
171            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
172            'OperationSegment': xmlrpc_handler("OperationSegment",
173                self.OperationSegment),
174            }
175
176        self.call_SetValue = service_caller('SetValue')
177        self.call_GetValue = service_caller('GetValue', log=self.log)
178
179    @staticmethod
180    def get_port(ps):
181        '''
182        Take a fedd service string and return the first port.  Used in
183        creating the testbed uri identifier.
184        '''
185        p = ps.split(',')
186        smallport = p[0].split(':')
187        try:
188            rv = int(smallport[0])
189        except ValueError:
190            rv = 23235
191        return rv
192
193    @staticmethod
194    def access_tuple(str):
195        """
196        Convert a string of the form (id, id, id) into an access_project.  This
197        is called by read_access to convert to local attributes.  It returns a
198        tuple of the form (project, user, certificate_file).
199        """
200
201        str = str.strip()
202        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
203            # The slice takes the parens off the string.
204            proj, user, cert = str[1:-1].split(',')
205            return (proj.strip(), user.strip(), cert.strip())
206        else:
207            raise self.parse_error(
208                    'Bad mapping (unbalanced parens or more than 2 commas)')
209
210    # RequestAccess support routines
211
212    def save_project_state(self, aid, pname, uname, certf, owners):
213        """
214        Save the project, user, and owners associated with this allocation.
215        This creates the allocation entry.
216        """
217        self.state_lock.acquire()
218        self.allocation[aid] = { }
219        self.allocation[aid]['project'] = pname
220        self.allocation[aid]['user'] = uname
221        self.allocation[aid]['cert'] = certf
222        self.allocation[aid]['owners'] = owners
223        self.write_state()
224        self.state_lock.release()
225        return (pname, uname)
226
227    # End of RequestAccess support routines
228
229    def RequestAccess(self, req, fid):
230        """
231        Handle the access request.  Proxy if not for us.
232
233        Parse out the fields and make the allocations or rejections if for us,
234        otherwise, assuming we're willing to proxy, proxy the request out.
235        """
236
237        def gateway_hardware(h):
238            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
239            else: return h
240
241        def get_export_project(svcs):
242            """
243            if the service requests includes one to export a project, return
244            that project.
245            """
246            rv = None
247            for s in svcs:
248                if s.get('name', '') == 'project_export' and \
249                        s.get('visibility', '') == 'export':
250                    if not rv: 
251                        for a in s.get('fedAttr', []):
252                            if a.get('attribute', '') == 'project' \
253                                    and 'value' in a:
254                                rv = a['value']
255                    else:
256                        raise service_error(service_error, access, 
257                                'Requesting multiple project exports is ' + \
258                                        'not supported');
259            return rv
260
261        self.log.info("RequestAccess called by %s" % fid)
262        # The dance to get into the request body
263        if req.has_key('RequestAccessRequestBody'):
264            req = req['RequestAccessRequestBody']
265        else:
266            raise service_error(service_error.req, "No request!?")
267
268        # if this includes a project export request, construct a filter such
269        # that only the ABAC attributes mapped to that project are checked for
270        # access.
271        if 'service' in req:
272            ep = get_export_project(req['service'])
273            if ep: pf = lambda(a): a.value[0] == ep
274            else: pf = None
275        else:
276            ep = None
277            pf = None
278
279        if self.auth.import_credentials(
280                data_list=req.get('abac_credential', [])):
281            self.auth.save()
282        else:
283            self.log.debug('failed to import incoming credentials')
284
285        if self.auth_type == 'abac':
286            found, owners, proof = self.lookup_access(req, fid, filter=pf)
287        else:
288            raise service_error(service_error.internal, 
289                    'Unknown auth_type: %s' % self.auth_type)
290        ap = None
291
292        # keep track of what's been added
293        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
294        aid = unicode(allocID)
295
296        pname, uname = self.save_project_state(aid, found[0], found[1], 
297                found[2], owners)
298
299        services, svc_state = self.export_services(req.get('service',[]),
300                pname, uname)
301        self.state_lock.acquire()
302        # Store services state in global state
303        for k, v in svc_state.items():
304            self.allocation[aid][k] = v
305        self.append_allocation_authorization(aid, 
306                set([(o, allocID) for o in owners]), state_attr='allocation')
307        self.write_state()
308        self.state_lock.release()
309        try:
310            f = open("%s/%s.pem" % (self.certdir, aid), "w")
311            print >>f, alloc_cert
312            f.close()
313        except EnvironmentError, e:
314            self.log.info("RequestAccess failed for by %s: internal error" \
315                    % fid)
316            raise service_error(service_error.internal, 
317                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
318        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
319        resp = self.build_access_response({ 'fedid': allocID } ,
320                pname, services, proof)
321        return resp
322
323    def ReleaseAccess(self, req, fid):
324        self.log.info("ReleaseAccess called by %s" % fid)
325        # The dance to get into the request body
326        if req.has_key('ReleaseAccessRequestBody'):
327            req = req['ReleaseAccessRequestBody']
328        else:
329            raise service_error(service_error.req, "No request!?")
330
331        try:
332            if req['allocID'].has_key('localname'):
333                auth_attr = aid = req['allocID']['localname']
334            elif req['allocID'].has_key('fedid'):
335                aid = unicode(req['allocID']['fedid'])
336                auth_attr = req['allocID']['fedid']
337            else:
338                raise service_error(service_error.req,
339                        "Only localnames and fedids are understood")
340        except KeyError:
341            raise service_error(service_error.req, "Badly formed request")
342
343        self.log.debug("[access] deallocation requested for %s by %s" % \
344                (aid, fid))
345        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
346                with_proof=True)
347        if not access_ok:
348            self.log.debug("[access] deallocation denied for %s", aid)
349            raise service_error(service_error.access, "Access Denied")
350
351        self.state_lock.acquire()
352        if aid in self.allocation:
353            self.log.debug("Found allocation for %s" %aid)
354            self.clear_allocation_authorization(aid, state_attr='allocation')
355            del self.allocation[aid]
356            self.write_state()
357            self.state_lock.release()
358            # Remove the access cert
359            cf = "%s/%s.pem" % (self.certdir, aid)
360            self.log.debug("Removing %s" % cf)
361            os.remove(cf)
362            self.log.info("ReleaseAccess succeeded for %s" % fid)
363            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
364        else:
365            self.state_lock.release()
366            raise service_error(service_error.req, "No such allocation")
367
368    # These are subroutines for StartSegment
369    def generate_ns2(self, topo, expfn, softdir, connInfo):
370        """
371        Convert topo into an ns2 file, decorated with appropriate commands for
372        the particular testbed setup.  Convert all requests for software, etc
373        to point at the staged copies on this testbed and add the federation
374        startcommands.
375        """
376        class dragon_commands:
377            """
378            Functor to spit out approrpiate dragon commands for nodes listed in
379            the connectivity description.  The constructor makes a dict mapping
380            dragon nodes to their parameters and the __call__ checks each
381            element in turn for membership.
382            """
383            def __init__(self, map):
384                self.node_info = map
385
386            def __call__(self, e):
387                s = ""
388                if isinstance(e, topdl.Computer):
389                    if self.node_info.has_key(e.name):
390                        info = self.node_info[e.name]
391                        for ifname, vlan, type in info:
392                            for i in e.interface:
393                                if i.name == ifname:
394                                    addr = i.get_attribute('ip4_address')
395                                    subs = i.substrate[0]
396                                    break
397                            else:
398                                raise service_error(service_error.internal,
399                                        "No interface %s on element %s" % \
400                                                (ifname, e.name))
401                            # XXX: do netmask right
402                            if type =='link':
403                                s = ("tb-allow-external ${%s} " + \
404                                        "dragonportal ip %s vlan %s " + \
405                                        "netmask 255.255.255.0\n") % \
406                                        (topdl.to_tcl_name(e.name), addr, vlan)
407                            elif type =='lan':
408                                s = ("tb-allow-external ${%s} " + \
409                                        "dragonportal " + \
410                                        "ip %s vlan %s usurp %s\n") % \
411                                        (topdl.to_tcl_name(e.name), addr, 
412                                                vlan, subs)
413                            else:
414                                raise service_error(service_error_internal,
415                                        "Unknown DRAGON type %s" % type)
416                return s
417
418        class not_dragon:
419            """
420            Return true if a node is in the given map of dragon nodes.
421            """
422            def __init__(self, map):
423                self.nodes = set(map.keys())
424
425            def __call__(self, e):
426                return e.name not in self.nodes
427
428        def have_portals(top):
429            '''
430            Return true if the topology has a portal node
431            '''
432            # The else is on the for
433            for e in top.elements:
434                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
435                    return True
436            else:
437                return False
438
439
440        # Main line of generate_ns2
441        t = topo.clone()
442
443        # Create the map of nodes that need direct connections (dragon
444        # connections) from the connInfo
445        dragon_map = { }
446        for i in [ i for i in connInfo if i['type'] == 'transit']:
447            for a in i.get('fedAttr', []):
448                if a['attribute'] == 'vlan_id':
449                    vlan = a['value']
450                    break
451            else:
452                raise service_error(service_error.internal, "No vlan tag")
453            members = i.get('member', [])
454            if len(members) > 1: type = 'lan'
455            else: type = 'link'
456
457            try:
458                for m in members:
459                    if m['element'] in dragon_map:
460                        dragon_map[m['element']].append(( m['interface'], 
461                            vlan, type))
462                    else:
463                        dragon_map[m['element']] = [( m['interface'], 
464                            vlan, type),]
465            except KeyError:
466                raise service_error(service_error.req,
467                        "Missing connectivity info")
468
469        # Weed out the things we aren't going to instantiate: Segments, portal
470        # substrates, and portal interfaces.  (The copy in the for loop allows
471        # us to delete from e.elements in side the for loop).  While we're
472        # touching all the elements, we also adjust paths from the original
473        # testbed to local testbed paths and put the federation commands into
474        # the start commands
475        local = len(dragon_map) == 0 and not have_portals(t)
476        if local: routing = 'Static'
477        else: routing = 'Manual'
478
479        if local:
480            self.log.debug("Local experiment.")
481        for e in [e for e in t.elements]:
482            if isinstance(e, topdl.Segment):
483                t.elements.remove(e)
484            if isinstance(e, topdl.Computer):
485                self.add_kit(e, self.federation_software)
486                if e.get_attribute('portal') and self.portal_startcommand:
487                    # Add local portal support software
488                    self.add_kit(e, self.portal_software)
489                    # Portals never have a user-specified start command
490                    e.set_attribute('startup', self.portal_startcommand)
491                elif not local and self.node_startcommand:
492                    if e.get_attribute('startup'):
493                        e.set_attribute('startup', "%s \\$USER '%s'" % \
494                                (self.node_startcommand, 
495                                    e.get_attribute('startup')))
496                    else:
497                        e.set_attribute('startup', self.node_startcommand)
498
499                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
500                # Remove portal interfaces that do not connect to DRAGON
501                e.interface = [i for i in e.interface \
502                        if not i.get_attribute('portal') or i.name in dinf ]
503            # Fix software paths
504            for s in getattr(e, 'software', []):
505                s.location = re.sub("^.*/", softdir, s.location)
506
507        t.substrates = [ s.clone() for s in t.substrates ]
508        t.incorporate_elements()
509
510        # Customize the ns2 output for local portal commands and images
511        filters = []
512
513        if self.dragon_endpoint:
514            add_filter = not_dragon(dragon_map)
515            filters.append(dragon_commands(dragon_map))
516        else:
517            add_filter = None
518
519        if self.portal_command:
520            filters.append(topdl.generate_portal_command_filter(
521                self.portal_command, add_filter=add_filter))
522
523        if self.portal_image:
524            filters.append(topdl.generate_portal_image_filter(
525                self.portal_image))
526
527        if self.portal_type:
528            filters.append(topdl.generate_portal_hardware_filter(
529                self.portal_type))
530
531        # Convert to ns and write it out
532        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
533        try:
534            f = open(expfn, "w")
535            print >>f, expfile
536            f.close()
537        except EnvironmentError:
538            raise service_error(service_error.internal,
539                    "Cannot write experiment file %s: %s" % (expfn,e))
540
541    def export_store_info(self, cf, proj, ename, connInfo):
542        """
543        For the export requests in the connection info, install the peer names
544        at the experiment controller via SetValue calls.
545        """
546
547        for c in connInfo:
548            for p in [ p for p in c.get('parameter', []) \
549                    if p.get('type', '') == 'output']:
550
551                if p.get('name', '') == 'peer':
552                    k = p.get('key', None)
553                    surl = p.get('store', None)
554                    if surl and k and k.index('/') != -1:
555                        value = "%s.%s.%s%s" % \
556                                (k[k.index('/')+1:], ename, proj, self.domain)
557                        req = { 'name': k, 'value': value }
558                        self.log.debug("Setting %s to %s on %s" % \
559                                (k, value, surl))
560                        self.call_SetValue(surl, req, cf)
561                    else:
562                        self.log.error("Bad export request: %s" % p)
563                elif p.get('name', '') == 'ssh_port':
564                    k = p.get('key', None)
565                    surl = p.get('store', None)
566                    if surl and k:
567                        req = { 'name': k, 'value': self.ssh_port }
568                        self.log.debug("Setting %s to %s on %s" % \
569                                (k, self.ssh_port, surl))
570                        self.call_SetValue(surl, req, cf)
571                    else:
572                        self.log.error("Bad export request: %s" % p)
573                else:
574                    self.log.error("Unknown export parameter: %s" % \
575                            p.get('name'))
576                    continue
577
578    def add_seer_node(self, topo, name, startup):
579        """
580        Add a seer node to the given topology, with the startup command passed
581        in.  Used by configure seer_services.
582        """
583        c_node = topdl.Computer(
584                name=name, 
585                os= topdl.OperatingSystem(
586                    attribute=[
587                    { 'attribute': 'osid', 
588                        'value': self.local_seer_image },
589                    ]),
590                attribute=[
591                    { 'attribute': 'startup', 'value': startup },
592                    ]
593                )
594        self.add_kit(c_node, self.local_seer_software)
595        topo.elements.append(c_node)
596
597    def configure_seer_services(self, services, topo, softdir):
598        """
599        Make changes to the topology required for the seer requests being made.
600        Specifically, add any control or master nodes required and set up the
601        start commands on the nodes to interconnect them.
602        """
603        local_seer = False      # True if we need to add a control node
604        collect_seer = False    # True if there is a seer-master node
605        seer_master= False      # True if we need to add the seer-master
606        for s in services:
607            s_name = s.get('name', '')
608            s_vis = s.get('visibility','')
609
610            if s_name  == 'local_seer_control' and s_vis == 'export':
611                local_seer = True
612            elif s_name == 'seer_master':
613                if s_vis == 'import':
614                    collect_seer = True
615                elif s_vis == 'export':
616                    seer_master = True
617       
618        # We've got the whole picture now, so add nodes if needed and configure
619        # them to interconnect properly.
620        if local_seer or seer_master:
621            # Copy local seer control node software to the tempdir
622            for l, f in self.local_seer_software:
623                base = os.path.basename(f)
624                copy_file(f, "%s/%s" % (softdir, base))
625        # If we're collecting seers somewhere the controllers need to talk to
626        # the master.  In testbeds that export the service, that will be a
627        # local node that we'll add below.  Elsewhere it will be the control
628        # portal that will port forward to the exporting master.
629        if local_seer:
630            if collect_seer:
631                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
632            else:
633                startup = self.local_seer_start
634            self.add_seer_node(topo, 'control', startup)
635        # If this is the seer master, add that node, too.
636        if seer_master:
637            self.add_seer_node(topo, 'seer-master', 
638                    "%s -R -n -R seer-master -R -A -R sink" % \
639                            self.seer_master_start)
640
641    def retrieve_software(self, topo, certfile, softdir):
642        """
643        Collect the software that nodes in the topology need loaded and stage
644        it locally.  This implies retrieving it from the experiment_controller
645        and placing it into softdir.  Certfile is used to prove that this node
646        has access to that data (it's the allocation/segment fedid).  Finally
647        local portal and federation software is also copied to the same staging
648        directory for simplicity - all software needed for experiment creation
649        is in softdir.
650        """
651        sw = set()
652        for e in topo.elements:
653            for s in getattr(e, 'software', []):
654                sw.add(s.location)
655        for s in sw:
656            self.log.debug("Retrieving %s" % s)
657            try:
658                get_url(s, certfile, softdir, log=self.log)
659            except:
660                t, v, st = sys.exc_info()
661                raise service_error(service_error.internal,
662                        "Error retrieving %s: %s" % (s, v))
663
664        # Copy local federation and portal node software to the tempdir
665        for s in (self.federation_software, self.portal_software):
666            for l, f in s:
667                base = os.path.basename(f)
668                copy_file(f, "%s/%s" % (softdir, base))
669
670
671    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
672        """
673        Gather common configuration files, retrieve or create an experiment
674        name and project name, and return the ssh_key filenames.  Create an
675        allocation log bound to the state log variable as well.
676        """
677        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
678                'seer_ca_pem', 'seer_node_pem')
679        ename = None
680        pubkey_base = None
681        secretkey_base = None
682        proj = None
683        user = None
684        alloc_log = None
685        nonce_experiment = False
686        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
687
688        self.state_lock.acquire()
689        if aid in self.allocation:
690            proj = self.allocation[aid].get('project', None)
691        self.state_lock.release()
692
693        if not proj:
694            raise service_error(service_error.internal, 
695                    "Can't find project for %s" %aid)
696
697        for a in attrs:
698            if a['attribute'] in configs:
699                try:
700                    self.log.debug("Retrieving %s from %s" % \
701                            (a['attribute'], a['value']))
702                    get_url(a['value'], certfile, tmpdir, log=self.log)
703                except:
704                    t, v, st = sys.exc_info()
705                    raise service_error(service_error.internal,
706                            "Error retrieving %s: %s" % (a.get('value', ""), v))
707            if a['attribute'] == 'ssh_pubkey':
708                pubkey_base = a['value'].rpartition('/')[2]
709            if a['attribute'] == 'ssh_secretkey':
710                secretkey_base = a['value'].rpartition('/')[2]
711            if a['attribute'] == 'experiment_name':
712                ename = a['value']
713
714        # Names longer than the emulab max are discarded
715        if ename and len(ename) <= self.max_name_len:
716            # Clean up the experiment name so that emulab will accept it.
717            ename = re.sub(vchars_re, '-', ename)
718
719        else:
720            ename = ""
721            for i in range(0,5):
722                ename += random.choice(string.ascii_letters)
723            nonce_experiment = True
724            self.log.warn("No experiment name or suggestion too long: " + \
725                    "picked one randomly: %s" % ename)
726
727        if not pubkey_base:
728            raise service_error(service_error.req, 
729                    "No public key attribute")
730
731        if not secretkey_base:
732            raise service_error(service_error.req, 
733                    "No secret key attribute")
734
735        self.state_lock.acquire()
736        if aid in self.allocation:
737            user = self.allocation[aid].get('user', None)
738            cert = self.allocation[aid].get('cert', None)
739            self.allocation[aid]['experiment'] = ename
740            self.allocation[aid]['nonce'] = nonce_experiment
741            self.allocation[aid]['log'] = [ ]
742            # Create a logger that logs to the experiment's state object as
743            # well as to the main log file.
744            alloc_log = logging.getLogger('fedd.access.%s' % ename)
745            h = logging.StreamHandler(
746                    list_log.list_log(self.allocation[aid]['log']))
747            # XXX: there should be a global one of these rather than
748            # repeating the code.
749            h.setFormatter(logging.Formatter(
750                "%(asctime)s %(name)s %(message)s",
751                        '%d %b %y %H:%M:%S'))
752            alloc_log.addHandler(h)
753            self.write_state()
754        self.state_lock.release()
755
756        if not user:
757            raise service_error(service_error.internal, 
758                    "Can't find creation user for %s" %aid)
759
760        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
761
762    def decorate_topology(self, info, t):
763        """
764        Copy the physical mapping and status onto the topology.  Used by
765        StartSegment and InfoSegment
766        """
767        def add_new(ann, attr):
768            for a in ann:
769                if a not in attr: attr.append(a)
770
771        def merge_os(os, e):
772            if len(e.os) == 0:
773                # No OS at all:
774                if os.get_attribute('emulab_access:image'):
775                    os.set_attribute('emulab_access:initial_image', 
776                            os.get_attribute('emulab_access:image'))
777                e.os = [ os ]
778            elif len(e.os) == 1:
779                # There's one OS, copy the initial image and replace
780                eos = e.os[0]
781                initial = eos.get_attribute('emulab_access:initial_image')
782                if initial:
783                    os.set_attribute('emulab_access:initial_image', initial)
784                e.os = [ os] 
785            else:
786                # Multiple OSes, replace or append
787                for eos in e.os:
788                    if os.name == eos.name:
789                        eos.version = os.version
790                        eos.version = os.distribution
791                        eos.version = os.distributionversion
792                        for a in os.attribute:
793                            if eos.get_attribute(a.attribute):
794                                eos.remove_attribute(a.attribute)
795                            eos.set_attribute(a.attribute, a.value)
796                        break
797                else:
798                    e.os.append(os)
799
800
801        if t is None: return
802        i = 0 # For fake debugging instantiation
803        # Copy the assigned names into the return topology
804        for e in t.elements:
805            if isinstance(e, topdl.Computer):
806                if not self.create_debug:
807                    if e.name in info.node:
808                        add_new(("%s%s" % 
809                            (info.node[e.name].pname, self.domain),),
810                            e.localname)
811                        add_new(("%s%s" % 
812                            (info.node[e.name].lname, self.domain),),
813                            e.localname)
814                        e.status = info.node[e.name].status
815                        os = info.node[e.name].getOS()
816                        if os: merge_os(os, e)
817                else:
818                    # Simple debugging assignment
819                    add_new(("node%d%s" % (i, self.domain),), e.localname)
820                    e.status = 'active'
821                    add_new(('testop1', 'testop2'), e.operation)
822                    i += 1
823
824        for s in t.substrates:
825            if s.name in info.subs:
826                sub = info.subs[s.name]
827                if sub.cap is not None:
828                    s.capacity = topdl.Capacity(sub.cap, 'max')
829                if sub.delay is not None:
830                    s.delay = topdl.Latency(sub.delay, 'max')
831        # XXX interfaces
832
833
834    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
835        """
836        Store key bits of experiment state in the global repository, including
837        the response that may need to be replayed, and return the response.
838        """
839        def get_localnames(t):
840            names = [ ]
841            for e in t.elements:
842                if isinstance(e, topdl.Computer):
843                    n = e.get_attribute('testbed')
844                    if n is not None and n not in names:
845                        names.append(n)
846            return names
847        i = 0
848        t = topo.clone()
849        self.decorate_topology(starter, t)
850        # Grab the log (this is some anal locking, but better safe than
851        # sorry)
852        self.state_lock.acquire()
853        # Put information about this testbed into the topdl
854        tb = topdl.Testbed(self.uri, "deter",
855                localname=get_localnames(t), 
856                attribute=[ 
857                    { 
858                        'attribute': 'project', 
859                        'value': self.allocation[aid]['project']
860                    },
861                    { 
862                        'attribute': 'experiment', 
863                        'value': self.allocation[aid]['experiment']
864                    }])
865        t.elements.append(tb)
866        logv = "".join(self.allocation[aid]['log'])
867        # It's possible that the StartSegment call gets retried (!).
868        # if the 'started' key is in the allocation, we'll return it rather
869        # than redo the setup.
870        self.allocation[aid]['started'] = { 
871                'allocID': alloc_id,
872                'allocationLog': logv,
873                'segmentdescription': { 
874                    'topdldescription': t.to_dict()
875                    },
876                'proof': proof.to_dict(),
877                }
878        self.allocation[aid]['topo'] = t
879        retval = copy.copy(self.allocation[aid]['started'])
880        self.write_state()
881        self.state_lock.release()
882        return retval
883   
884    # End of StartSegment support routines
885
886    def StartSegment(self, req, fid):
887        err = None  # Any service_error generated after tmpdir is created
888        rv = None   # Return value from segment creation
889
890        self.log.info("StartSegment called by %s" % fid)
891        try:
892            req = req['StartSegmentRequestBody']
893            auth_attr = req['allocID']['fedid']
894            topref = req['segmentdescription']['topdldescription']
895        except KeyError:
896            raise service_error(server_error.req, "Badly formed request")
897
898        connInfo = req.get('connection', [])
899        services = req.get('service', [])
900        aid = "%s" % auth_attr
901        attrs = req.get('fedAttr', [])
902
903        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
904                with_proof=True)
905        if not access_ok:
906            self.log.info("StartSegment for %s failed: access denied" % fid)
907            raise service_error(service_error.access, "Access denied")
908        else:
909            # See if this is a replay of an earlier succeeded StartSegment -
910            # sometimes SSL kills 'em.  If so, replay the response rather than
911            # redoing the allocation.
912            self.state_lock.acquire()
913            retval = self.allocation[aid].get('started', None)
914            self.state_lock.release()
915            if retval:
916                self.log.warning("Duplicate StartSegment for %s: " % aid + \
917                        "replaying response")
918                return retval
919
920        # A new request.  Do it.
921
922        if topref: topo = topdl.Topology(**topref)
923        else:
924            raise service_error(service_error.req, 
925                    "Request missing segmentdescription'")
926       
927        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
928        try:
929            tmpdir = tempfile.mkdtemp(prefix="access-")
930            softdir = "%s/software" % tmpdir
931            os.mkdir(softdir)
932        except EnvironmentError:
933            self.log.info("StartSegment for %s failed: internal error" % fid)
934            raise service_error(service_error.internal, "Cannot create tmp dir")
935
936        # Try block alllows us to clean up temporary files.
937        try:
938            self.retrieve_software(topo, certfile, softdir)
939            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
940                alloc_log =  self.initialize_experiment_info(attrs, aid, 
941                        certfile, tmpdir)
942            # A misconfigured cert in the ABAC map can be confusing...
943            if not os.access(xmlrpc_cert, os.R_OK):
944                self.log.error("Cannot open user's emulab SSL cert: %s" % \
945                        xmlrpc_cert)
946                raise service_error(service_error.internal,
947                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
948
949
950            if '/' in proj: proj, gid = proj.split('/')
951            else: gid = None
952
953
954            # Set up userconf and seer if needed
955            self.configure_userconf(services, tmpdir)
956            self.configure_seer_services(services, topo, softdir)
957            # Get and send synch store variables
958            self.export_store_info(certfile, proj, ename, connInfo)
959            self.import_store_info(certfile, connInfo)
960
961            expfile = "%s/experiment.tcl" % tmpdir
962
963            self.generate_portal_configs(topo, pubkey_base, 
964                    secretkey_base, tmpdir, proj, ename, connInfo, services)
965            self.generate_ns2(topo, expfile, 
966                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
967
968            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
969                    debug=self.create_debug, log=alloc_log, boss=self.boss,
970                    ops=self.ops, cert=xmlrpc_cert)
971            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
972        except service_error, e:
973            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
974            err = e
975        except:
976            t, v, st = sys.exc_info()
977            self.log.info("StartSegment for %s failed:unexpected error: %s" \
978                    % (fid, traceback.extract_tb(st)))
979            err = service_error(service_error.internal, "%s: %s" % \
980                    (v, traceback.extract_tb(st)))
981
982        # Walk up tmpdir, deleting as we go
983        if self.cleanup: self.remove_dirs(tmpdir)
984        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
985
986        if rv:
987            self.log.info("StartSegment for %s succeeded" % fid)
988            return self.finalize_experiment(starter, topo, aid, req['allocID'],
989                    proof)
990        elif err:
991            raise service_error(service_error.federant,
992                    "Swapin failed: %s" % err)
993        else:
994            raise service_error(service_error.federant, "Swapin failed")
995
996    def TerminateSegment(self, req, fid):
997        self.log.info("TerminateSegment called by %s" % fid)
998        try:
999            req = req['TerminateSegmentRequestBody']
1000        except KeyError:
1001            raise service_error(server_error.req, "Badly formed request")
1002
1003        auth_attr = req['allocID']['fedid']
1004        aid = "%s" % auth_attr
1005        attrs = req.get('fedAttr', [])
1006
1007        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1008                with_proof=True)
1009        if not access_ok:
1010            raise service_error(service_error.access, "Access denied")
1011
1012        self.state_lock.acquire()
1013        if aid in self.allocation:
1014            proj = self.allocation[aid].get('project', None)
1015            user = self.allocation[aid].get('user', None)
1016            xmlrpc_cert = self.allocation[aid].get('cert', None)
1017            ename = self.allocation[aid].get('experiment', None)
1018            nonce = self.allocation[aid].get('nonce', False)
1019        else:
1020            proj = None
1021            user = None
1022            ename = None
1023            nonce = False
1024            xmlrpc_cert = None
1025        self.state_lock.release()
1026
1027        if not proj:
1028            self.log.info("TerminateSegment failed for %s: cannot find project"\
1029                    % fid)
1030            raise service_error(service_error.internal, 
1031                    "Can't find project for %s" % aid)
1032        else:
1033            if '/' in proj: proj, gid = proj.split('/')
1034            else: gid = None
1035
1036        if not user:
1037            self.log.info("TerminateSegment failed for %s: cannot find user"\
1038                    % fid)
1039            raise service_error(service_error.internal, 
1040                    "Can't find creation user for %s" % aid)
1041        if not ename:
1042            self.log.info(
1043                    "TerminateSegment failed for %s: cannot find experiment"\
1044                    % fid)
1045            raise service_error(service_error.internal, 
1046                    "Can't find experiment name for %s" % aid)
1047        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1048                debug=self.create_debug, boss=self.boss, ops=self.ops,
1049                cert=xmlrpc_cert)
1050        stopper(self, user, proj, ename, gid, nonce)
1051        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1052                (fid, proj, ename))
1053        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1054
1055    def InfoSegment(self, req, fid):
1056        self.log.info("InfoSegment called by %s" % fid)
1057        try:
1058            req = req['InfoSegmentRequestBody']
1059        except KeyError:
1060            raise service_error(server_error.req, "Badly formed request")
1061
1062        auth_attr = req['allocID']['fedid']
1063        aid = "%s" % auth_attr
1064
1065        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1066                with_proof=True)
1067        if not access_ok:
1068            raise service_error(service_error.access, "Access denied")
1069
1070        self.state_lock.acquire()
1071        if aid in self.allocation:
1072            topo = self.allocation[aid].get('topo', None)
1073            if topo: topo = topo.clone()
1074            proj = self.allocation[aid].get('project', None)
1075            user = self.allocation[aid].get('user', None)
1076            xmlrpc_cert = self.allocation[aid].get('cert', None)
1077            ename = self.allocation[aid].get('experiment', None)
1078        else:
1079            proj = None
1080            user = None
1081            ename = None
1082            topo = None
1083            xmlrpc_cert = None
1084        self.state_lock.release()
1085
1086        if not proj:
1087            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1088            raise service_error(service_error.internal, 
1089                    "Can't find project for %s" % aid)
1090        else:
1091            if '/' in proj: proj, gid = proj.split('/')
1092            else: gid = None
1093
1094        if not user:
1095            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1096            raise service_error(service_error.internal, 
1097                    "Can't find creation user for %s" % aid)
1098        if not ename:
1099            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1100            raise service_error(service_error.internal, 
1101                    "Can't find experiment name for %s" % aid)
1102        info = self.info_segment(keyfile=self.ssh_privkey_file,
1103                debug=self.create_debug, boss=self.boss, ops=self.ops,
1104                cert=xmlrpc_cert)
1105        info(self, user, proj, ename)
1106        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1107                (fid, user, proj, ename))
1108        self.decorate_topology(info, topo)
1109        self.state_lock.acquire()
1110        if aid in self.allocation:
1111            self.allocation[aid]['topo'] = topo
1112            self.write_state()
1113        self.state_lock.release()
1114        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1115                (fid, user, proj, ename))
1116
1117        rv = { 
1118                'allocID': req['allocID'], 
1119                'proof': proof.to_dict(),
1120                }
1121        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1122                (fid, user, proj, ename))
1123        if topo:
1124            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1125        return rv
1126
1127    def OperationSegment(self, req, fid):
1128        def get_pname(e):
1129            """
1130            Get the physical name of a node
1131            """
1132            if e.localname:
1133                return re.sub('\..*','', e.localname[0])
1134            else:
1135                return None
1136
1137        self.log.info("OperationSegment called by %s" % fid)
1138        try:
1139            req = req['OperationSegmentRequestBody']
1140        except KeyError:
1141            raise service_error(server_error.req, "Badly formed request")
1142
1143        auth_attr = req['allocID']['fedid']
1144        aid = "%s" % auth_attr
1145
1146        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1147                with_proof=True)
1148        if not access_ok:
1149            self.log.info("OperationSegment failed for %s: access denied" % fid)
1150            raise service_error(service_error.access, "Access denied")
1151
1152        op = req.get('operation', None)
1153        targets = req.get('target', None)
1154        params = req.get('parameter', None)
1155
1156        if op is None :
1157            self.log.info("OperationSegment failed for %s: no operation" % fid)
1158            raise service_error(service_error.req, "missing operation")
1159        elif targets is None:
1160            self.log.info("OperationSegment failed for %s: no targets" % fid)
1161            raise service_error(service_error.req, "no targets")
1162
1163        self.state_lock.acquire()
1164        if aid in self.allocation:
1165            topo = self.allocation[aid].get('topo', None)
1166            if topo: topo = topo.clone()
1167            xmlrpc_cert = self.allocation[aid].get('cert', None)
1168        else:
1169            topo = None
1170            xmlrpc_cert = None
1171        self.state_lock.release()
1172
1173        targets = copy.copy(targets)
1174        ptargets = { }
1175        for e in topo.elements:
1176            if isinstance(e, topdl.Computer):
1177                if e.name in targets:
1178                    targets.remove(e.name)
1179                    pn = get_pname(e)
1180                    if pn: ptargets[e.name] = pn
1181
1182        status = [ operation_status(t, operation_status.no_target) \
1183                for t in targets]
1184
1185        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1186                debug=self.create_debug, boss=self.boss, ops=self.ops,
1187                cert=xmlrpc_cert)
1188        ops(self, op, ptargets, params, topo)
1189        self.log.info("OperationSegment operated for %s" % fid)
1190       
1191        status.extend(ops.status)
1192        self.log.info("OperationSegment succeed for %s" % fid)
1193
1194        return { 
1195                'allocID': req['allocID'], 
1196                'status': [s.to_dict() for s in status],
1197                'proof': proof.to_dict(),
1198                }
Note: See TracBrowser for help on using the repository browser.