source: fedd/federation/emulab_access.py @ f77a256

compt_changesinfo-ops
Last change on this file since f77a256 was f77a256, checked in by Ted Faber <faber@…>, 9 years ago

DETER plugin can act as users now

  • Property mode set to 100644
File size: 34.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from fedid import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        # The dance to get into the request body
244        if req.has_key('RequestAccessRequestBody'):
245            req = req['RequestAccessRequestBody']
246        else:
247            raise service_error(service_error.req, "No request!?")
248
249        # if this includes a project export request, construct a filter such
250        # that only the ABAC attributes mapped to that project are checked for
251        # access.
252        if 'service' in req:
253            ep = get_export_project(req['service'])
254            if ep: pf = lambda(a): a.value[0] == ep
255            else: pf = None
256        else:
257            ep = None
258            pf = None
259
260        if self.auth.import_credentials(
261                data_list=req.get('abac_credential', [])):
262            self.auth.save()
263
264        if self.auth_type == 'abac':
265            found, owners, proof = self.lookup_access(req, fid, filter=pf)
266        else:
267            raise service_error(service_error.internal, 
268                    'Unknown auth_type: %s' % self.auth_type)
269        ap = None
270
271        # keep track of what's been added
272        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
273        aid = unicode(allocID)
274
275        pname, uname = self.save_project_state(aid, found[0], found[1], 
276                found[2], owners)
277
278        services, svc_state = self.export_services(req.get('service',[]),
279                pname, uname)
280        self.state_lock.acquire()
281        # Store services state in global state
282        for k, v in svc_state.items():
283            self.allocation[aid][k] = v
284        self.append_allocation_authorization(aid, 
285                set([(o, allocID) for o in owners]), state_attr='allocation')
286        self.write_state()
287        self.state_lock.release()
288        try:
289            f = open("%s/%s.pem" % (self.certdir, aid), "w")
290            print >>f, alloc_cert
291            f.close()
292        except EnvironmentError, e:
293            raise service_error(service_error.internal, 
294                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
295        resp = self.build_access_response({ 'fedid': allocID } ,
296                pname, services, proof)
297        return resp
298
299    def ReleaseAccess(self, req, fid):
300        # The dance to get into the request body
301        if req.has_key('ReleaseAccessRequestBody'):
302            req = req['ReleaseAccessRequestBody']
303        else:
304            raise service_error(service_error.req, "No request!?")
305
306        try:
307            if req['allocID'].has_key('localname'):
308                auth_attr = aid = req['allocID']['localname']
309            elif req['allocID'].has_key('fedid'):
310                aid = unicode(req['allocID']['fedid'])
311                auth_attr = req['allocID']['fedid']
312            else:
313                raise service_error(service_error.req,
314                        "Only localnames and fedids are understood")
315        except KeyError:
316            raise service_error(service_error.req, "Badly formed request")
317
318        self.log.debug("[access] deallocation requested for %s by %s" % \
319                (aid, fid))
320        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
321                with_proof=True)
322        if not access_ok:
323            self.log.debug("[access] deallocation denied for %s", aid)
324            raise service_error(service_error.access, "Access Denied")
325
326        self.state_lock.acquire()
327        if aid in self.allocation:
328            self.log.debug("Found allocation for %s" %aid)
329            self.clear_allocation_authorization(aid, state_attr='allocation')
330            del self.allocation[aid]
331            self.write_state()
332            self.state_lock.release()
333            # Remove the access cert
334            cf = "%s/%s.pem" % (self.certdir, aid)
335            self.log.debug("Removing %s" % cf)
336            os.remove(cf)
337            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
338        else:
339            self.state_lock.release()
340            raise service_error(service_error.req, "No such allocation")
341
342    # These are subroutines for StartSegment
343    def generate_ns2(self, topo, expfn, softdir, connInfo):
344        """
345        Convert topo into an ns2 file, decorated with appropriate commands for
346        the particular testbed setup.  Convert all requests for software, etc
347        to point at the staged copies on this testbed and add the federation
348        startcommands.
349        """
350        class dragon_commands:
351            """
352            Functor to spit out approrpiate dragon commands for nodes listed in
353            the connectivity description.  The constructor makes a dict mapping
354            dragon nodes to their parameters and the __call__ checks each
355            element in turn for membership.
356            """
357            def __init__(self, map):
358                self.node_info = map
359
360            def __call__(self, e):
361                s = ""
362                if isinstance(e, topdl.Computer):
363                    if self.node_info.has_key(e.name):
364                        info = self.node_info[e.name]
365                        for ifname, vlan, type in info:
366                            for i in e.interface:
367                                if i.name == ifname:
368                                    addr = i.get_attribute('ip4_address')
369                                    subs = i.substrate[0]
370                                    break
371                            else:
372                                raise service_error(service_error.internal,
373                                        "No interface %s on element %s" % \
374                                                (ifname, e.name))
375                            # XXX: do netmask right
376                            if type =='link':
377                                s = ("tb-allow-external ${%s} " + \
378                                        "dragonportal ip %s vlan %s " + \
379                                        "netmask 255.255.255.0\n") % \
380                                        (topdl.to_tcl_name(e.name), addr, vlan)
381                            elif type =='lan':
382                                s = ("tb-allow-external ${%s} " + \
383                                        "dragonportal " + \
384                                        "ip %s vlan %s usurp %s\n") % \
385                                        (topdl.to_tcl_name(e.name), addr, 
386                                                vlan, subs)
387                            else:
388                                raise service_error(service_error_internal,
389                                        "Unknown DRAGON type %s" % type)
390                return s
391
392        class not_dragon:
393            """
394            Return true if a node is in the given map of dragon nodes.
395            """
396            def __init__(self, map):
397                self.nodes = set(map.keys())
398
399            def __call__(self, e):
400                return e.name not in self.nodes
401
402        # Main line of generate_ns2
403        t = topo.clone()
404
405        # Create the map of nodes that need direct connections (dragon
406        # connections) from the connInfo
407        dragon_map = { }
408        for i in [ i for i in connInfo if i['type'] == 'transit']:
409            for a in i.get('fedAttr', []):
410                if a['attribute'] == 'vlan_id':
411                    vlan = a['value']
412                    break
413            else:
414                raise service_error(service_error.internal, "No vlan tag")
415            members = i.get('member', [])
416            if len(members) > 1: type = 'lan'
417            else: type = 'link'
418
419            try:
420                for m in members:
421                    if m['element'] in dragon_map:
422                        dragon_map[m['element']].append(( m['interface'], 
423                            vlan, type))
424                    else:
425                        dragon_map[m['element']] = [( m['interface'], 
426                            vlan, type),]
427            except KeyError:
428                raise service_error(service_error.req,
429                        "Missing connectivity info")
430
431        # Weed out the things we aren't going to instantiate: Segments, portal
432        # substrates, and portal interfaces.  (The copy in the for loop allows
433        # us to delete from e.elements in side the for loop).  While we're
434        # touching all the elements, we also adjust paths from the original
435        # testbed to local testbed paths and put the federation commands into
436        # the start commands
437        for e in [e for e in t.elements]:
438            if isinstance(e, topdl.Segment):
439                t.elements.remove(e)
440            if isinstance(e, topdl.Computer):
441                self.add_kit(e, self.federation_software)
442                if e.get_attribute('portal') and self.portal_startcommand:
443                    # Add local portal support software
444                    self.add_kit(e, self.portal_software)
445                    # Portals never have a user-specified start command
446                    e.set_attribute('startup', self.portal_startcommand)
447                elif self.node_startcommand:
448                    if e.get_attribute('startup'):
449                        e.set_attribute('startup', "%s \\$USER '%s'" % \
450                                (self.node_startcommand, 
451                                    e.get_attribute('startup')))
452                    else:
453                        e.set_attribute('startup', self.node_startcommand)
454
455                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
456                # Remove portal interfaces that do not connect to DRAGON
457                e.interface = [i for i in e.interface \
458                        if not i.get_attribute('portal') or i.name in dinf ]
459            # Fix software paths
460            for s in getattr(e, 'software', []):
461                s.location = re.sub("^.*/", softdir, s.location)
462
463        t.substrates = [ s.clone() for s in t.substrates ]
464        t.incorporate_elements()
465
466        # Customize the ns2 output for local portal commands and images
467        filters = []
468
469        if self.dragon_endpoint:
470            add_filter = not_dragon(dragon_map)
471            filters.append(dragon_commands(dragon_map))
472        else:
473            add_filter = None
474
475        if self.portal_command:
476            filters.append(topdl.generate_portal_command_filter(
477                self.portal_command, add_filter=add_filter))
478
479        if self.portal_image:
480            filters.append(topdl.generate_portal_image_filter(
481                self.portal_image))
482
483        if self.portal_type:
484            filters.append(topdl.generate_portal_hardware_filter(
485                self.portal_type))
486
487        # Convert to ns and write it out
488        expfile = topdl.topology_to_ns2(t, filters)
489        try:
490            f = open(expfn, "w")
491            print >>f, expfile
492            f.close()
493        except EnvironmentError:
494            raise service_error(service_error.internal,
495                    "Cannot write experiment file %s: %s" % (expfn,e))
496
497    def export_store_info(self, cf, proj, ename, connInfo):
498        """
499        For the export requests in the connection info, install the peer names
500        at the experiment controller via SetValue calls.
501        """
502
503        for c in connInfo:
504            for p in [ p for p in c.get('parameter', []) \
505                    if p.get('type', '') == 'output']:
506
507                if p.get('name', '') == 'peer':
508                    k = p.get('key', None)
509                    surl = p.get('store', None)
510                    if surl and k and k.index('/') != -1:
511                        value = "%s.%s.%s%s" % \
512                                (k[k.index('/')+1:], ename, proj, self.domain)
513                        req = { 'name': k, 'value': value }
514                        self.log.debug("Setting %s to %s on %s" % \
515                                (k, value, surl))
516                        self.call_SetValue(surl, req, cf)
517                    else:
518                        self.log.error("Bad export request: %s" % p)
519                elif p.get('name', '') == 'ssh_port':
520                    k = p.get('key', None)
521                    surl = p.get('store', None)
522                    if surl and k:
523                        req = { 'name': k, 'value': self.ssh_port }
524                        self.log.debug("Setting %s to %s on %s" % \
525                                (k, self.ssh_port, surl))
526                        self.call_SetValue(surl, req, cf)
527                    else:
528                        self.log.error("Bad export request: %s" % p)
529                else:
530                    self.log.error("Unknown export parameter: %s" % \
531                            p.get('name'))
532                    continue
533
534    def add_seer_node(self, topo, name, startup):
535        """
536        Add a seer node to the given topology, with the startup command passed
537        in.  Used by configure seer_services.
538        """
539        c_node = topdl.Computer(
540                name=name, 
541                os= topdl.OperatingSystem(
542                    attribute=[
543                    { 'attribute': 'osid', 
544                        'value': self.local_seer_image },
545                    ]),
546                attribute=[
547                    { 'attribute': 'startup', 'value': startup },
548                    ]
549                )
550        self.add_kit(c_node, self.local_seer_software)
551        topo.elements.append(c_node)
552
553    def configure_seer_services(self, services, topo, softdir):
554        """
555        Make changes to the topology required for the seer requests being made.
556        Specifically, add any control or master nodes required and set up the
557        start commands on the nodes to interconnect them.
558        """
559        local_seer = False      # True if we need to add a control node
560        collect_seer = False    # True if there is a seer-master node
561        seer_master= False      # True if we need to add the seer-master
562        for s in services:
563            s_name = s.get('name', '')
564            s_vis = s.get('visibility','')
565
566            if s_name  == 'local_seer_control' and s_vis == 'export':
567                local_seer = True
568            elif s_name == 'seer_master':
569                if s_vis == 'import':
570                    collect_seer = True
571                elif s_vis == 'export':
572                    seer_master = True
573       
574        # We've got the whole picture now, so add nodes if needed and configure
575        # them to interconnect properly.
576        if local_seer or seer_master:
577            # Copy local seer control node software to the tempdir
578            for l, f in self.local_seer_software:
579                base = os.path.basename(f)
580                copy_file(f, "%s/%s" % (softdir, base))
581        # If we're collecting seers somewhere the controllers need to talk to
582        # the master.  In testbeds that export the service, that will be a
583        # local node that we'll add below.  Elsewhere it will be the control
584        # portal that will port forward to the exporting master.
585        if local_seer:
586            if collect_seer:
587                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
588            else:
589                startup = self.local_seer_start
590            self.add_seer_node(topo, 'control', startup)
591        # If this is the seer master, add that node, too.
592        if seer_master:
593            self.add_seer_node(topo, 'seer-master', 
594                    "%s -R -n -R seer-master -R -A -R sink" % \
595                            self.seer_master_start)
596
597    def retrieve_software(self, topo, certfile, softdir):
598        """
599        Collect the software that nodes in the topology need loaded and stage
600        it locally.  This implies retrieving it from the experiment_controller
601        and placing it into softdir.  Certfile is used to prove that this node
602        has access to that data (it's the allocation/segment fedid).  Finally
603        local portal and federation software is also copied to the same staging
604        directory for simplicity - all software needed for experiment creation
605        is in softdir.
606        """
607        sw = set()
608        for e in topo.elements:
609            for s in getattr(e, 'software', []):
610                sw.add(s.location)
611        for s in sw:
612            self.log.debug("Retrieving %s" % s)
613            try:
614                get_url(s, certfile, softdir)
615            except:
616                t, v, st = sys.exc_info()
617                raise service_error(service_error.internal,
618                        "Error retrieving %s: %s" % (s, v))
619
620        # Copy local federation and portal node software to the tempdir
621        for s in (self.federation_software, self.portal_software):
622            for l, f in s:
623                base = os.path.basename(f)
624                copy_file(f, "%s/%s" % (softdir, base))
625
626
627    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
628        """
629        Gather common configuration files, retrieve or create an experiment
630        name and project name, and return the ssh_key filenames.  Create an
631        allocation log bound to the state log variable as well.
632        """
633        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
634                'seer_ca_pem', 'seer_node_pem')
635        ename = None
636        pubkey_base = None
637        secretkey_base = None
638        proj = None
639        user = None
640        alloc_log = None
641        nonce_experiment = False
642        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
643
644        self.state_lock.acquire()
645        if aid in self.allocation:
646            proj = self.allocation[aid].get('project', None)
647        self.state_lock.release()
648
649        if not proj:
650            raise service_error(service_error.internal, 
651                    "Can't find project for %s" %aid)
652
653        for a in attrs:
654            if a['attribute'] in configs:
655                try:
656                    self.log.debug("Retrieving %s from %s" % \
657                            (a['attribute'], a['value']))
658                    get_url(a['value'], certfile, tmpdir)
659                except:
660                    t, v, st = sys.exc_info()
661                    raise service_error(service_error.internal,
662                            "Error retrieving %s: %s" % (a.get('value', ""), v))
663            if a['attribute'] == 'ssh_pubkey':
664                pubkey_base = a['value'].rpartition('/')[2]
665            if a['attribute'] == 'ssh_secretkey':
666                secretkey_base = a['value'].rpartition('/')[2]
667            if a['attribute'] == 'experiment_name':
668                ename = a['value']
669
670        # Names longer than the emulab max are discarded
671        if ename and len(ename) <= self.max_name_len:
672            # Clean up the experiment name so that emulab will accept it.
673            ename = re.sub(vchars_re, '-', ename)
674
675        else:
676            ename = ""
677            for i in range(0,5):
678                ename += random.choice(string.ascii_letters)
679            nonce_experiment = True
680            self.log.warn("No experiment name or suggestion too long: " + \
681                    "picked one randomly: %s" % ename)
682
683        if not pubkey_base:
684            raise service_error(service_error.req, 
685                    "No public key attribute")
686
687        if not secretkey_base:
688            raise service_error(service_error.req, 
689                    "No secret key attribute")
690
691        self.state_lock.acquire()
692        if aid in self.allocation:
693            user = self.allocation[aid].get('user', None)
694            cert = self.allocation[aid].get('cert', None)
695            self.allocation[aid]['experiment'] = ename
696            self.allocation[aid]['nonce'] = nonce_experiment
697            self.allocation[aid]['log'] = [ ]
698            # Create a logger that logs to the experiment's state object as
699            # well as to the main log file.
700            alloc_log = logging.getLogger('fedd.access.%s' % ename)
701            h = logging.StreamHandler(
702                    list_log.list_log(self.allocation[aid]['log']))
703            # XXX: there should be a global one of these rather than
704            # repeating the code.
705            h.setFormatter(logging.Formatter(
706                "%(asctime)s %(name)s %(message)s",
707                        '%d %b %y %H:%M:%S'))
708            alloc_log.addHandler(h)
709            self.write_state()
710        self.state_lock.release()
711
712        if not user:
713            raise service_error(service_error.internal, 
714                    "Can't find creation user for %s" %aid)
715
716        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
717
718    def decorate_topology(self, info, t):
719        """
720        Copy the physical mapping and status onto the topology.  Used by
721        StartSegment and InfoSegment
722        """
723        def add_new(ann, attr):
724            for a in ann:
725                if a not in attr: attr.append(a)
726
727        def merge_os(os, e):
728            if len(e.os) == 0:
729                # No OS at all:
730                if os.get_attribute('emulab_access:image'):
731                    os.set_attribute('emulab_access:initial_image', 
732                            os.get_attribute('emulab_access:image'))
733                e.os = [ os ]
734            elif len(e.os) == 1:
735                # There's one OS, copy the initial image and replace
736                eos = e.os[0]
737                initial = eos.get_attribute('emulab_access:initial_image')
738                if initial:
739                    os.set_attribute('emulab_access:initial_image', initial)
740                e.os = [ os] 
741            else:
742                # Multiple OSes, replace or append
743                for eos in e.os:
744                    if os.name == eos.name:
745                        eos.version = os.version
746                        eos.version = os.distribution
747                        eos.version = os.distributionversion
748                        for a in os.attribute:
749                            if eos.get_attribute(a.attribute):
750                                eos.remove_attribute(a.attribute)
751                            eos.set_attribute(a.attribute, a.value)
752                        break
753                else:
754                    e.os.append(os)
755
756
757        if t is None: return
758        # Copy the assigned names into the return topology
759        for e in t.elements:
760            if isinstance(e, topdl.Computer):
761                if not self.create_debug:
762                    if e.name in info.node:
763                        add_new(("%s%s" % 
764                            (info.node[e.name].pname, self.domain),),
765                            e.localname)
766                        e.status = info.node[e.name].status
767                        os = info.node[e.name].getOS()
768                        if os: merge_os(os, e)
769                else:
770                    # Simple debugging assignment
771                    add_new(("node%d%s" % (i, self.domain),), e.localname)
772                    e.status = 'active'
773                    add_new(('testop1', 'testop2'), e.operation)
774                    i += 1
775
776
777    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
778        """
779        Store key bits of experiment state in the global repository, including
780        the response that may need to be replayed, and return the response.
781        """
782        i = 0
783        t = topo.clone()
784        self.decorate_topology(starter, t)
785        # Grab the log (this is some anal locking, but better safe than
786        # sorry)
787        self.state_lock.acquire()
788        logv = "".join(self.allocation[aid]['log'])
789        # It's possible that the StartSegment call gets retried (!).
790        # if the 'started' key is in the allocation, we'll return it rather
791        # than redo the setup.
792        self.allocation[aid]['started'] = { 
793                'allocID': alloc_id,
794                'allocationLog': logv,
795                'segmentdescription': { 
796                    'topdldescription': t.to_dict()
797                    },
798                'proof': proof.to_dict(),
799                }
800        self.allocation[aid]['topo'] = t
801        retval = copy.copy(self.allocation[aid]['started'])
802        self.write_state()
803        self.state_lock.release()
804        return retval
805   
806    # End of StartSegment support routines
807
808    def StartSegment(self, req, fid):
809        err = None  # Any service_error generated after tmpdir is created
810        rv = None   # Return value from segment creation
811
812        try:
813            req = req['StartSegmentRequestBody']
814            auth_attr = req['allocID']['fedid']
815            topref = req['segmentdescription']['topdldescription']
816        except KeyError:
817            raise service_error(server_error.req, "Badly formed request")
818
819        connInfo = req.get('connection', [])
820        services = req.get('service', [])
821        aid = "%s" % auth_attr
822        attrs = req.get('fedAttr', [])
823
824        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
825                with_proof=True)
826        if not access_ok:
827            raise service_error(service_error.access, "Access denied")
828        else:
829            # See if this is a replay of an earlier succeeded StartSegment -
830            # sometimes SSL kills 'em.  If so, replay the response rather than
831            # redoing the allocation.
832            self.state_lock.acquire()
833            retval = self.allocation[aid].get('started', None)
834            self.state_lock.release()
835            if retval:
836                self.log.warning("Duplicate StartSegment for %s: " % aid + \
837                        "replaying response")
838                return retval
839
840        # A new request.  Do it.
841
842        if topref: topo = topdl.Topology(**topref)
843        else:
844            raise service_error(service_error.req, 
845                    "Request missing segmentdescription'")
846       
847        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
848        try:
849            tmpdir = tempfile.mkdtemp(prefix="access-")
850            softdir = "%s/software" % tmpdir
851            os.mkdir(softdir)
852        except EnvironmentError:
853            raise service_error(service_error.internal, "Cannot create tmp dir")
854
855        # Try block alllows us to clean up temporary files.
856        try:
857            self.retrieve_software(topo, certfile, softdir)
858            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
859                alloc_log =  self.initialize_experiment_info(attrs, aid, 
860                        certfile, tmpdir)
861
862            if '/' in proj: proj, gid = proj.split('/')
863            else: gid = None
864
865
866            # Set up userconf and seer if needed
867            self.configure_userconf(services, tmpdir)
868            self.configure_seer_services(services, topo, softdir)
869            # Get and send synch store variables
870            self.export_store_info(certfile, proj, ename, connInfo)
871            self.import_store_info(certfile, connInfo)
872
873            expfile = "%s/experiment.tcl" % tmpdir
874
875            self.generate_portal_configs(topo, pubkey_base, 
876                    secretkey_base, tmpdir, proj, ename, connInfo, services)
877            self.generate_ns2(topo, expfile, 
878                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
879
880            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
881                    debug=self.create_debug, log=alloc_log, boss=self.boss,
882                    ops=self.ops, cert=xmlrpc_cert)
883            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
884        except service_error, e:
885            err = e
886        except:
887            t, v, st = sys.exc_info()
888            err = service_error(service_error.internal, "%s: %s" % \
889                    (v, traceback.extract_tb(st)))
890
891        # Walk up tmpdir, deleting as we go
892        if self.cleanup: self.remove_dirs(tmpdir)
893        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
894
895        if rv:
896            return self.finalize_experiment(starter, topo, aid, req['allocID'],
897                    proof)
898        elif err:
899            raise service_error(service_error.federant,
900                    "Swapin failed: %s" % err)
901        else:
902            raise service_error(service_error.federant, "Swapin failed")
903
904    def TerminateSegment(self, req, fid):
905        try:
906            req = req['TerminateSegmentRequestBody']
907        except KeyError:
908            raise service_error(server_error.req, "Badly formed request")
909
910        auth_attr = req['allocID']['fedid']
911        aid = "%s" % auth_attr
912        attrs = req.get('fedAttr', [])
913
914        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
915                with_proof=True)
916        if not access_ok:
917            raise service_error(service_error.access, "Access denied")
918
919        self.state_lock.acquire()
920        if aid in self.allocation:
921            proj = self.allocation[aid].get('project', None)
922            user = self.allocation[aid].get('user', None)
923            xmlrpc_cert = self.allocation[aid].get('cert', None)
924            ename = self.allocation[aid].get('experiment', None)
925            nonce = self.allocation[aid].get('nonce', False)
926        else:
927            proj = None
928            user = None
929            ename = None
930            nonce = False
931            xmlrpc_cert = None
932        self.state_lock.release()
933
934        if not proj:
935            raise service_error(service_error.internal, 
936                    "Can't find project for %s" % aid)
937        else:
938            if '/' in proj: proj, gid = proj.split('/')
939            else: gid = None
940
941        if not user:
942            raise service_error(service_error.internal, 
943                    "Can't find creation user for %s" % aid)
944        if not ename:
945            raise service_error(service_error.internal, 
946                    "Can't find experiment name for %s" % aid)
947        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
948                debug=self.create_debug, boss=self.boss, ops=self.ops,
949                cert=xmlrpc_cert)
950        stopper(self, user, proj, ename, gid, nonce)
951        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
952
953    def InfoSegment(self, req, fid):
954        try:
955            req = req['InfoSegmentRequestBody']
956        except KeyError:
957            raise service_error(server_error.req, "Badly formed request")
958
959        auth_attr = req['allocID']['fedid']
960        aid = "%s" % auth_attr
961
962        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
963                with_proof=True)
964        if not access_ok:
965            raise service_error(service_error.access, "Access denied")
966
967        self.state_lock.acquire()
968        if aid in self.allocation:
969            topo = self.allocation[aid].get('topo', None)
970            if topo: topo = topo.clone()
971            proj = self.allocation[aid].get('project', None)
972            user = self.allocation[aid].get('user', None)
973            xmlrpc_cert = self.allocation[aid].get('cert', None)
974            ename = self.allocation[aid].get('experiment', None)
975        else:
976            proj = None
977            user = None
978            ename = None
979            topo = None
980            xmlrpc_cert = None
981        self.state_lock.release()
982
983        if not proj:
984            raise service_error(service_error.internal, 
985                    "Can't find project for %s" % aid)
986        else:
987            if '/' in proj: proj, gid = proj.split('/')
988            else: gid = None
989
990        if not user:
991            raise service_error(service_error.internal, 
992                    "Can't find creation user for %s" % aid)
993        if not ename:
994            raise service_error(service_error.internal, 
995                    "Can't find experiment name for %s" % aid)
996        info = self.info_segment(keyfile=self.ssh_privkey_file,
997                debug=self.create_debug, boss=self.boss, ops=self.ops,
998                cert=xmlrpc_cert)
999        info(self, user, proj, ename)
1000        self.decorate_topology(info, topo)
1001        self.state_lock.acquire()
1002        if aid in self.allocation:
1003            self.allocation[aid]['topo'] = topo
1004            self.write_state()
1005        self.state_lock.release()
1006
1007        rv = { 
1008                'allocID': req['allocID'], 
1009                'proof': proof.to_dict(),
1010                }
1011        if topo:
1012            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1013        return rv
1014
1015    def OperationSegment(self, req, fid):
1016        def get_pname(e):
1017            """
1018            Get the physical name of a node
1019            """
1020            if e.localname:
1021                return re.sub('\..*','', e.localname[0])
1022            else:
1023                return None
1024
1025        try:
1026            req = req['OperationSegmentRequestBody']
1027        except KeyError:
1028            raise service_error(server_error.req, "Badly formed request")
1029
1030        auth_attr = req['allocID']['fedid']
1031        aid = "%s" % auth_attr
1032
1033        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1034                with_proof=True)
1035        if not access_ok:
1036            raise service_error(service_error.access, "Access denied")
1037
1038        op = req.get('operation', None)
1039        targets = req.get('target', None)
1040        params = req.get('parameter', None)
1041
1042        if op is None :
1043            raise service_error(service_error.req, "missing operation")
1044        elif targets is None:
1045            raise service_error(service_error.req, "no targets")
1046
1047        self.state_lock.acquire()
1048        if aid in self.allocation:
1049            topo = self.allocation[aid].get('topo', None)
1050            if topo: topo = topo.clone()
1051            xmlrpc_cert = self.allocation[aid].get('cert', None)
1052        else:
1053            topo = None
1054            xmlrpc_cert = None
1055        self.state_lock.release()
1056
1057        targets = copy.copy(targets)
1058        ptargets = { }
1059        for e in topo.elements:
1060            if isinstance(e, topdl.Computer):
1061                if e.name in targets:
1062                    targets.remove(e.name)
1063                    pn = get_pname(e)
1064                    if pn: ptargets[e.name] = pn
1065
1066        status = [ operation_status(t, operation_status.no_target) \
1067                for t in targets]
1068
1069        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1070                debug=self.create_debug, boss=self.boss, ops=self.ops,
1071                cert=xmlrpc_cert)
1072        ops(self, op, ptargets, params, topo)
1073       
1074        status.extend(ops.status)
1075
1076        return { 
1077                'allocID': req['allocID'], 
1078                'status': [s.to_dict() for s in status],
1079                'proof': proof.to_dict(),
1080                }
Note: See TracBrowser for help on using the repository browser.