source: fedd/federation/emulab_access.py @ ee950c2

compt_changesinfo-ops
Last change on this file since ee950c2 was ee950c2, checked in by Ted Faber <faber@…>, 12 years ago

Deactivate legacy authorization and dynamic projects

  • Property mode set to 100644
File size: 34.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from fedid import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id) into an access_project.  This is
179        called by read_access to convert to local attributes.  It returns
180        a tuple of the form (project, user).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
185            # The slice takes the parens off the string.
186            proj, user = str[1:-1].split(',')
187            return (proj.strip(), user.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 1 comma)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['owners'] = owners
204        self.write_state()
205        self.state_lock.release()
206        return (pname, uname)
207
208    # End of RequestAccess support routines
209
210    def RequestAccess(self, req, fid):
211        """
212        Handle the access request.  Proxy if not for us.
213
214        Parse out the fields and make the allocations or rejections if for us,
215        otherwise, assuming we're willing to proxy, proxy the request out.
216        """
217
218        def gateway_hardware(h):
219            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
220            else: return h
221
222        def get_export_project(svcs):
223            """
224            if the service requests includes one to export a project, return
225            that project.
226            """
227            rv = None
228            for s in svcs:
229                if s.get('name', '') == 'project_export' and \
230                        s.get('visibility', '') == 'export':
231                    if not rv: 
232                        for a in s.get('fedAttr', []):
233                            if a.get('attribute', '') == 'project' \
234                                    and 'value' in a:
235                                rv = a['value']
236                    else:
237                        raise service_error(service_error, access, 
238                                'Requesting multiple project exports is ' + \
239                                        'not supported');
240            return rv
241
242        # The dance to get into the request body
243        if req.has_key('RequestAccessRequestBody'):
244            req = req['RequestAccessRequestBody']
245        else:
246            raise service_error(service_error.req, "No request!?")
247
248        # if this includes a project export request, construct a filter such
249        # that only the ABAC attributes mapped to that project are checked for
250        # access.
251        if 'service' in req:
252            ep = get_export_project(req['service'])
253            if ep: pf = lambda(a): a.value[0] == ep
254            else: pf = None
255        else:
256            ep = None
257            pf = None
258
259        if self.auth.import_credentials(
260                data_list=req.get('abac_credential', [])):
261            self.auth.save()
262
263        if self.auth_type == 'abac':
264            found, owners, proof = self.lookup_access(req, fid, filter=pf)
265        else:
266            raise service_error(service_error.internal, 
267                    'Unknown auth_type: %s' % self.auth_type)
268        ap = None
269
270        # keep track of what's been added
271        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
272        aid = unicode(allocID)
273
274        pname, uname = self.save_project_state(aid, found[0], found[1], owners)
275
276        services, svc_state = self.export_services(req.get('service',[]),
277                pname, uname)
278        self.state_lock.acquire()
279        # Store services state in global state
280        for k, v in svc_state.items():
281            self.allocation[aid][k] = v
282        self.append_allocation_authorization(aid, 
283                set([(o, allocID) for o in owners]), state_attr='allocation')
284        self.write_state()
285        self.state_lock.release()
286        try:
287            f = open("%s/%s.pem" % (self.certdir, aid), "w")
288            print >>f, alloc_cert
289            f.close()
290        except EnvironmentError, e:
291            raise service_error(service_error.internal, 
292                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
293        resp = self.build_access_response({ 'fedid': allocID } ,
294                pname, services, proof)
295        return resp
296
297    def ReleaseAccess(self, req, fid):
298        # The dance to get into the request body
299        if req.has_key('ReleaseAccessRequestBody'):
300            req = req['ReleaseAccessRequestBody']
301        else:
302            raise service_error(service_error.req, "No request!?")
303
304        try:
305            if req['allocID'].has_key('localname'):
306                auth_attr = aid = req['allocID']['localname']
307            elif req['allocID'].has_key('fedid'):
308                aid = unicode(req['allocID']['fedid'])
309                auth_attr = req['allocID']['fedid']
310            else:
311                raise service_error(service_error.req,
312                        "Only localnames and fedids are understood")
313        except KeyError:
314            raise service_error(service_error.req, "Badly formed request")
315
316        self.log.debug("[access] deallocation requested for %s by %s" % \
317                (aid, fid))
318        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
319                with_proof=True)
320        if not access_ok:
321            self.log.debug("[access] deallocation denied for %s", aid)
322            raise service_error(service_error.access, "Access Denied")
323
324        self.state_lock.acquire()
325        if aid in self.allocation:
326            self.log.debug("Found allocation for %s" %aid)
327            self.clear_allocation_authorization(aid, state_attr='allocation')
328            del self.allocation[aid]
329            self.write_state()
330            self.state_lock.release()
331            # Remove the access cert
332            cf = "%s/%s.pem" % (self.certdir, aid)
333            self.log.debug("Removing %s" % cf)
334            os.remove(cf)
335            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
336        else:
337            self.state_lock.release()
338            raise service_error(service_error.req, "No such allocation")
339
340    # These are subroutines for StartSegment
341    def generate_ns2(self, topo, expfn, softdir, connInfo):
342        """
343        Convert topo into an ns2 file, decorated with appropriate commands for
344        the particular testbed setup.  Convert all requests for software, etc
345        to point at the staged copies on this testbed and add the federation
346        startcommands.
347        """
348        class dragon_commands:
349            """
350            Functor to spit out approrpiate dragon commands for nodes listed in
351            the connectivity description.  The constructor makes a dict mapping
352            dragon nodes to their parameters and the __call__ checks each
353            element in turn for membership.
354            """
355            def __init__(self, map):
356                self.node_info = map
357
358            def __call__(self, e):
359                s = ""
360                if isinstance(e, topdl.Computer):
361                    if self.node_info.has_key(e.name):
362                        info = self.node_info[e.name]
363                        for ifname, vlan, type in info:
364                            for i in e.interface:
365                                if i.name == ifname:
366                                    addr = i.get_attribute('ip4_address')
367                                    subs = i.substrate[0]
368                                    break
369                            else:
370                                raise service_error(service_error.internal,
371                                        "No interface %s on element %s" % \
372                                                (ifname, e.name))
373                            # XXX: do netmask right
374                            if type =='link':
375                                s = ("tb-allow-external ${%s} " + \
376                                        "dragonportal ip %s vlan %s " + \
377                                        "netmask 255.255.255.0\n") % \
378                                        (topdl.to_tcl_name(e.name), addr, vlan)
379                            elif type =='lan':
380                                s = ("tb-allow-external ${%s} " + \
381                                        "dragonportal " + \
382                                        "ip %s vlan %s usurp %s\n") % \
383                                        (topdl.to_tcl_name(e.name), addr, 
384                                                vlan, subs)
385                            else:
386                                raise service_error(service_error_internal,
387                                        "Unknown DRAGON type %s" % type)
388                return s
389
390        class not_dragon:
391            """
392            Return true if a node is in the given map of dragon nodes.
393            """
394            def __init__(self, map):
395                self.nodes = set(map.keys())
396
397            def __call__(self, e):
398                return e.name not in self.nodes
399
400        # Main line of generate_ns2
401        t = topo.clone()
402
403        # Create the map of nodes that need direct connections (dragon
404        # connections) from the connInfo
405        dragon_map = { }
406        for i in [ i for i in connInfo if i['type'] == 'transit']:
407            for a in i.get('fedAttr', []):
408                if a['attribute'] == 'vlan_id':
409                    vlan = a['value']
410                    break
411            else:
412                raise service_error(service_error.internal, "No vlan tag")
413            members = i.get('member', [])
414            if len(members) > 1: type = 'lan'
415            else: type = 'link'
416
417            try:
418                for m in members:
419                    if m['element'] in dragon_map:
420                        dragon_map[m['element']].append(( m['interface'], 
421                            vlan, type))
422                    else:
423                        dragon_map[m['element']] = [( m['interface'], 
424                            vlan, type),]
425            except KeyError:
426                raise service_error(service_error.req,
427                        "Missing connectivity info")
428
429        # Weed out the things we aren't going to instantiate: Segments, portal
430        # substrates, and portal interfaces.  (The copy in the for loop allows
431        # us to delete from e.elements in side the for loop).  While we're
432        # touching all the elements, we also adjust paths from the original
433        # testbed to local testbed paths and put the federation commands into
434        # the start commands
435        for e in [e for e in t.elements]:
436            if isinstance(e, topdl.Segment):
437                t.elements.remove(e)
438            if isinstance(e, topdl.Computer):
439                self.add_kit(e, self.federation_software)
440                if e.get_attribute('portal') and self.portal_startcommand:
441                    # Add local portal support software
442                    self.add_kit(e, self.portal_software)
443                    # Portals never have a user-specified start command
444                    e.set_attribute('startup', self.portal_startcommand)
445                elif self.node_startcommand:
446                    if e.get_attribute('startup'):
447                        e.set_attribute('startup', "%s \\$USER '%s'" % \
448                                (self.node_startcommand, 
449                                    e.get_attribute('startup')))
450                    else:
451                        e.set_attribute('startup', self.node_startcommand)
452
453                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
454                # Remove portal interfaces that do not connect to DRAGON
455                e.interface = [i for i in e.interface \
456                        if not i.get_attribute('portal') or i.name in dinf ]
457            # Fix software paths
458            for s in getattr(e, 'software', []):
459                s.location = re.sub("^.*/", softdir, s.location)
460
461        t.substrates = [ s.clone() for s in t.substrates ]
462        t.incorporate_elements()
463
464        # Customize the ns2 output for local portal commands and images
465        filters = []
466
467        if self.dragon_endpoint:
468            add_filter = not_dragon(dragon_map)
469            filters.append(dragon_commands(dragon_map))
470        else:
471            add_filter = None
472
473        if self.portal_command:
474            filters.append(topdl.generate_portal_command_filter(
475                self.portal_command, add_filter=add_filter))
476
477        if self.portal_image:
478            filters.append(topdl.generate_portal_image_filter(
479                self.portal_image))
480
481        if self.portal_type:
482            filters.append(topdl.generate_portal_hardware_filter(
483                self.portal_type))
484
485        # Convert to ns and write it out
486        expfile = topdl.topology_to_ns2(t, filters)
487        try:
488            f = open(expfn, "w")
489            print >>f, expfile
490            f.close()
491        except EnvironmentError:
492            raise service_error(service_error.internal,
493                    "Cannot write experiment file %s: %s" % (expfn,e))
494
495    def export_store_info(self, cf, proj, ename, connInfo):
496        """
497        For the export requests in the connection info, install the peer names
498        at the experiment controller via SetValue calls.
499        """
500
501        for c in connInfo:
502            for p in [ p for p in c.get('parameter', []) \
503                    if p.get('type', '') == 'output']:
504
505                if p.get('name', '') == 'peer':
506                    k = p.get('key', None)
507                    surl = p.get('store', None)
508                    if surl and k and k.index('/') != -1:
509                        value = "%s.%s.%s%s" % \
510                                (k[k.index('/')+1:], ename, proj, self.domain)
511                        req = { 'name': k, 'value': value }
512                        self.log.debug("Setting %s to %s on %s" % \
513                                (k, value, surl))
514                        self.call_SetValue(surl, req, cf)
515                    else:
516                        self.log.error("Bad export request: %s" % p)
517                elif p.get('name', '') == 'ssh_port':
518                    k = p.get('key', None)
519                    surl = p.get('store', None)
520                    if surl and k:
521                        req = { 'name': k, 'value': self.ssh_port }
522                        self.log.debug("Setting %s to %s on %s" % \
523                                (k, self.ssh_port, surl))
524                        self.call_SetValue(surl, req, cf)
525                    else:
526                        self.log.error("Bad export request: %s" % p)
527                else:
528                    self.log.error("Unknown export parameter: %s" % \
529                            p.get('name'))
530                    continue
531
532    def add_seer_node(self, topo, name, startup):
533        """
534        Add a seer node to the given topology, with the startup command passed
535        in.  Used by configure seer_services.
536        """
537        c_node = topdl.Computer(
538                name=name, 
539                os= topdl.OperatingSystem(
540                    attribute=[
541                    { 'attribute': 'osid', 
542                        'value': self.local_seer_image },
543                    ]),
544                attribute=[
545                    { 'attribute': 'startup', 'value': startup },
546                    ]
547                )
548        self.add_kit(c_node, self.local_seer_software)
549        topo.elements.append(c_node)
550
551    def configure_seer_services(self, services, topo, softdir):
552        """
553        Make changes to the topology required for the seer requests being made.
554        Specifically, add any control or master nodes required and set up the
555        start commands on the nodes to interconnect them.
556        """
557        local_seer = False      # True if we need to add a control node
558        collect_seer = False    # True if there is a seer-master node
559        seer_master= False      # True if we need to add the seer-master
560        for s in services:
561            s_name = s.get('name', '')
562            s_vis = s.get('visibility','')
563
564            if s_name  == 'local_seer_control' and s_vis == 'export':
565                local_seer = True
566            elif s_name == 'seer_master':
567                if s_vis == 'import':
568                    collect_seer = True
569                elif s_vis == 'export':
570                    seer_master = True
571       
572        # We've got the whole picture now, so add nodes if needed and configure
573        # them to interconnect properly.
574        if local_seer or seer_master:
575            # Copy local seer control node software to the tempdir
576            for l, f in self.local_seer_software:
577                base = os.path.basename(f)
578                copy_file(f, "%s/%s" % (softdir, base))
579        # If we're collecting seers somewhere the controllers need to talk to
580        # the master.  In testbeds that export the service, that will be a
581        # local node that we'll add below.  Elsewhere it will be the control
582        # portal that will port forward to the exporting master.
583        if local_seer:
584            if collect_seer:
585                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
586            else:
587                startup = self.local_seer_start
588            self.add_seer_node(topo, 'control', startup)
589        # If this is the seer master, add that node, too.
590        if seer_master:
591            self.add_seer_node(topo, 'seer-master', 
592                    "%s -R -n -R seer-master -R -A -R sink" % \
593                            self.seer_master_start)
594
595    def retrieve_software(self, topo, certfile, softdir):
596        """
597        Collect the software that nodes in the topology need loaded and stage
598        it locally.  This implies retrieving it from the experiment_controller
599        and placing it into softdir.  Certfile is used to prove that this node
600        has access to that data (it's the allocation/segment fedid).  Finally
601        local portal and federation software is also copied to the same staging
602        directory for simplicity - all software needed for experiment creation
603        is in softdir.
604        """
605        sw = set()
606        for e in topo.elements:
607            for s in getattr(e, 'software', []):
608                sw.add(s.location)
609        for s in sw:
610            self.log.debug("Retrieving %s" % s)
611            try:
612                get_url(s, certfile, softdir)
613            except:
614                t, v, st = sys.exc_info()
615                raise service_error(service_error.internal,
616                        "Error retrieving %s: %s" % (s, v))
617
618        # Copy local federation and portal node software to the tempdir
619        for s in (self.federation_software, self.portal_software):
620            for l, f in s:
621                base = os.path.basename(f)
622                copy_file(f, "%s/%s" % (softdir, base))
623
624
625    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
626        """
627        Gather common configuration files, retrieve or create an experiment
628        name and project name, and return the ssh_key filenames.  Create an
629        allocation log bound to the state log variable as well.
630        """
631        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
632                'seer_ca_pem', 'seer_node_pem')
633        ename = None
634        pubkey_base = None
635        secretkey_base = None
636        proj = None
637        user = None
638        alloc_log = None
639        nonce_experiment = False
640        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
641
642        self.state_lock.acquire()
643        if aid in self.allocation:
644            proj = self.allocation[aid].get('project', None)
645        self.state_lock.release()
646
647        if not proj:
648            raise service_error(service_error.internal, 
649                    "Can't find project for %s" %aid)
650
651        for a in attrs:
652            if a['attribute'] in configs:
653                try:
654                    self.log.debug("Retrieving %s from %s" % \
655                            (a['attribute'], a['value']))
656                    get_url(a['value'], certfile, tmpdir)
657                except:
658                    t, v, st = sys.exc_info()
659                    raise service_error(service_error.internal,
660                            "Error retrieving %s: %s" % (a.get('value', ""), v))
661            if a['attribute'] == 'ssh_pubkey':
662                pubkey_base = a['value'].rpartition('/')[2]
663            if a['attribute'] == 'ssh_secretkey':
664                secretkey_base = a['value'].rpartition('/')[2]
665            if a['attribute'] == 'experiment_name':
666                ename = a['value']
667
668        # Names longer than the emulab max are discarded
669        if ename and len(ename) <= self.max_name_len:
670            # Clean up the experiment name so that emulab will accept it.
671            ename = re.sub(vchars_re, '-', ename)
672
673        else:
674            ename = ""
675            for i in range(0,5):
676                ename += random.choice(string.ascii_letters)
677            nonce_experiment = True
678            self.log.warn("No experiment name or suggestion too long: " + \
679                    "picked one randomly: %s" % ename)
680
681        if not pubkey_base:
682            raise service_error(service_error.req, 
683                    "No public key attribute")
684
685        if not secretkey_base:
686            raise service_error(service_error.req, 
687                    "No secret key attribute")
688
689        self.state_lock.acquire()
690        if aid in self.allocation:
691            user = self.allocation[aid].get('user', None)
692            self.allocation[aid]['experiment'] = ename
693            self.allocation[aid]['nonce'] = nonce_experiment
694            self.allocation[aid]['log'] = [ ]
695            # Create a logger that logs to the experiment's state object as
696            # well as to the main log file.
697            alloc_log = logging.getLogger('fedd.access.%s' % ename)
698            h = logging.StreamHandler(
699                    list_log.list_log(self.allocation[aid]['log']))
700            # XXX: there should be a global one of these rather than
701            # repeating the code.
702            h.setFormatter(logging.Formatter(
703                "%(asctime)s %(name)s %(message)s",
704                        '%d %b %y %H:%M:%S'))
705            alloc_log.addHandler(h)
706            self.write_state()
707        self.state_lock.release()
708
709        if not user:
710            raise service_error(service_error.internal, 
711                    "Can't find creation user for %s" %aid)
712
713        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
714
715    def decorate_topology(self, info, t):
716        """
717        Copy the physical mapping and status onto the topology.  Used by
718        StartSegment and InfoSegment
719        """
720        def add_new(ann, attr):
721            for a in ann:
722                if a not in attr: attr.append(a)
723
724        def merge_os(os, e):
725            if len(e.os) == 0:
726                # No OS at all:
727                if os.get_attribute('emulab_access:image'):
728                    os.set_attribute('emulab_access:initial_image', 
729                            os.get_attribute('emulab_access:image'))
730                e.os = [ os ]
731            elif len(e.os) == 1:
732                # There's one OS, copy the initial image and replace
733                eos = e.os[0]
734                initial = eos.get_attribute('emulab_access:initial_image')
735                if initial:
736                    os.set_attribute('emulab_access:initial_image', initial)
737                e.os = [ os] 
738            else:
739                # Multiple OSes, replace or append
740                for eos in e.os:
741                    if os.name == eos.name:
742                        eos.version = os.version
743                        eos.version = os.distribution
744                        eos.version = os.distributionversion
745                        for a in os.attribute:
746                            if eos.get_attribute(a.attribute):
747                                eos.remove_attribute(a.attribute)
748                            eos.set_attribute(a.attribute, a.value)
749                        break
750                else:
751                    e.os.append(os)
752
753
754        if t is None: return
755        # Copy the assigned names into the return topology
756        for e in t.elements:
757            if isinstance(e, topdl.Computer):
758                if not self.create_debug:
759                    if e.name in info.node:
760                        add_new(("%s%s" % 
761                            (info.node[e.name].pname, self.domain),),
762                            e.localname)
763                        e.status = info.node[e.name].status
764                        os = info.node[e.name].getOS()
765                        if os: merge_os(os, e)
766                else:
767                    # Simple debugging assignment
768                    add_new(("node%d%s" % (i, self.domain),), e.localname)
769                    e.status = 'active'
770                    add_new(('testop1', 'testop2'), e.operation)
771                    i += 1
772
773
774    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
775        """
776        Store key bits of experiment state in the global repository, including
777        the response that may need to be replayed, and return the response.
778        """
779        i = 0
780        t = topo.clone()
781        self.decorate_topology(starter, t)
782        # Grab the log (this is some anal locking, but better safe than
783        # sorry)
784        self.state_lock.acquire()
785        logv = "".join(self.allocation[aid]['log'])
786        # It's possible that the StartSegment call gets retried (!).
787        # if the 'started' key is in the allocation, we'll return it rather
788        # than redo the setup.
789        self.allocation[aid]['started'] = { 
790                'allocID': alloc_id,
791                'allocationLog': logv,
792                'segmentdescription': { 
793                    'topdldescription': t.to_dict()
794                    },
795                'proof': proof.to_dict(),
796                }
797        self.allocation[aid]['topo'] = t
798        retval = copy.copy(self.allocation[aid]['started'])
799        self.write_state()
800        self.state_lock.release()
801        return retval
802   
803    # End of StartSegment support routines
804
805    def StartSegment(self, req, fid):
806        err = None  # Any service_error generated after tmpdir is created
807        rv = None   # Return value from segment creation
808
809        try:
810            req = req['StartSegmentRequestBody']
811            auth_attr = req['allocID']['fedid']
812            topref = req['segmentdescription']['topdldescription']
813        except KeyError:
814            raise service_error(server_error.req, "Badly formed request")
815
816        connInfo = req.get('connection', [])
817        services = req.get('service', [])
818        aid = "%s" % auth_attr
819        attrs = req.get('fedAttr', [])
820
821        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
822                with_proof=True)
823        if not access_ok:
824            raise service_error(service_error.access, "Access denied")
825        else:
826            # See if this is a replay of an earlier succeeded StartSegment -
827            # sometimes SSL kills 'em.  If so, replay the response rather than
828            # redoing the allocation.
829            self.state_lock.acquire()
830            retval = self.allocation[aid].get('started', None)
831            self.state_lock.release()
832            if retval:
833                self.log.warning("Duplicate StartSegment for %s: " % aid + \
834                        "replaying response")
835                return retval
836
837        # A new request.  Do it.
838
839        if topref: topo = topdl.Topology(**topref)
840        else:
841            raise service_error(service_error.req, 
842                    "Request missing segmentdescription'")
843       
844        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
845        try:
846            tmpdir = tempfile.mkdtemp(prefix="access-")
847            softdir = "%s/software" % tmpdir
848            os.mkdir(softdir)
849        except EnvironmentError:
850            raise service_error(service_error.internal, "Cannot create tmp dir")
851
852        # Try block alllows us to clean up temporary files.
853        try:
854            self.retrieve_software(topo, certfile, softdir)
855            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
856                    self.initialize_experiment_info(attrs, aid, 
857                            certfile, tmpdir)
858
859            if '/' in proj: proj, gid = proj.split('/')
860            else: gid = None
861
862
863            # Set up userconf and seer if needed
864            self.configure_userconf(services, tmpdir)
865            self.configure_seer_services(services, topo, softdir)
866            # Get and send synch store variables
867            self.export_store_info(certfile, proj, ename, connInfo)
868            self.import_store_info(certfile, connInfo)
869
870            expfile = "%s/experiment.tcl" % tmpdir
871
872            self.generate_portal_configs(topo, pubkey_base, 
873                    secretkey_base, tmpdir, proj, ename, connInfo, services)
874            self.generate_ns2(topo, expfile, 
875                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
876
877            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
878                    debug=self.create_debug, log=alloc_log, boss=self.boss,
879                    ops=self.ops, cert=self.xmlrpc_cert)
880            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
881        except service_error, e:
882            err = e
883        except:
884            t, v, st = sys.exc_info()
885            err = service_error(service_error.internal, "%s: %s" % \
886                    (v, traceback.extract_tb(st)))
887
888        # Walk up tmpdir, deleting as we go
889        if self.cleanup: self.remove_dirs(tmpdir)
890        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
891
892        if rv:
893            return self.finalize_experiment(starter, topo, aid, req['allocID'],
894                    proof)
895        elif err:
896            raise service_error(service_error.federant,
897                    "Swapin failed: %s" % err)
898        else:
899            raise service_error(service_error.federant, "Swapin failed")
900
901    def TerminateSegment(self, req, fid):
902        try:
903            req = req['TerminateSegmentRequestBody']
904        except KeyError:
905            raise service_error(server_error.req, "Badly formed request")
906
907        auth_attr = req['allocID']['fedid']
908        aid = "%s" % auth_attr
909        attrs = req.get('fedAttr', [])
910
911        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
912                with_proof=True)
913        if not access_ok:
914            raise service_error(service_error.access, "Access denied")
915
916        self.state_lock.acquire()
917        if aid in self.allocation:
918            proj = self.allocation[aid].get('project', None)
919            user = self.allocation[aid].get('user', None)
920            ename = self.allocation[aid].get('experiment', None)
921            nonce = self.allocation[aid].get('nonce', False)
922        else:
923            proj = None
924            user = None
925            ename = None
926            nonce = False
927        self.state_lock.release()
928
929        if not proj:
930            raise service_error(service_error.internal, 
931                    "Can't find project for %s" % aid)
932        else:
933            if '/' in proj: proj, gid = proj.split('/')
934            else: gid = None
935
936        if not user:
937            raise service_error(service_error.internal, 
938                    "Can't find creation user for %s" % aid)
939        if not ename:
940            raise service_error(service_error.internal, 
941                    "Can't find experiment name for %s" % aid)
942        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
943                debug=self.create_debug, boss=self.boss, ops=self.ops,
944                cert=self.xmlrpc_cert)
945        stopper(self, user, proj, ename, gid, nonce)
946        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
947
948    def InfoSegment(self, req, fid):
949        try:
950            req = req['InfoSegmentRequestBody']
951        except KeyError:
952            raise service_error(server_error.req, "Badly formed request")
953
954        auth_attr = req['allocID']['fedid']
955        aid = "%s" % auth_attr
956
957        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
958                with_proof=True)
959        if not access_ok:
960            raise service_error(service_error.access, "Access denied")
961
962        self.state_lock.acquire()
963        if aid in self.allocation:
964            topo = self.allocation[aid].get('topo', None)
965            if topo: topo = topo.clone()
966            proj = self.allocation[aid].get('project', None)
967            user = self.allocation[aid].get('user', None)
968            ename = self.allocation[aid].get('experiment', None)
969        else:
970            proj = None
971            user = None
972            ename = None
973            topo = None
974        self.state_lock.release()
975
976        if not proj:
977            raise service_error(service_error.internal, 
978                    "Can't find project for %s" % aid)
979        else:
980            if '/' in proj: proj, gid = proj.split('/')
981            else: gid = None
982
983        if not user:
984            raise service_error(service_error.internal, 
985                    "Can't find creation user for %s" % aid)
986        if not ename:
987            raise service_error(service_error.internal, 
988                    "Can't find experiment name for %s" % aid)
989        info = self.info_segment(keyfile=self.ssh_privkey_file,
990                debug=self.create_debug, boss=self.boss, ops=self.ops,
991                cert=self.xmlrpc_cert)
992        info(self, user, proj, ename)
993        self.decorate_topology(info, topo)
994        self.state_lock.acquire()
995        if aid in self.allocation:
996            self.allocation[aid]['topo'] = topo
997            self.write_state()
998        self.state_lock.release()
999
1000        rv = { 
1001                'allocID': req['allocID'], 
1002                'proof': proof.to_dict(),
1003                }
1004        if topo:
1005            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1006        return rv
1007
1008    def OperationSegment(self, req, fid):
1009        def get_pname(e):
1010            """
1011            Get the physical name of a node
1012            """
1013            if e.localname:
1014                return re.sub('\..*','', e.localname[0])
1015            else:
1016                return None
1017
1018        try:
1019            req = req['OperationSegmentRequestBody']
1020        except KeyError:
1021            raise service_error(server_error.req, "Badly formed request")
1022
1023        auth_attr = req['allocID']['fedid']
1024        aid = "%s" % auth_attr
1025
1026        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1027                with_proof=True)
1028        if not access_ok:
1029            raise service_error(service_error.access, "Access denied")
1030
1031        op = req.get('operation', None)
1032        targets = req.get('target', None)
1033        params = req.get('parameter', None)
1034
1035        if op is None :
1036            raise service_error(service_error.req, "missing operation")
1037        elif targets is None:
1038            raise service_error(service_error.req, "no targets")
1039
1040        if aid in self.allocation:
1041            topo = self.allocation[aid].get('topo', None)
1042            if topo: topo = topo.clone()
1043        else:
1044            topo = None
1045
1046        targets = copy.copy(targets)
1047        ptargets = { }
1048        for e in topo.elements:
1049            if isinstance(e, topdl.Computer):
1050                if e.name in targets:
1051                    targets.remove(e.name)
1052                    pn = get_pname(e)
1053                    if pn: ptargets[e.name] = pn
1054
1055        status = [ operation_status(t, operation_status.no_target) \
1056                for t in targets]
1057
1058        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1059                debug=self.create_debug, boss=self.boss, ops=self.ops,
1060                cert=self.xmlrpc_cert)
1061        ops(self, op, ptargets, params, topo)
1062       
1063        status.extend(ops.status)
1064
1065        return { 
1066                'allocID': req['allocID'], 
1067                'status': [s.to_dict() for s in status],
1068                'proof': proof.to_dict(),
1069                }
Note: See TracBrowser for help on using the repository browser.