source: fedd/federation/emulab_access.py @ 4d68ba6

compt_changes
Last change on this file since 4d68ba6 was 4d68ba6, checked in by Ted Faber <faber@…>, 12 years ago

Do not add a startcommand to non-federated experiments

  • Property mode set to 100644
File size: 37.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from deter import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from deter import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        self.log.info("RequestAccess called by %s" % fid)
244        # The dance to get into the request body
245        if req.has_key('RequestAccessRequestBody'):
246            req = req['RequestAccessRequestBody']
247        else:
248            raise service_error(service_error.req, "No request!?")
249
250        # if this includes a project export request, construct a filter such
251        # that only the ABAC attributes mapped to that project are checked for
252        # access.
253        if 'service' in req:
254            ep = get_export_project(req['service'])
255            if ep: pf = lambda(a): a.value[0] == ep
256            else: pf = None
257        else:
258            ep = None
259            pf = None
260
261        if self.auth.import_credentials(
262                data_list=req.get('abac_credential', [])):
263            self.auth.save()
264        else:
265            self.log.debug('failed to import incoming credentials')
266
267        if self.auth_type == 'abac':
268            found, owners, proof = self.lookup_access(req, fid, filter=pf)
269        else:
270            raise service_error(service_error.internal, 
271                    'Unknown auth_type: %s' % self.auth_type)
272        ap = None
273
274        # keep track of what's been added
275        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
276        aid = unicode(allocID)
277
278        pname, uname = self.save_project_state(aid, found[0], found[1], 
279                found[2], owners)
280
281        services, svc_state = self.export_services(req.get('service',[]),
282                pname, uname)
283        self.state_lock.acquire()
284        # Store services state in global state
285        for k, v in svc_state.items():
286            self.allocation[aid][k] = v
287        self.append_allocation_authorization(aid, 
288                set([(o, allocID) for o in owners]), state_attr='allocation')
289        self.write_state()
290        self.state_lock.release()
291        try:
292            f = open("%s/%s.pem" % (self.certdir, aid), "w")
293            print >>f, alloc_cert
294            f.close()
295        except EnvironmentError, e:
296            self.log.info("RequestAccess failed for by %s: internal error" \
297                    % fid)
298            raise service_error(service_error.internal, 
299                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
300        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
301        resp = self.build_access_response({ 'fedid': allocID } ,
302                pname, services, proof)
303        return resp
304
305    def ReleaseAccess(self, req, fid):
306        self.log.info("ReleaseAccess called by %s" % fid)
307        # The dance to get into the request body
308        if req.has_key('ReleaseAccessRequestBody'):
309            req = req['ReleaseAccessRequestBody']
310        else:
311            raise service_error(service_error.req, "No request!?")
312
313        try:
314            if req['allocID'].has_key('localname'):
315                auth_attr = aid = req['allocID']['localname']
316            elif req['allocID'].has_key('fedid'):
317                aid = unicode(req['allocID']['fedid'])
318                auth_attr = req['allocID']['fedid']
319            else:
320                raise service_error(service_error.req,
321                        "Only localnames and fedids are understood")
322        except KeyError:
323            raise service_error(service_error.req, "Badly formed request")
324
325        self.log.debug("[access] deallocation requested for %s by %s" % \
326                (aid, fid))
327        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
328                with_proof=True)
329        if not access_ok:
330            self.log.debug("[access] deallocation denied for %s", aid)
331            raise service_error(service_error.access, "Access Denied")
332
333        self.state_lock.acquire()
334        if aid in self.allocation:
335            self.log.debug("Found allocation for %s" %aid)
336            self.clear_allocation_authorization(aid, state_attr='allocation')
337            del self.allocation[aid]
338            self.write_state()
339            self.state_lock.release()
340            # Remove the access cert
341            cf = "%s/%s.pem" % (self.certdir, aid)
342            self.log.debug("Removing %s" % cf)
343            os.remove(cf)
344            self.log.info("ReleaseAccess succeeded for %s" % fid)
345            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
346        else:
347            self.state_lock.release()
348            raise service_error(service_error.req, "No such allocation")
349
350    # These are subroutines for StartSegment
351    def generate_ns2(self, topo, expfn, softdir, connInfo):
352        """
353        Convert topo into an ns2 file, decorated with appropriate commands for
354        the particular testbed setup.  Convert all requests for software, etc
355        to point at the staged copies on this testbed and add the federation
356        startcommands.
357        """
358        class dragon_commands:
359            """
360            Functor to spit out approrpiate dragon commands for nodes listed in
361            the connectivity description.  The constructor makes a dict mapping
362            dragon nodes to their parameters and the __call__ checks each
363            element in turn for membership.
364            """
365            def __init__(self, map):
366                self.node_info = map
367
368            def __call__(self, e):
369                s = ""
370                if isinstance(e, topdl.Computer):
371                    if self.node_info.has_key(e.name):
372                        info = self.node_info[e.name]
373                        for ifname, vlan, type in info:
374                            for i in e.interface:
375                                if i.name == ifname:
376                                    addr = i.get_attribute('ip4_address')
377                                    subs = i.substrate[0]
378                                    break
379                            else:
380                                raise service_error(service_error.internal,
381                                        "No interface %s on element %s" % \
382                                                (ifname, e.name))
383                            # XXX: do netmask right
384                            if type =='link':
385                                s = ("tb-allow-external ${%s} " + \
386                                        "dragonportal ip %s vlan %s " + \
387                                        "netmask 255.255.255.0\n") % \
388                                        (topdl.to_tcl_name(e.name), addr, vlan)
389                            elif type =='lan':
390                                s = ("tb-allow-external ${%s} " + \
391                                        "dragonportal " + \
392                                        "ip %s vlan %s usurp %s\n") % \
393                                        (topdl.to_tcl_name(e.name), addr, 
394                                                vlan, subs)
395                            else:
396                                raise service_error(service_error_internal,
397                                        "Unknown DRAGON type %s" % type)
398                return s
399
400        class not_dragon:
401            """
402            Return true if a node is in the given map of dragon nodes.
403            """
404            def __init__(self, map):
405                self.nodes = set(map.keys())
406
407            def __call__(self, e):
408                return e.name not in self.nodes
409
410        def have_portals(top):
411            '''
412            Return true if the topology has a portal node
413            '''
414            # The else is on the for
415            for e in top.elements:
416                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
417                    return True
418            else:
419                return False
420
421
422        # Main line of generate_ns2
423        t = topo.clone()
424
425        # Create the map of nodes that need direct connections (dragon
426        # connections) from the connInfo
427        dragon_map = { }
428        for i in [ i for i in connInfo if i['type'] == 'transit']:
429            for a in i.get('fedAttr', []):
430                if a['attribute'] == 'vlan_id':
431                    vlan = a['value']
432                    break
433            else:
434                raise service_error(service_error.internal, "No vlan tag")
435            members = i.get('member', [])
436            if len(members) > 1: type = 'lan'
437            else: type = 'link'
438
439            try:
440                for m in members:
441                    if m['element'] in dragon_map:
442                        dragon_map[m['element']].append(( m['interface'], 
443                            vlan, type))
444                    else:
445                        dragon_map[m['element']] = [( m['interface'], 
446                            vlan, type),]
447            except KeyError:
448                raise service_error(service_error.req,
449                        "Missing connectivity info")
450
451        # Weed out the things we aren't going to instantiate: Segments, portal
452        # substrates, and portal interfaces.  (The copy in the for loop allows
453        # us to delete from e.elements in side the for loop).  While we're
454        # touching all the elements, we also adjust paths from the original
455        # testbed to local testbed paths and put the federation commands into
456        # the start commands
457        local = len(dragon_map) == 0 and not have_portals(t)
458        if local: routing = 'Static'
459        else: routing = 'Manual'
460
461        self.log.debug("Local: %s", local)
462        for e in [e for e in t.elements]:
463            if isinstance(e, topdl.Segment):
464                t.elements.remove(e)
465            if isinstance(e, topdl.Computer):
466                self.add_kit(e, self.federation_software)
467                if e.get_attribute('portal') and self.portal_startcommand:
468                    # Add local portal support software
469                    self.add_kit(e, self.portal_software)
470                    # Portals never have a user-specified start command
471                    e.set_attribute('startup', self.portal_startcommand)
472                elif not local and self.node_startcommand:
473                    if e.get_attribute('startup'):
474                        e.set_attribute('startup', "%s \\$USER '%s'" % \
475                                (self.node_startcommand, 
476                                    e.get_attribute('startup')))
477                    else:
478                        e.set_attribute('startup', self.node_startcommand)
479
480                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
481                # Remove portal interfaces that do not connect to DRAGON
482                e.interface = [i for i in e.interface \
483                        if not i.get_attribute('portal') or i.name in dinf ]
484            # Fix software paths
485            for s in getattr(e, 'software', []):
486                s.location = re.sub("^.*/", softdir, s.location)
487
488        t.substrates = [ s.clone() for s in t.substrates ]
489        t.incorporate_elements()
490
491        # Customize the ns2 output for local portal commands and images
492        filters = []
493
494        if self.dragon_endpoint:
495            add_filter = not_dragon(dragon_map)
496            filters.append(dragon_commands(dragon_map))
497        else:
498            add_filter = None
499
500        if self.portal_command:
501            filters.append(topdl.generate_portal_command_filter(
502                self.portal_command, add_filter=add_filter))
503
504        if self.portal_image:
505            filters.append(topdl.generate_portal_image_filter(
506                self.portal_image))
507
508        if self.portal_type:
509            filters.append(topdl.generate_portal_hardware_filter(
510                self.portal_type))
511
512        # Convert to ns and write it out
513        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
514        try:
515            f = open(expfn, "w")
516            print >>f, expfile
517            f.close()
518        except EnvironmentError:
519            raise service_error(service_error.internal,
520                    "Cannot write experiment file %s: %s" % (expfn,e))
521
522    def export_store_info(self, cf, proj, ename, connInfo):
523        """
524        For the export requests in the connection info, install the peer names
525        at the experiment controller via SetValue calls.
526        """
527
528        for c in connInfo:
529            for p in [ p for p in c.get('parameter', []) \
530                    if p.get('type', '') == 'output']:
531
532                if p.get('name', '') == 'peer':
533                    k = p.get('key', None)
534                    surl = p.get('store', None)
535                    if surl and k and k.index('/') != -1:
536                        value = "%s.%s.%s%s" % \
537                                (k[k.index('/')+1:], ename, proj, self.domain)
538                        req = { 'name': k, 'value': value }
539                        self.log.debug("Setting %s to %s on %s" % \
540                                (k, value, surl))
541                        self.call_SetValue(surl, req, cf)
542                    else:
543                        self.log.error("Bad export request: %s" % p)
544                elif p.get('name', '') == 'ssh_port':
545                    k = p.get('key', None)
546                    surl = p.get('store', None)
547                    if surl and k:
548                        req = { 'name': k, 'value': self.ssh_port }
549                        self.log.debug("Setting %s to %s on %s" % \
550                                (k, self.ssh_port, surl))
551                        self.call_SetValue(surl, req, cf)
552                    else:
553                        self.log.error("Bad export request: %s" % p)
554                else:
555                    self.log.error("Unknown export parameter: %s" % \
556                            p.get('name'))
557                    continue
558
559    def add_seer_node(self, topo, name, startup):
560        """
561        Add a seer node to the given topology, with the startup command passed
562        in.  Used by configure seer_services.
563        """
564        c_node = topdl.Computer(
565                name=name, 
566                os= topdl.OperatingSystem(
567                    attribute=[
568                    { 'attribute': 'osid', 
569                        'value': self.local_seer_image },
570                    ]),
571                attribute=[
572                    { 'attribute': 'startup', 'value': startup },
573                    ]
574                )
575        self.add_kit(c_node, self.local_seer_software)
576        topo.elements.append(c_node)
577
578    def configure_seer_services(self, services, topo, softdir):
579        """
580        Make changes to the topology required for the seer requests being made.
581        Specifically, add any control or master nodes required and set up the
582        start commands on the nodes to interconnect them.
583        """
584        local_seer = False      # True if we need to add a control node
585        collect_seer = False    # True if there is a seer-master node
586        seer_master= False      # True if we need to add the seer-master
587        for s in services:
588            s_name = s.get('name', '')
589            s_vis = s.get('visibility','')
590
591            if s_name  == 'local_seer_control' and s_vis == 'export':
592                local_seer = True
593            elif s_name == 'seer_master':
594                if s_vis == 'import':
595                    collect_seer = True
596                elif s_vis == 'export':
597                    seer_master = True
598       
599        # We've got the whole picture now, so add nodes if needed and configure
600        # them to interconnect properly.
601        if local_seer or seer_master:
602            # Copy local seer control node software to the tempdir
603            for l, f in self.local_seer_software:
604                base = os.path.basename(f)
605                copy_file(f, "%s/%s" % (softdir, base))
606        # If we're collecting seers somewhere the controllers need to talk to
607        # the master.  In testbeds that export the service, that will be a
608        # local node that we'll add below.  Elsewhere it will be the control
609        # portal that will port forward to the exporting master.
610        if local_seer:
611            if collect_seer:
612                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
613            else:
614                startup = self.local_seer_start
615            self.add_seer_node(topo, 'control', startup)
616        # If this is the seer master, add that node, too.
617        if seer_master:
618            self.add_seer_node(topo, 'seer-master', 
619                    "%s -R -n -R seer-master -R -A -R sink" % \
620                            self.seer_master_start)
621
622    def retrieve_software(self, topo, certfile, softdir):
623        """
624        Collect the software that nodes in the topology need loaded and stage
625        it locally.  This implies retrieving it from the experiment_controller
626        and placing it into softdir.  Certfile is used to prove that this node
627        has access to that data (it's the allocation/segment fedid).  Finally
628        local portal and federation software is also copied to the same staging
629        directory for simplicity - all software needed for experiment creation
630        is in softdir.
631        """
632        sw = set()
633        for e in topo.elements:
634            for s in getattr(e, 'software', []):
635                sw.add(s.location)
636        for s in sw:
637            self.log.debug("Retrieving %s" % s)
638            try:
639                get_url(s, certfile, softdir)
640            except:
641                t, v, st = sys.exc_info()
642                raise service_error(service_error.internal,
643                        "Error retrieving %s: %s" % (s, v))
644
645        # Copy local federation and portal node software to the tempdir
646        for s in (self.federation_software, self.portal_software):
647            for l, f in s:
648                base = os.path.basename(f)
649                copy_file(f, "%s/%s" % (softdir, base))
650
651
652    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
653        """
654        Gather common configuration files, retrieve or create an experiment
655        name and project name, and return the ssh_key filenames.  Create an
656        allocation log bound to the state log variable as well.
657        """
658        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
659                'seer_ca_pem', 'seer_node_pem')
660        ename = None
661        pubkey_base = None
662        secretkey_base = None
663        proj = None
664        user = None
665        alloc_log = None
666        nonce_experiment = False
667        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
668
669        self.state_lock.acquire()
670        if aid in self.allocation:
671            proj = self.allocation[aid].get('project', None)
672        self.state_lock.release()
673
674        if not proj:
675            raise service_error(service_error.internal, 
676                    "Can't find project for %s" %aid)
677
678        for a in attrs:
679            if a['attribute'] in configs:
680                try:
681                    self.log.debug("Retrieving %s from %s" % \
682                            (a['attribute'], a['value']))
683                    get_url(a['value'], certfile, tmpdir)
684                except:
685                    t, v, st = sys.exc_info()
686                    raise service_error(service_error.internal,
687                            "Error retrieving %s: %s" % (a.get('value', ""), v))
688            if a['attribute'] == 'ssh_pubkey':
689                pubkey_base = a['value'].rpartition('/')[2]
690            if a['attribute'] == 'ssh_secretkey':
691                secretkey_base = a['value'].rpartition('/')[2]
692            if a['attribute'] == 'experiment_name':
693                ename = a['value']
694
695        # Names longer than the emulab max are discarded
696        if ename and len(ename) <= self.max_name_len:
697            # Clean up the experiment name so that emulab will accept it.
698            ename = re.sub(vchars_re, '-', ename)
699
700        else:
701            ename = ""
702            for i in range(0,5):
703                ename += random.choice(string.ascii_letters)
704            nonce_experiment = True
705            self.log.warn("No experiment name or suggestion too long: " + \
706                    "picked one randomly: %s" % ename)
707
708        if not pubkey_base:
709            raise service_error(service_error.req, 
710                    "No public key attribute")
711
712        if not secretkey_base:
713            raise service_error(service_error.req, 
714                    "No secret key attribute")
715
716        self.state_lock.acquire()
717        if aid in self.allocation:
718            user = self.allocation[aid].get('user', None)
719            cert = self.allocation[aid].get('cert', None)
720            self.allocation[aid]['experiment'] = ename
721            self.allocation[aid]['nonce'] = nonce_experiment
722            self.allocation[aid]['log'] = [ ]
723            # Create a logger that logs to the experiment's state object as
724            # well as to the main log file.
725            alloc_log = logging.getLogger('fedd.access.%s' % ename)
726            h = logging.StreamHandler(
727                    list_log.list_log(self.allocation[aid]['log']))
728            # XXX: there should be a global one of these rather than
729            # repeating the code.
730            h.setFormatter(logging.Formatter(
731                "%(asctime)s %(name)s %(message)s",
732                        '%d %b %y %H:%M:%S'))
733            alloc_log.addHandler(h)
734            self.write_state()
735        self.state_lock.release()
736
737        if not user:
738            raise service_error(service_error.internal, 
739                    "Can't find creation user for %s" %aid)
740
741        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
742
743    def decorate_topology(self, info, t):
744        """
745        Copy the physical mapping and status onto the topology.  Used by
746        StartSegment and InfoSegment
747        """
748        def add_new(ann, attr):
749            for a in ann:
750                if a not in attr: attr.append(a)
751
752        def merge_os(os, e):
753            if len(e.os) == 0:
754                # No OS at all:
755                if os.get_attribute('emulab_access:image'):
756                    os.set_attribute('emulab_access:initial_image', 
757                            os.get_attribute('emulab_access:image'))
758                e.os = [ os ]
759            elif len(e.os) == 1:
760                # There's one OS, copy the initial image and replace
761                eos = e.os[0]
762                initial = eos.get_attribute('emulab_access:initial_image')
763                if initial:
764                    os.set_attribute('emulab_access:initial_image', initial)
765                e.os = [ os] 
766            else:
767                # Multiple OSes, replace or append
768                for eos in e.os:
769                    if os.name == eos.name:
770                        eos.version = os.version
771                        eos.version = os.distribution
772                        eos.version = os.distributionversion
773                        for a in os.attribute:
774                            if eos.get_attribute(a.attribute):
775                                eos.remove_attribute(a.attribute)
776                            eos.set_attribute(a.attribute, a.value)
777                        break
778                else:
779                    e.os.append(os)
780
781
782        if t is None: return
783        i = 0 # For fake debugging instantiation
784        # Copy the assigned names into the return topology
785        for e in t.elements:
786            if isinstance(e, topdl.Computer):
787                if not self.create_debug:
788                    if e.name in info.node:
789                        add_new(("%s%s" % 
790                            (info.node[e.name].pname, self.domain),),
791                            e.localname)
792                        add_new(("%s%s" % 
793                            (info.node[e.name].lname, self.domain),),
794                            e.localname)
795                        e.status = info.node[e.name].status
796                        os = info.node[e.name].getOS()
797                        if os: merge_os(os, e)
798                else:
799                    # Simple debugging assignment
800                    add_new(("node%d%s" % (i, self.domain),), e.localname)
801                    e.status = 'active'
802                    add_new(('testop1', 'testop2'), e.operation)
803                    i += 1
804
805        for s in t.substrates:
806            if s.name in info.subs:
807                sub = info.subs[s.name]
808                if sub.cap is not None:
809                    s.capacity = topdl.Capacity(sub.cap, 'max')
810                if sub.delay is not None:
811                    s.delay = topdl.Latency(sub.delay, 'max')
812        # XXX interfaces
813
814
815    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
816        """
817        Store key bits of experiment state in the global repository, including
818        the response that may need to be replayed, and return the response.
819        """
820        i = 0
821        t = topo.clone()
822        self.decorate_topology(starter, t)
823        # Grab the log (this is some anal locking, but better safe than
824        # sorry)
825        self.state_lock.acquire()
826        logv = "".join(self.allocation[aid]['log'])
827        # It's possible that the StartSegment call gets retried (!).
828        # if the 'started' key is in the allocation, we'll return it rather
829        # than redo the setup.
830        self.allocation[aid]['started'] = { 
831                'allocID': alloc_id,
832                'allocationLog': logv,
833                'segmentdescription': { 
834                    'topdldescription': t.to_dict()
835                    },
836                'proof': proof.to_dict(),
837                }
838        self.allocation[aid]['topo'] = t
839        retval = copy.copy(self.allocation[aid]['started'])
840        self.write_state()
841        self.state_lock.release()
842        return retval
843   
844    # End of StartSegment support routines
845
846    def StartSegment(self, req, fid):
847        err = None  # Any service_error generated after tmpdir is created
848        rv = None   # Return value from segment creation
849
850        self.log.info("StartSegment called by %s" % fid)
851        try:
852            req = req['StartSegmentRequestBody']
853            auth_attr = req['allocID']['fedid']
854            topref = req['segmentdescription']['topdldescription']
855        except KeyError:
856            raise service_error(server_error.req, "Badly formed request")
857
858        connInfo = req.get('connection', [])
859        services = req.get('service', [])
860        aid = "%s" % auth_attr
861        attrs = req.get('fedAttr', [])
862
863        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
864                with_proof=True)
865        if not access_ok:
866            self.log.info("StartSegment for %s failed: access denied" % fid)
867            raise service_error(service_error.access, "Access denied")
868        else:
869            # See if this is a replay of an earlier succeeded StartSegment -
870            # sometimes SSL kills 'em.  If so, replay the response rather than
871            # redoing the allocation.
872            self.state_lock.acquire()
873            retval = self.allocation[aid].get('started', None)
874            self.state_lock.release()
875            if retval:
876                self.log.warning("Duplicate StartSegment for %s: " % aid + \
877                        "replaying response")
878                return retval
879
880        # A new request.  Do it.
881
882        if topref: topo = topdl.Topology(**topref)
883        else:
884            raise service_error(service_error.req, 
885                    "Request missing segmentdescription'")
886       
887        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
888        try:
889            tmpdir = tempfile.mkdtemp(prefix="access-")
890            softdir = "%s/software" % tmpdir
891            os.mkdir(softdir)
892        except EnvironmentError:
893            self.log.info("StartSegment for %s failed: internal error" % fid)
894            raise service_error(service_error.internal, "Cannot create tmp dir")
895
896        # Try block alllows us to clean up temporary files.
897        try:
898            self.retrieve_software(topo, certfile, softdir)
899            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
900                alloc_log =  self.initialize_experiment_info(attrs, aid, 
901                        certfile, tmpdir)
902            # A misconfigured cert in the ABAC map can be confusing...
903            if not os.access(xmlrpc_cert, os.R_OK):
904                self.log.error("Cannot open user's emulab SSL cert: %s" % \
905                        xmlrpc_cert)
906                raise service_error(service_error.internal,
907                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
908
909
910            if '/' in proj: proj, gid = proj.split('/')
911            else: gid = None
912
913
914            # Set up userconf and seer if needed
915            self.configure_userconf(services, tmpdir)
916            self.configure_seer_services(services, topo, softdir)
917            # Get and send synch store variables
918            self.export_store_info(certfile, proj, ename, connInfo)
919            self.import_store_info(certfile, connInfo)
920
921            expfile = "%s/experiment.tcl" % tmpdir
922
923            self.generate_portal_configs(topo, pubkey_base, 
924                    secretkey_base, tmpdir, proj, ename, connInfo, services)
925            self.generate_ns2(topo, expfile, 
926                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
927
928            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
929                    debug=self.create_debug, log=alloc_log, boss=self.boss,
930                    ops=self.ops, cert=xmlrpc_cert)
931            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
932        except service_error, e:
933            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
934            err = e
935        except:
936            t, v, st = sys.exc_info()
937            self.log.info("StartSegment for %s failed:unexpected error: %s" \
938                    % (fid, traceback.extract_tb(st)))
939            err = service_error(service_error.internal, "%s: %s" % \
940                    (v, traceback.extract_tb(st)))
941
942        # Walk up tmpdir, deleting as we go
943        if self.cleanup: self.remove_dirs(tmpdir)
944        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
945
946        if rv:
947            self.log.info("StartSegment for %s succeeded" % fid)
948            return self.finalize_experiment(starter, topo, aid, req['allocID'],
949                    proof)
950        elif err:
951            raise service_error(service_error.federant,
952                    "Swapin failed: %s" % err)
953        else:
954            raise service_error(service_error.federant, "Swapin failed")
955
956    def TerminateSegment(self, req, fid):
957        self.log.info("TerminateSegment called by %s" % fid)
958        try:
959            req = req['TerminateSegmentRequestBody']
960        except KeyError:
961            raise service_error(server_error.req, "Badly formed request")
962
963        auth_attr = req['allocID']['fedid']
964        aid = "%s" % auth_attr
965        attrs = req.get('fedAttr', [])
966
967        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
968                with_proof=True)
969        if not access_ok:
970            raise service_error(service_error.access, "Access denied")
971
972        self.state_lock.acquire()
973        if aid in self.allocation:
974            proj = self.allocation[aid].get('project', None)
975            user = self.allocation[aid].get('user', None)
976            xmlrpc_cert = self.allocation[aid].get('cert', None)
977            ename = self.allocation[aid].get('experiment', None)
978            nonce = self.allocation[aid].get('nonce', False)
979        else:
980            proj = None
981            user = None
982            ename = None
983            nonce = False
984            xmlrpc_cert = None
985        self.state_lock.release()
986
987        if not proj:
988            self.log.info("TerminateSegment failed for %s: cannot find project"\
989                    % fid)
990            raise service_error(service_error.internal, 
991                    "Can't find project for %s" % aid)
992        else:
993            if '/' in proj: proj, gid = proj.split('/')
994            else: gid = None
995
996        if not user:
997            self.log.info("TerminateSegment failed for %s: cannot find user"\
998                    % fid)
999            raise service_error(service_error.internal, 
1000                    "Can't find creation user for %s" % aid)
1001        if not ename:
1002            self.log.info(
1003                    "TerminateSegment failed for %s: cannot find experiment"\
1004                    % fid)
1005            raise service_error(service_error.internal, 
1006                    "Can't find experiment name for %s" % aid)
1007        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1008                debug=self.create_debug, boss=self.boss, ops=self.ops,
1009                cert=xmlrpc_cert)
1010        stopper(self, user, proj, ename, gid, nonce)
1011        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1012                (fid, proj, ename))
1013        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1014
1015    def InfoSegment(self, req, fid):
1016        self.log.info("InfoSegment called by %s" % fid)
1017        try:
1018            req = req['InfoSegmentRequestBody']
1019        except KeyError:
1020            raise service_error(server_error.req, "Badly formed request")
1021
1022        auth_attr = req['allocID']['fedid']
1023        aid = "%s" % auth_attr
1024
1025        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1026                with_proof=True)
1027        if not access_ok:
1028            raise service_error(service_error.access, "Access denied")
1029
1030        self.state_lock.acquire()
1031        if aid in self.allocation:
1032            topo = self.allocation[aid].get('topo', None)
1033            if topo: topo = topo.clone()
1034            proj = self.allocation[aid].get('project', None)
1035            user = self.allocation[aid].get('user', None)
1036            xmlrpc_cert = self.allocation[aid].get('cert', None)
1037            ename = self.allocation[aid].get('experiment', None)
1038        else:
1039            proj = None
1040            user = None
1041            ename = None
1042            topo = None
1043            xmlrpc_cert = None
1044        self.state_lock.release()
1045
1046        if not proj:
1047            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1048            raise service_error(service_error.internal, 
1049                    "Can't find project for %s" % aid)
1050        else:
1051            if '/' in proj: proj, gid = proj.split('/')
1052            else: gid = None
1053
1054        if not user:
1055            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1056            raise service_error(service_error.internal, 
1057                    "Can't find creation user for %s" % aid)
1058        if not ename:
1059            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1060            raise service_error(service_error.internal, 
1061                    "Can't find experiment name for %s" % aid)
1062        info = self.info_segment(keyfile=self.ssh_privkey_file,
1063                debug=self.create_debug, boss=self.boss, ops=self.ops,
1064                cert=xmlrpc_cert)
1065        info(self, user, proj, ename)
1066        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1067                (fid, user, proj, ename))
1068        self.decorate_topology(info, topo)
1069        self.state_lock.acquire()
1070        if aid in self.allocation:
1071            self.allocation[aid]['topo'] = topo
1072            self.write_state()
1073        self.state_lock.release()
1074        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1075                (fid, user, proj, ename))
1076
1077        rv = { 
1078                'allocID': req['allocID'], 
1079                'proof': proof.to_dict(),
1080                }
1081        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1082                (fid, user, proj, ename))
1083        if topo:
1084            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1085        return rv
1086
1087    def OperationSegment(self, req, fid):
1088        def get_pname(e):
1089            """
1090            Get the physical name of a node
1091            """
1092            if e.localname:
1093                return re.sub('\..*','', e.localname[0])
1094            else:
1095                return None
1096
1097        self.log.info("OperationSegment called by %s" % fid)
1098        try:
1099            req = req['OperationSegmentRequestBody']
1100        except KeyError:
1101            raise service_error(server_error.req, "Badly formed request")
1102
1103        auth_attr = req['allocID']['fedid']
1104        aid = "%s" % auth_attr
1105
1106        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1107                with_proof=True)
1108        if not access_ok:
1109            self.log.info("OperationSegment failed for %s: access denied" % fid)
1110            raise service_error(service_error.access, "Access denied")
1111
1112        op = req.get('operation', None)
1113        targets = req.get('target', None)
1114        params = req.get('parameter', None)
1115
1116        if op is None :
1117            self.log.info("OperationSegment failed for %s: no operation" % fid)
1118            raise service_error(service_error.req, "missing operation")
1119        elif targets is None:
1120            self.log.info("OperationSegment failed for %s: no targets" % fid)
1121            raise service_error(service_error.req, "no targets")
1122
1123        self.state_lock.acquire()
1124        if aid in self.allocation:
1125            topo = self.allocation[aid].get('topo', None)
1126            if topo: topo = topo.clone()
1127            xmlrpc_cert = self.allocation[aid].get('cert', None)
1128        else:
1129            topo = None
1130            xmlrpc_cert = None
1131        self.state_lock.release()
1132
1133        targets = copy.copy(targets)
1134        ptargets = { }
1135        for e in topo.elements:
1136            if isinstance(e, topdl.Computer):
1137                if e.name in targets:
1138                    targets.remove(e.name)
1139                    pn = get_pname(e)
1140                    if pn: ptargets[e.name] = pn
1141
1142        status = [ operation_status(t, operation_status.no_target) \
1143                for t in targets]
1144
1145        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1146                debug=self.create_debug, boss=self.boss, ops=self.ops,
1147                cert=xmlrpc_cert)
1148        ops(self, op, ptargets, params, topo)
1149        self.log.info("OperationSegment operated for %s" % fid)
1150       
1151        status.extend(ops.status)
1152        self.log.info("OperationSegment succeed for %s" % fid)
1153
1154        return { 
1155                'allocID': req['allocID'], 
1156                'status': [s.to_dict() for s in status],
1157                'proof': proof.to_dict(),
1158                }
Note: See TracBrowser for help on using the repository browser.