source: fedd/federation/emulab_access.py @ 6a50b78

compt_changes
Last change on this file since 6a50b78 was 7653f01, checked in by Ted Faber <faber@…>, 12 years ago

Broke debugging.

  • Property mode set to 100644
File size: 34.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from deter import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from deter import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        # The dance to get into the request body
244        if req.has_key('RequestAccessRequestBody'):
245            req = req['RequestAccessRequestBody']
246        else:
247            raise service_error(service_error.req, "No request!?")
248
249        # if this includes a project export request, construct a filter such
250        # that only the ABAC attributes mapped to that project are checked for
251        # access.
252        if 'service' in req:
253            ep = get_export_project(req['service'])
254            if ep: pf = lambda(a): a.value[0] == ep
255            else: pf = None
256        else:
257            ep = None
258            pf = None
259
260        if self.auth.import_credentials(
261                data_list=req.get('abac_credential', [])):
262            self.auth.save()
263
264        if self.auth_type == 'abac':
265            found, owners, proof = self.lookup_access(req, fid, filter=pf)
266        else:
267            raise service_error(service_error.internal, 
268                    'Unknown auth_type: %s' % self.auth_type)
269        ap = None
270
271        # keep track of what's been added
272        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
273        aid = unicode(allocID)
274
275        pname, uname = self.save_project_state(aid, found[0], found[1], 
276                found[2], owners)
277
278        services, svc_state = self.export_services(req.get('service',[]),
279                pname, uname)
280        self.state_lock.acquire()
281        # Store services state in global state
282        for k, v in svc_state.items():
283            self.allocation[aid][k] = v
284        self.append_allocation_authorization(aid, 
285                set([(o, allocID) for o in owners]), state_attr='allocation')
286        self.write_state()
287        self.state_lock.release()
288        try:
289            f = open("%s/%s.pem" % (self.certdir, aid), "w")
290            print >>f, alloc_cert
291            f.close()
292        except EnvironmentError, e:
293            raise service_error(service_error.internal, 
294                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
295        resp = self.build_access_response({ 'fedid': allocID } ,
296                pname, services, proof)
297        return resp
298
299    def ReleaseAccess(self, req, fid):
300        # The dance to get into the request body
301        if req.has_key('ReleaseAccessRequestBody'):
302            req = req['ReleaseAccessRequestBody']
303        else:
304            raise service_error(service_error.req, "No request!?")
305
306        try:
307            if req['allocID'].has_key('localname'):
308                auth_attr = aid = req['allocID']['localname']
309            elif req['allocID'].has_key('fedid'):
310                aid = unicode(req['allocID']['fedid'])
311                auth_attr = req['allocID']['fedid']
312            else:
313                raise service_error(service_error.req,
314                        "Only localnames and fedids are understood")
315        except KeyError:
316            raise service_error(service_error.req, "Badly formed request")
317
318        self.log.debug("[access] deallocation requested for %s by %s" % \
319                (aid, fid))
320        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
321                with_proof=True)
322        if not access_ok:
323            self.log.debug("[access] deallocation denied for %s", aid)
324            raise service_error(service_error.access, "Access Denied")
325
326        self.state_lock.acquire()
327        if aid in self.allocation:
328            self.log.debug("Found allocation for %s" %aid)
329            self.clear_allocation_authorization(aid, state_attr='allocation')
330            del self.allocation[aid]
331            self.write_state()
332            self.state_lock.release()
333            # Remove the access cert
334            cf = "%s/%s.pem" % (self.certdir, aid)
335            self.log.debug("Removing %s" % cf)
336            os.remove(cf)
337            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
338        else:
339            self.state_lock.release()
340            raise service_error(service_error.req, "No such allocation")
341
342    # These are subroutines for StartSegment
343    def generate_ns2(self, topo, expfn, softdir, connInfo):
344        """
345        Convert topo into an ns2 file, decorated with appropriate commands for
346        the particular testbed setup.  Convert all requests for software, etc
347        to point at the staged copies on this testbed and add the federation
348        startcommands.
349        """
350        class dragon_commands:
351            """
352            Functor to spit out approrpiate dragon commands for nodes listed in
353            the connectivity description.  The constructor makes a dict mapping
354            dragon nodes to their parameters and the __call__ checks each
355            element in turn for membership.
356            """
357            def __init__(self, map):
358                self.node_info = map
359
360            def __call__(self, e):
361                s = ""
362                if isinstance(e, topdl.Computer):
363                    if self.node_info.has_key(e.name):
364                        info = self.node_info[e.name]
365                        for ifname, vlan, type in info:
366                            for i in e.interface:
367                                if i.name == ifname:
368                                    addr = i.get_attribute('ip4_address')
369                                    subs = i.substrate[0]
370                                    break
371                            else:
372                                raise service_error(service_error.internal,
373                                        "No interface %s on element %s" % \
374                                                (ifname, e.name))
375                            # XXX: do netmask right
376                            if type =='link':
377                                s = ("tb-allow-external ${%s} " + \
378                                        "dragonportal ip %s vlan %s " + \
379                                        "netmask 255.255.255.0\n") % \
380                                        (topdl.to_tcl_name(e.name), addr, vlan)
381                            elif type =='lan':
382                                s = ("tb-allow-external ${%s} " + \
383                                        "dragonportal " + \
384                                        "ip %s vlan %s usurp %s\n") % \
385                                        (topdl.to_tcl_name(e.name), addr, 
386                                                vlan, subs)
387                            else:
388                                raise service_error(service_error_internal,
389                                        "Unknown DRAGON type %s" % type)
390                return s
391
392        class not_dragon:
393            """
394            Return true if a node is in the given map of dragon nodes.
395            """
396            def __init__(self, map):
397                self.nodes = set(map.keys())
398
399            def __call__(self, e):
400                return e.name not in self.nodes
401
402        # Main line of generate_ns2
403        t = topo.clone()
404
405        # Create the map of nodes that need direct connections (dragon
406        # connections) from the connInfo
407        dragon_map = { }
408        for i in [ i for i in connInfo if i['type'] == 'transit']:
409            for a in i.get('fedAttr', []):
410                if a['attribute'] == 'vlan_id':
411                    vlan = a['value']
412                    break
413            else:
414                raise service_error(service_error.internal, "No vlan tag")
415            members = i.get('member', [])
416            if len(members) > 1: type = 'lan'
417            else: type = 'link'
418
419            try:
420                for m in members:
421                    if m['element'] in dragon_map:
422                        dragon_map[m['element']].append(( m['interface'], 
423                            vlan, type))
424                    else:
425                        dragon_map[m['element']] = [( m['interface'], 
426                            vlan, type),]
427            except KeyError:
428                raise service_error(service_error.req,
429                        "Missing connectivity info")
430
431        # Weed out the things we aren't going to instantiate: Segments, portal
432        # substrates, and portal interfaces.  (The copy in the for loop allows
433        # us to delete from e.elements in side the for loop).  While we're
434        # touching all the elements, we also adjust paths from the original
435        # testbed to local testbed paths and put the federation commands into
436        # the start commands
437        for e in [e for e in t.elements]:
438            if isinstance(e, topdl.Segment):
439                t.elements.remove(e)
440            if isinstance(e, topdl.Computer):
441                self.add_kit(e, self.federation_software)
442                if e.get_attribute('portal') and self.portal_startcommand:
443                    # Add local portal support software
444                    self.add_kit(e, self.portal_software)
445                    # Portals never have a user-specified start command
446                    e.set_attribute('startup', self.portal_startcommand)
447                elif self.node_startcommand:
448                    if e.get_attribute('startup'):
449                        e.set_attribute('startup', "%s \\$USER '%s'" % \
450                                (self.node_startcommand, 
451                                    e.get_attribute('startup')))
452                    else:
453                        e.set_attribute('startup', self.node_startcommand)
454
455                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
456                # Remove portal interfaces that do not connect to DRAGON
457                e.interface = [i for i in e.interface \
458                        if not i.get_attribute('portal') or i.name in dinf ]
459            # Fix software paths
460            for s in getattr(e, 'software', []):
461                s.location = re.sub("^.*/", softdir, s.location)
462
463        t.substrates = [ s.clone() for s in t.substrates ]
464        t.incorporate_elements()
465
466        # Customize the ns2 output for local portal commands and images
467        filters = []
468
469        if self.dragon_endpoint:
470            add_filter = not_dragon(dragon_map)
471            filters.append(dragon_commands(dragon_map))
472        else:
473            add_filter = None
474
475        if self.portal_command:
476            filters.append(topdl.generate_portal_command_filter(
477                self.portal_command, add_filter=add_filter))
478
479        if self.portal_image:
480            filters.append(topdl.generate_portal_image_filter(
481                self.portal_image))
482
483        if self.portal_type:
484            filters.append(topdl.generate_portal_hardware_filter(
485                self.portal_type))
486
487        # Convert to ns and write it out
488        expfile = topdl.topology_to_ns2(t, filters)
489        try:
490            f = open(expfn, "w")
491            print >>f, expfile
492            f.close()
493        except EnvironmentError:
494            raise service_error(service_error.internal,
495                    "Cannot write experiment file %s: %s" % (expfn,e))
496
497    def export_store_info(self, cf, proj, ename, connInfo):
498        """
499        For the export requests in the connection info, install the peer names
500        at the experiment controller via SetValue calls.
501        """
502
503        for c in connInfo:
504            for p in [ p for p in c.get('parameter', []) \
505                    if p.get('type', '') == 'output']:
506
507                if p.get('name', '') == 'peer':
508                    k = p.get('key', None)
509                    surl = p.get('store', None)
510                    if surl and k and k.index('/') != -1:
511                        value = "%s.%s.%s%s" % \
512                                (k[k.index('/')+1:], ename, proj, self.domain)
513                        req = { 'name': k, 'value': value }
514                        self.log.debug("Setting %s to %s on %s" % \
515                                (k, value, surl))
516                        self.call_SetValue(surl, req, cf)
517                    else:
518                        self.log.error("Bad export request: %s" % p)
519                elif p.get('name', '') == 'ssh_port':
520                    k = p.get('key', None)
521                    surl = p.get('store', None)
522                    if surl and k:
523                        req = { 'name': k, 'value': self.ssh_port }
524                        self.log.debug("Setting %s to %s on %s" % \
525                                (k, self.ssh_port, surl))
526                        self.call_SetValue(surl, req, cf)
527                    else:
528                        self.log.error("Bad export request: %s" % p)
529                else:
530                    self.log.error("Unknown export parameter: %s" % \
531                            p.get('name'))
532                    continue
533
534    def add_seer_node(self, topo, name, startup):
535        """
536        Add a seer node to the given topology, with the startup command passed
537        in.  Used by configure seer_services.
538        """
539        c_node = topdl.Computer(
540                name=name, 
541                os= topdl.OperatingSystem(
542                    attribute=[
543                    { 'attribute': 'osid', 
544                        'value': self.local_seer_image },
545                    ]),
546                attribute=[
547                    { 'attribute': 'startup', 'value': startup },
548                    ]
549                )
550        self.add_kit(c_node, self.local_seer_software)
551        topo.elements.append(c_node)
552
553    def configure_seer_services(self, services, topo, softdir):
554        """
555        Make changes to the topology required for the seer requests being made.
556        Specifically, add any control or master nodes required and set up the
557        start commands on the nodes to interconnect them.
558        """
559        local_seer = False      # True if we need to add a control node
560        collect_seer = False    # True if there is a seer-master node
561        seer_master= False      # True if we need to add the seer-master
562        for s in services:
563            s_name = s.get('name', '')
564            s_vis = s.get('visibility','')
565
566            if s_name  == 'local_seer_control' and s_vis == 'export':
567                local_seer = True
568            elif s_name == 'seer_master':
569                if s_vis == 'import':
570                    collect_seer = True
571                elif s_vis == 'export':
572                    seer_master = True
573       
574        # We've got the whole picture now, so add nodes if needed and configure
575        # them to interconnect properly.
576        if local_seer or seer_master:
577            # Copy local seer control node software to the tempdir
578            for l, f in self.local_seer_software:
579                base = os.path.basename(f)
580                copy_file(f, "%s/%s" % (softdir, base))
581        # If we're collecting seers somewhere the controllers need to talk to
582        # the master.  In testbeds that export the service, that will be a
583        # local node that we'll add below.  Elsewhere it will be the control
584        # portal that will port forward to the exporting master.
585        if local_seer:
586            if collect_seer:
587                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
588            else:
589                startup = self.local_seer_start
590            self.add_seer_node(topo, 'control', startup)
591        # If this is the seer master, add that node, too.
592        if seer_master:
593            self.add_seer_node(topo, 'seer-master', 
594                    "%s -R -n -R seer-master -R -A -R sink" % \
595                            self.seer_master_start)
596
597    def retrieve_software(self, topo, certfile, softdir):
598        """
599        Collect the software that nodes in the topology need loaded and stage
600        it locally.  This implies retrieving it from the experiment_controller
601        and placing it into softdir.  Certfile is used to prove that this node
602        has access to that data (it's the allocation/segment fedid).  Finally
603        local portal and federation software is also copied to the same staging
604        directory for simplicity - all software needed for experiment creation
605        is in softdir.
606        """
607        sw = set()
608        for e in topo.elements:
609            for s in getattr(e, 'software', []):
610                sw.add(s.location)
611        for s in sw:
612            self.log.debug("Retrieving %s" % s)
613            try:
614                get_url(s, certfile, softdir)
615            except:
616                t, v, st = sys.exc_info()
617                raise service_error(service_error.internal,
618                        "Error retrieving %s: %s" % (s, v))
619
620        # Copy local federation and portal node software to the tempdir
621        for s in (self.federation_software, self.portal_software):
622            for l, f in s:
623                base = os.path.basename(f)
624                copy_file(f, "%s/%s" % (softdir, base))
625
626
627    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
628        """
629        Gather common configuration files, retrieve or create an experiment
630        name and project name, and return the ssh_key filenames.  Create an
631        allocation log bound to the state log variable as well.
632        """
633        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
634                'seer_ca_pem', 'seer_node_pem')
635        ename = None
636        pubkey_base = None
637        secretkey_base = None
638        proj = None
639        user = None
640        alloc_log = None
641        nonce_experiment = False
642        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
643
644        self.state_lock.acquire()
645        if aid in self.allocation:
646            proj = self.allocation[aid].get('project', None)
647        self.state_lock.release()
648
649        if not proj:
650            raise service_error(service_error.internal, 
651                    "Can't find project for %s" %aid)
652
653        for a in attrs:
654            if a['attribute'] in configs:
655                try:
656                    self.log.debug("Retrieving %s from %s" % \
657                            (a['attribute'], a['value']))
658                    get_url(a['value'], certfile, tmpdir)
659                except:
660                    t, v, st = sys.exc_info()
661                    raise service_error(service_error.internal,
662                            "Error retrieving %s: %s" % (a.get('value', ""), v))
663            if a['attribute'] == 'ssh_pubkey':
664                pubkey_base = a['value'].rpartition('/')[2]
665            if a['attribute'] == 'ssh_secretkey':
666                secretkey_base = a['value'].rpartition('/')[2]
667            if a['attribute'] == 'experiment_name':
668                ename = a['value']
669
670        # Names longer than the emulab max are discarded
671        if ename and len(ename) <= self.max_name_len:
672            # Clean up the experiment name so that emulab will accept it.
673            ename = re.sub(vchars_re, '-', ename)
674
675        else:
676            ename = ""
677            for i in range(0,5):
678                ename += random.choice(string.ascii_letters)
679            nonce_experiment = True
680            self.log.warn("No experiment name or suggestion too long: " + \
681                    "picked one randomly: %s" % ename)
682
683        if not pubkey_base:
684            raise service_error(service_error.req, 
685                    "No public key attribute")
686
687        if not secretkey_base:
688            raise service_error(service_error.req, 
689                    "No secret key attribute")
690
691        self.state_lock.acquire()
692        if aid in self.allocation:
693            user = self.allocation[aid].get('user', None)
694            cert = self.allocation[aid].get('cert', None)
695            self.allocation[aid]['experiment'] = ename
696            self.allocation[aid]['nonce'] = nonce_experiment
697            self.allocation[aid]['log'] = [ ]
698            # Create a logger that logs to the experiment's state object as
699            # well as to the main log file.
700            alloc_log = logging.getLogger('fedd.access.%s' % ename)
701            h = logging.StreamHandler(
702                    list_log.list_log(self.allocation[aid]['log']))
703            # XXX: there should be a global one of these rather than
704            # repeating the code.
705            h.setFormatter(logging.Formatter(
706                "%(asctime)s %(name)s %(message)s",
707                        '%d %b %y %H:%M:%S'))
708            alloc_log.addHandler(h)
709            self.write_state()
710        self.state_lock.release()
711
712        if not user:
713            raise service_error(service_error.internal, 
714                    "Can't find creation user for %s" %aid)
715
716        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
717
718    def decorate_topology(self, info, t):
719        """
720        Copy the physical mapping and status onto the topology.  Used by
721        StartSegment and InfoSegment
722        """
723        def add_new(ann, attr):
724            for a in ann:
725                if a not in attr: attr.append(a)
726
727        def merge_os(os, e):
728            if len(e.os) == 0:
729                # No OS at all:
730                if os.get_attribute('emulab_access:image'):
731                    os.set_attribute('emulab_access:initial_image', 
732                            os.get_attribute('emulab_access:image'))
733                e.os = [ os ]
734            elif len(e.os) == 1:
735                # There's one OS, copy the initial image and replace
736                eos = e.os[0]
737                initial = eos.get_attribute('emulab_access:initial_image')
738                if initial:
739                    os.set_attribute('emulab_access:initial_image', initial)
740                e.os = [ os] 
741            else:
742                # Multiple OSes, replace or append
743                for eos in e.os:
744                    if os.name == eos.name:
745                        eos.version = os.version
746                        eos.version = os.distribution
747                        eos.version = os.distributionversion
748                        for a in os.attribute:
749                            if eos.get_attribute(a.attribute):
750                                eos.remove_attribute(a.attribute)
751                            eos.set_attribute(a.attribute, a.value)
752                        break
753                else:
754                    e.os.append(os)
755
756
757        if t is None: return
758        i = 0 # For fake debugging instantiation
759        # Copy the assigned names into the return topology
760        for e in t.elements:
761            if isinstance(e, topdl.Computer):
762                if not self.create_debug:
763                    if e.name in info.node:
764                        add_new(("%s%s" % 
765                            (info.node[e.name].pname, self.domain),),
766                            e.localname)
767                        e.status = info.node[e.name].status
768                        os = info.node[e.name].getOS()
769                        if os: merge_os(os, e)
770                else:
771                    # Simple debugging assignment
772                    add_new(("node%d%s" % (i, self.domain),), e.localname)
773                    e.status = 'active'
774                    add_new(('testop1', 'testop2'), e.operation)
775                    i += 1
776
777
778    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
779        """
780        Store key bits of experiment state in the global repository, including
781        the response that may need to be replayed, and return the response.
782        """
783        i = 0
784        t = topo.clone()
785        self.decorate_topology(starter, t)
786        # Grab the log (this is some anal locking, but better safe than
787        # sorry)
788        self.state_lock.acquire()
789        logv = "".join(self.allocation[aid]['log'])
790        # It's possible that the StartSegment call gets retried (!).
791        # if the 'started' key is in the allocation, we'll return it rather
792        # than redo the setup.
793        self.allocation[aid]['started'] = { 
794                'allocID': alloc_id,
795                'allocationLog': logv,
796                'segmentdescription': { 
797                    'topdldescription': t.to_dict()
798                    },
799                'proof': proof.to_dict(),
800                }
801        self.allocation[aid]['topo'] = t
802        retval = copy.copy(self.allocation[aid]['started'])
803        self.write_state()
804        self.state_lock.release()
805        return retval
806   
807    # End of StartSegment support routines
808
809    def StartSegment(self, req, fid):
810        err = None  # Any service_error generated after tmpdir is created
811        rv = None   # Return value from segment creation
812
813        try:
814            req = req['StartSegmentRequestBody']
815            auth_attr = req['allocID']['fedid']
816            topref = req['segmentdescription']['topdldescription']
817        except KeyError:
818            raise service_error(server_error.req, "Badly formed request")
819
820        connInfo = req.get('connection', [])
821        services = req.get('service', [])
822        aid = "%s" % auth_attr
823        attrs = req.get('fedAttr', [])
824
825        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
826                with_proof=True)
827        if not access_ok:
828            raise service_error(service_error.access, "Access denied")
829        else:
830            # See if this is a replay of an earlier succeeded StartSegment -
831            # sometimes SSL kills 'em.  If so, replay the response rather than
832            # redoing the allocation.
833            self.state_lock.acquire()
834            retval = self.allocation[aid].get('started', None)
835            self.state_lock.release()
836            if retval:
837                self.log.warning("Duplicate StartSegment for %s: " % aid + \
838                        "replaying response")
839                return retval
840
841        # A new request.  Do it.
842
843        if topref: topo = topdl.Topology(**topref)
844        else:
845            raise service_error(service_error.req, 
846                    "Request missing segmentdescription'")
847       
848        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
849        try:
850            tmpdir = tempfile.mkdtemp(prefix="access-")
851            softdir = "%s/software" % tmpdir
852            os.mkdir(softdir)
853        except EnvironmentError:
854            raise service_error(service_error.internal, "Cannot create tmp dir")
855
856        # Try block alllows us to clean up temporary files.
857        try:
858            self.retrieve_software(topo, certfile, softdir)
859            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
860                alloc_log =  self.initialize_experiment_info(attrs, aid, 
861                        certfile, tmpdir)
862
863            if '/' in proj: proj, gid = proj.split('/')
864            else: gid = None
865
866
867            # Set up userconf and seer if needed
868            self.configure_userconf(services, tmpdir)
869            self.configure_seer_services(services, topo, softdir)
870            # Get and send synch store variables
871            self.export_store_info(certfile, proj, ename, connInfo)
872            self.import_store_info(certfile, connInfo)
873
874            expfile = "%s/experiment.tcl" % tmpdir
875
876            self.generate_portal_configs(topo, pubkey_base, 
877                    secretkey_base, tmpdir, proj, ename, connInfo, services)
878            self.generate_ns2(topo, expfile, 
879                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
880
881            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
882                    debug=self.create_debug, log=alloc_log, boss=self.boss,
883                    ops=self.ops, cert=xmlrpc_cert)
884            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
885        except service_error, e:
886            err = e
887        except:
888            t, v, st = sys.exc_info()
889            err = service_error(service_error.internal, "%s: %s" % \
890                    (v, traceback.extract_tb(st)))
891
892        # Walk up tmpdir, deleting as we go
893        if self.cleanup: self.remove_dirs(tmpdir)
894        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
895
896        if rv:
897            return self.finalize_experiment(starter, topo, aid, req['allocID'],
898                    proof)
899        elif err:
900            raise service_error(service_error.federant,
901                    "Swapin failed: %s" % err)
902        else:
903            raise service_error(service_error.federant, "Swapin failed")
904
905    def TerminateSegment(self, req, fid):
906        try:
907            req = req['TerminateSegmentRequestBody']
908        except KeyError:
909            raise service_error(server_error.req, "Badly formed request")
910
911        auth_attr = req['allocID']['fedid']
912        aid = "%s" % auth_attr
913        attrs = req.get('fedAttr', [])
914
915        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
916                with_proof=True)
917        if not access_ok:
918            raise service_error(service_error.access, "Access denied")
919
920        self.state_lock.acquire()
921        if aid in self.allocation:
922            proj = self.allocation[aid].get('project', None)
923            user = self.allocation[aid].get('user', None)
924            xmlrpc_cert = self.allocation[aid].get('cert', None)
925            ename = self.allocation[aid].get('experiment', None)
926            nonce = self.allocation[aid].get('nonce', False)
927        else:
928            proj = None
929            user = None
930            ename = None
931            nonce = False
932            xmlrpc_cert = None
933        self.state_lock.release()
934
935        if not proj:
936            raise service_error(service_error.internal, 
937                    "Can't find project for %s" % aid)
938        else:
939            if '/' in proj: proj, gid = proj.split('/')
940            else: gid = None
941
942        if not user:
943            raise service_error(service_error.internal, 
944                    "Can't find creation user for %s" % aid)
945        if not ename:
946            raise service_error(service_error.internal, 
947                    "Can't find experiment name for %s" % aid)
948        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
949                debug=self.create_debug, boss=self.boss, ops=self.ops,
950                cert=xmlrpc_cert)
951        stopper(self, user, proj, ename, gid, nonce)
952        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
953
954    def InfoSegment(self, req, fid):
955        try:
956            req = req['InfoSegmentRequestBody']
957        except KeyError:
958            raise service_error(server_error.req, "Badly formed request")
959
960        auth_attr = req['allocID']['fedid']
961        aid = "%s" % auth_attr
962
963        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
964                with_proof=True)
965        if not access_ok:
966            raise service_error(service_error.access, "Access denied")
967
968        self.state_lock.acquire()
969        if aid in self.allocation:
970            topo = self.allocation[aid].get('topo', None)
971            if topo: topo = topo.clone()
972            proj = self.allocation[aid].get('project', None)
973            user = self.allocation[aid].get('user', None)
974            xmlrpc_cert = self.allocation[aid].get('cert', None)
975            ename = self.allocation[aid].get('experiment', None)
976        else:
977            proj = None
978            user = None
979            ename = None
980            topo = None
981            xmlrpc_cert = None
982        self.state_lock.release()
983
984        if not proj:
985            raise service_error(service_error.internal, 
986                    "Can't find project for %s" % aid)
987        else:
988            if '/' in proj: proj, gid = proj.split('/')
989            else: gid = None
990
991        if not user:
992            raise service_error(service_error.internal, 
993                    "Can't find creation user for %s" % aid)
994        if not ename:
995            raise service_error(service_error.internal, 
996                    "Can't find experiment name for %s" % aid)
997        info = self.info_segment(keyfile=self.ssh_privkey_file,
998                debug=self.create_debug, boss=self.boss, ops=self.ops,
999                cert=xmlrpc_cert)
1000        info(self, user, proj, ename)
1001        self.decorate_topology(info, topo)
1002        self.state_lock.acquire()
1003        if aid in self.allocation:
1004            self.allocation[aid]['topo'] = topo
1005            self.write_state()
1006        self.state_lock.release()
1007
1008        rv = { 
1009                'allocID': req['allocID'], 
1010                'proof': proof.to_dict(),
1011                }
1012        if topo:
1013            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1014        return rv
1015
1016    def OperationSegment(self, req, fid):
1017        def get_pname(e):
1018            """
1019            Get the physical name of a node
1020            """
1021            if e.localname:
1022                return re.sub('\..*','', e.localname[0])
1023            else:
1024                return None
1025
1026        try:
1027            req = req['OperationSegmentRequestBody']
1028        except KeyError:
1029            raise service_error(server_error.req, "Badly formed request")
1030
1031        auth_attr = req['allocID']['fedid']
1032        aid = "%s" % auth_attr
1033
1034        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1035                with_proof=True)
1036        if not access_ok:
1037            raise service_error(service_error.access, "Access denied")
1038
1039        op = req.get('operation', None)
1040        targets = req.get('target', None)
1041        params = req.get('parameter', None)
1042
1043        if op is None :
1044            raise service_error(service_error.req, "missing operation")
1045        elif targets is None:
1046            raise service_error(service_error.req, "no targets")
1047
1048        self.state_lock.acquire()
1049        if aid in self.allocation:
1050            topo = self.allocation[aid].get('topo', None)
1051            if topo: topo = topo.clone()
1052            xmlrpc_cert = self.allocation[aid].get('cert', None)
1053        else:
1054            topo = None
1055            xmlrpc_cert = None
1056        self.state_lock.release()
1057
1058        targets = copy.copy(targets)
1059        ptargets = { }
1060        for e in topo.elements:
1061            if isinstance(e, topdl.Computer):
1062                if e.name in targets:
1063                    targets.remove(e.name)
1064                    pn = get_pname(e)
1065                    if pn: ptargets[e.name] = pn
1066
1067        status = [ operation_status(t, operation_status.no_target) \
1068                for t in targets]
1069
1070        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1071                debug=self.create_debug, boss=self.boss, ops=self.ops,
1072                cert=xmlrpc_cert)
1073        ops(self, op, ptargets, params, topo)
1074       
1075        status.extend(ops.status)
1076
1077        return { 
1078                'allocID': req['allocID'], 
1079                'status': [s.to_dict() for s in status],
1080                'proof': proof.to_dict(),
1081                }
Note: See TracBrowser for help on using the repository browser.