source: fedd/federation/emulab_access.py @ 8cab4c2

compt_changes
Last change on this file since 8cab4c2 was 8cab4c2, checked in by Ted Faber <faber@…>, 12 years ago

More improved logging

  • Property mode set to 100644
File size: 36.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from deter import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from deter import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        self.log.info("RequestAccess called by %s" % fid)
244        # The dance to get into the request body
245        if req.has_key('RequestAccessRequestBody'):
246            req = req['RequestAccessRequestBody']
247        else:
248            raise service_error(service_error.req, "No request!?")
249
250        # if this includes a project export request, construct a filter such
251        # that only the ABAC attributes mapped to that project are checked for
252        # access.
253        if 'service' in req:
254            ep = get_export_project(req['service'])
255            if ep: pf = lambda(a): a.value[0] == ep
256            else: pf = None
257        else:
258            ep = None
259            pf = None
260
261        if self.auth.import_credentials(
262                data_list=req.get('abac_credential', [])):
263            self.auth.save()
264
265        if self.auth_type == 'abac':
266            found, owners, proof = self.lookup_access(req, fid, filter=pf)
267        else:
268            raise service_error(service_error.internal, 
269                    'Unknown auth_type: %s' % self.auth_type)
270        ap = None
271
272        # keep track of what's been added
273        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
274        aid = unicode(allocID)
275
276        pname, uname = self.save_project_state(aid, found[0], found[1], 
277                found[2], owners)
278
279        services, svc_state = self.export_services(req.get('service',[]),
280                pname, uname)
281        self.state_lock.acquire()
282        # Store services state in global state
283        for k, v in svc_state.items():
284            self.allocation[aid][k] = v
285        self.append_allocation_authorization(aid, 
286                set([(o, allocID) for o in owners]), state_attr='allocation')
287        self.write_state()
288        self.state_lock.release()
289        try:
290            f = open("%s/%s.pem" % (self.certdir, aid), "w")
291            print >>f, alloc_cert
292            f.close()
293        except EnvironmentError, e:
294            self.log.info("RequestAccess failed for by %s: internal error" \
295                    % fid)
296            raise service_error(service_error.internal, 
297                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
298        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
299        resp = self.build_access_response({ 'fedid': allocID } ,
300                pname, services, proof)
301        return resp
302
303    def ReleaseAccess(self, req, fid):
304        self.log.info("ReleaseAccess called by %s" % fid)
305        # The dance to get into the request body
306        if req.has_key('ReleaseAccessRequestBody'):
307            req = req['ReleaseAccessRequestBody']
308        else:
309            raise service_error(service_error.req, "No request!?")
310
311        try:
312            if req['allocID'].has_key('localname'):
313                auth_attr = aid = req['allocID']['localname']
314            elif req['allocID'].has_key('fedid'):
315                aid = unicode(req['allocID']['fedid'])
316                auth_attr = req['allocID']['fedid']
317            else:
318                raise service_error(service_error.req,
319                        "Only localnames and fedids are understood")
320        except KeyError:
321            raise service_error(service_error.req, "Badly formed request")
322
323        self.log.debug("[access] deallocation requested for %s by %s" % \
324                (aid, fid))
325        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
326                with_proof=True)
327        if not access_ok:
328            self.log.debug("[access] deallocation denied for %s", aid)
329            raise service_error(service_error.access, "Access Denied")
330
331        self.state_lock.acquire()
332        if aid in self.allocation:
333            self.log.debug("Found allocation for %s" %aid)
334            self.clear_allocation_authorization(aid, state_attr='allocation')
335            del self.allocation[aid]
336            self.write_state()
337            self.state_lock.release()
338            # Remove the access cert
339            cf = "%s/%s.pem" % (self.certdir, aid)
340            self.log.debug("Removing %s" % cf)
341            os.remove(cf)
342            self.log.info("ReleaseAccess succeeded for %s" % fid)
343            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
344        else:
345            self.state_lock.release()
346            raise service_error(service_error.req, "No such allocation")
347
348    # These are subroutines for StartSegment
349    def generate_ns2(self, topo, expfn, softdir, connInfo):
350        """
351        Convert topo into an ns2 file, decorated with appropriate commands for
352        the particular testbed setup.  Convert all requests for software, etc
353        to point at the staged copies on this testbed and add the federation
354        startcommands.
355        """
356        class dragon_commands:
357            """
358            Functor to spit out approrpiate dragon commands for nodes listed in
359            the connectivity description.  The constructor makes a dict mapping
360            dragon nodes to their parameters and the __call__ checks each
361            element in turn for membership.
362            """
363            def __init__(self, map):
364                self.node_info = map
365
366            def __call__(self, e):
367                s = ""
368                if isinstance(e, topdl.Computer):
369                    if self.node_info.has_key(e.name):
370                        info = self.node_info[e.name]
371                        for ifname, vlan, type in info:
372                            for i in e.interface:
373                                if i.name == ifname:
374                                    addr = i.get_attribute('ip4_address')
375                                    subs = i.substrate[0]
376                                    break
377                            else:
378                                raise service_error(service_error.internal,
379                                        "No interface %s on element %s" % \
380                                                (ifname, e.name))
381                            # XXX: do netmask right
382                            if type =='link':
383                                s = ("tb-allow-external ${%s} " + \
384                                        "dragonportal ip %s vlan %s " + \
385                                        "netmask 255.255.255.0\n") % \
386                                        (topdl.to_tcl_name(e.name), addr, vlan)
387                            elif type =='lan':
388                                s = ("tb-allow-external ${%s} " + \
389                                        "dragonportal " + \
390                                        "ip %s vlan %s usurp %s\n") % \
391                                        (topdl.to_tcl_name(e.name), addr, 
392                                                vlan, subs)
393                            else:
394                                raise service_error(service_error_internal,
395                                        "Unknown DRAGON type %s" % type)
396                return s
397
398        class not_dragon:
399            """
400            Return true if a node is in the given map of dragon nodes.
401            """
402            def __init__(self, map):
403                self.nodes = set(map.keys())
404
405            def __call__(self, e):
406                return e.name not in self.nodes
407
408        # Main line of generate_ns2
409        t = topo.clone()
410
411        # Create the map of nodes that need direct connections (dragon
412        # connections) from the connInfo
413        dragon_map = { }
414        for i in [ i for i in connInfo if i['type'] == 'transit']:
415            for a in i.get('fedAttr', []):
416                if a['attribute'] == 'vlan_id':
417                    vlan = a['value']
418                    break
419            else:
420                raise service_error(service_error.internal, "No vlan tag")
421            members = i.get('member', [])
422            if len(members) > 1: type = 'lan'
423            else: type = 'link'
424
425            try:
426                for m in members:
427                    if m['element'] in dragon_map:
428                        dragon_map[m['element']].append(( m['interface'], 
429                            vlan, type))
430                    else:
431                        dragon_map[m['element']] = [( m['interface'], 
432                            vlan, type),]
433            except KeyError:
434                raise service_error(service_error.req,
435                        "Missing connectivity info")
436
437        # Weed out the things we aren't going to instantiate: Segments, portal
438        # substrates, and portal interfaces.  (The copy in the for loop allows
439        # us to delete from e.elements in side the for loop).  While we're
440        # touching all the elements, we also adjust paths from the original
441        # testbed to local testbed paths and put the federation commands into
442        # the start commands
443        for e in [e for e in t.elements]:
444            if isinstance(e, topdl.Segment):
445                t.elements.remove(e)
446            if isinstance(e, topdl.Computer):
447                self.add_kit(e, self.federation_software)
448                if e.get_attribute('portal') and self.portal_startcommand:
449                    # Add local portal support software
450                    self.add_kit(e, self.portal_software)
451                    # Portals never have a user-specified start command
452                    e.set_attribute('startup', self.portal_startcommand)
453                elif self.node_startcommand:
454                    if e.get_attribute('startup'):
455                        e.set_attribute('startup', "%s \\$USER '%s'" % \
456                                (self.node_startcommand, 
457                                    e.get_attribute('startup')))
458                    else:
459                        e.set_attribute('startup', self.node_startcommand)
460
461                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
462                # Remove portal interfaces that do not connect to DRAGON
463                e.interface = [i for i in e.interface \
464                        if not i.get_attribute('portal') or i.name in dinf ]
465            # Fix software paths
466            for s in getattr(e, 'software', []):
467                s.location = re.sub("^.*/", softdir, s.location)
468
469        t.substrates = [ s.clone() for s in t.substrates ]
470        t.incorporate_elements()
471
472        # Customize the ns2 output for local portal commands and images
473        filters = []
474
475        if self.dragon_endpoint:
476            add_filter = not_dragon(dragon_map)
477            filters.append(dragon_commands(dragon_map))
478        else:
479            add_filter = None
480
481        if self.portal_command:
482            filters.append(topdl.generate_portal_command_filter(
483                self.portal_command, add_filter=add_filter))
484
485        if self.portal_image:
486            filters.append(topdl.generate_portal_image_filter(
487                self.portal_image))
488
489        if self.portal_type:
490            filters.append(topdl.generate_portal_hardware_filter(
491                self.portal_type))
492
493        # Convert to ns and write it out
494        expfile = topdl.topology_to_ns2(t, filters)
495        try:
496            f = open(expfn, "w")
497            print >>f, expfile
498            f.close()
499        except EnvironmentError:
500            raise service_error(service_error.internal,
501                    "Cannot write experiment file %s: %s" % (expfn,e))
502
503    def export_store_info(self, cf, proj, ename, connInfo):
504        """
505        For the export requests in the connection info, install the peer names
506        at the experiment controller via SetValue calls.
507        """
508
509        for c in connInfo:
510            for p in [ p for p in c.get('parameter', []) \
511                    if p.get('type', '') == 'output']:
512
513                if p.get('name', '') == 'peer':
514                    k = p.get('key', None)
515                    surl = p.get('store', None)
516                    if surl and k and k.index('/') != -1:
517                        value = "%s.%s.%s%s" % \
518                                (k[k.index('/')+1:], ename, proj, self.domain)
519                        req = { 'name': k, 'value': value }
520                        self.log.debug("Setting %s to %s on %s" % \
521                                (k, value, surl))
522                        self.call_SetValue(surl, req, cf)
523                    else:
524                        self.log.error("Bad export request: %s" % p)
525                elif p.get('name', '') == 'ssh_port':
526                    k = p.get('key', None)
527                    surl = p.get('store', None)
528                    if surl and k:
529                        req = { 'name': k, 'value': self.ssh_port }
530                        self.log.debug("Setting %s to %s on %s" % \
531                                (k, self.ssh_port, surl))
532                        self.call_SetValue(surl, req, cf)
533                    else:
534                        self.log.error("Bad export request: %s" % p)
535                else:
536                    self.log.error("Unknown export parameter: %s" % \
537                            p.get('name'))
538                    continue
539
540    def add_seer_node(self, topo, name, startup):
541        """
542        Add a seer node to the given topology, with the startup command passed
543        in.  Used by configure seer_services.
544        """
545        c_node = topdl.Computer(
546                name=name, 
547                os= topdl.OperatingSystem(
548                    attribute=[
549                    { 'attribute': 'osid', 
550                        'value': self.local_seer_image },
551                    ]),
552                attribute=[
553                    { 'attribute': 'startup', 'value': startup },
554                    ]
555                )
556        self.add_kit(c_node, self.local_seer_software)
557        topo.elements.append(c_node)
558
559    def configure_seer_services(self, services, topo, softdir):
560        """
561        Make changes to the topology required for the seer requests being made.
562        Specifically, add any control or master nodes required and set up the
563        start commands on the nodes to interconnect them.
564        """
565        local_seer = False      # True if we need to add a control node
566        collect_seer = False    # True if there is a seer-master node
567        seer_master= False      # True if we need to add the seer-master
568        for s in services:
569            s_name = s.get('name', '')
570            s_vis = s.get('visibility','')
571
572            if s_name  == 'local_seer_control' and s_vis == 'export':
573                local_seer = True
574            elif s_name == 'seer_master':
575                if s_vis == 'import':
576                    collect_seer = True
577                elif s_vis == 'export':
578                    seer_master = True
579       
580        # We've got the whole picture now, so add nodes if needed and configure
581        # them to interconnect properly.
582        if local_seer or seer_master:
583            # Copy local seer control node software to the tempdir
584            for l, f in self.local_seer_software:
585                base = os.path.basename(f)
586                copy_file(f, "%s/%s" % (softdir, base))
587        # If we're collecting seers somewhere the controllers need to talk to
588        # the master.  In testbeds that export the service, that will be a
589        # local node that we'll add below.  Elsewhere it will be the control
590        # portal that will port forward to the exporting master.
591        if local_seer:
592            if collect_seer:
593                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
594            else:
595                startup = self.local_seer_start
596            self.add_seer_node(topo, 'control', startup)
597        # If this is the seer master, add that node, too.
598        if seer_master:
599            self.add_seer_node(topo, 'seer-master', 
600                    "%s -R -n -R seer-master -R -A -R sink" % \
601                            self.seer_master_start)
602
603    def retrieve_software(self, topo, certfile, softdir):
604        """
605        Collect the software that nodes in the topology need loaded and stage
606        it locally.  This implies retrieving it from the experiment_controller
607        and placing it into softdir.  Certfile is used to prove that this node
608        has access to that data (it's the allocation/segment fedid).  Finally
609        local portal and federation software is also copied to the same staging
610        directory for simplicity - all software needed for experiment creation
611        is in softdir.
612        """
613        sw = set()
614        for e in topo.elements:
615            for s in getattr(e, 'software', []):
616                sw.add(s.location)
617        for s in sw:
618            self.log.debug("Retrieving %s" % s)
619            try:
620                get_url(s, certfile, softdir)
621            except:
622                t, v, st = sys.exc_info()
623                raise service_error(service_error.internal,
624                        "Error retrieving %s: %s" % (s, v))
625
626        # Copy local federation and portal node software to the tempdir
627        for s in (self.federation_software, self.portal_software):
628            for l, f in s:
629                base = os.path.basename(f)
630                copy_file(f, "%s/%s" % (softdir, base))
631
632
633    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
634        """
635        Gather common configuration files, retrieve or create an experiment
636        name and project name, and return the ssh_key filenames.  Create an
637        allocation log bound to the state log variable as well.
638        """
639        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
640                'seer_ca_pem', 'seer_node_pem')
641        ename = None
642        pubkey_base = None
643        secretkey_base = None
644        proj = None
645        user = None
646        alloc_log = None
647        nonce_experiment = False
648        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
649
650        self.state_lock.acquire()
651        if aid in self.allocation:
652            proj = self.allocation[aid].get('project', None)
653        self.state_lock.release()
654
655        if not proj:
656            raise service_error(service_error.internal, 
657                    "Can't find project for %s" %aid)
658
659        for a in attrs:
660            if a['attribute'] in configs:
661                try:
662                    self.log.debug("Retrieving %s from %s" % \
663                            (a['attribute'], a['value']))
664                    get_url(a['value'], certfile, tmpdir)
665                except:
666                    t, v, st = sys.exc_info()
667                    raise service_error(service_error.internal,
668                            "Error retrieving %s: %s" % (a.get('value', ""), v))
669            if a['attribute'] == 'ssh_pubkey':
670                pubkey_base = a['value'].rpartition('/')[2]
671            if a['attribute'] == 'ssh_secretkey':
672                secretkey_base = a['value'].rpartition('/')[2]
673            if a['attribute'] == 'experiment_name':
674                ename = a['value']
675
676        # Names longer than the emulab max are discarded
677        if ename and len(ename) <= self.max_name_len:
678            # Clean up the experiment name so that emulab will accept it.
679            ename = re.sub(vchars_re, '-', ename)
680
681        else:
682            ename = ""
683            for i in range(0,5):
684                ename += random.choice(string.ascii_letters)
685            nonce_experiment = True
686            self.log.warn("No experiment name or suggestion too long: " + \
687                    "picked one randomly: %s" % ename)
688
689        if not pubkey_base:
690            raise service_error(service_error.req, 
691                    "No public key attribute")
692
693        if not secretkey_base:
694            raise service_error(service_error.req, 
695                    "No secret key attribute")
696
697        self.state_lock.acquire()
698        if aid in self.allocation:
699            user = self.allocation[aid].get('user', None)
700            cert = self.allocation[aid].get('cert', None)
701            self.allocation[aid]['experiment'] = ename
702            self.allocation[aid]['nonce'] = nonce_experiment
703            self.allocation[aid]['log'] = [ ]
704            # Create a logger that logs to the experiment's state object as
705            # well as to the main log file.
706            alloc_log = logging.getLogger('fedd.access.%s' % ename)
707            h = logging.StreamHandler(
708                    list_log.list_log(self.allocation[aid]['log']))
709            # XXX: there should be a global one of these rather than
710            # repeating the code.
711            h.setFormatter(logging.Formatter(
712                "%(asctime)s %(name)s %(message)s",
713                        '%d %b %y %H:%M:%S'))
714            alloc_log.addHandler(h)
715            self.write_state()
716        self.state_lock.release()
717
718        if not user:
719            raise service_error(service_error.internal, 
720                    "Can't find creation user for %s" %aid)
721
722        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
723
724    def decorate_topology(self, info, t):
725        """
726        Copy the physical mapping and status onto the topology.  Used by
727        StartSegment and InfoSegment
728        """
729        def add_new(ann, attr):
730            for a in ann:
731                if a not in attr: attr.append(a)
732
733        def merge_os(os, e):
734            if len(e.os) == 0:
735                # No OS at all:
736                if os.get_attribute('emulab_access:image'):
737                    os.set_attribute('emulab_access:initial_image', 
738                            os.get_attribute('emulab_access:image'))
739                e.os = [ os ]
740            elif len(e.os) == 1:
741                # There's one OS, copy the initial image and replace
742                eos = e.os[0]
743                initial = eos.get_attribute('emulab_access:initial_image')
744                if initial:
745                    os.set_attribute('emulab_access:initial_image', initial)
746                e.os = [ os] 
747            else:
748                # Multiple OSes, replace or append
749                for eos in e.os:
750                    if os.name == eos.name:
751                        eos.version = os.version
752                        eos.version = os.distribution
753                        eos.version = os.distributionversion
754                        for a in os.attribute:
755                            if eos.get_attribute(a.attribute):
756                                eos.remove_attribute(a.attribute)
757                            eos.set_attribute(a.attribute, a.value)
758                        break
759                else:
760                    e.os.append(os)
761
762
763        if t is None: return
764        i = 0 # For fake debugging instantiation
765        # Copy the assigned names into the return topology
766        for e in t.elements:
767            if isinstance(e, topdl.Computer):
768                if not self.create_debug:
769                    if e.name in info.node:
770                        add_new(("%s%s" % 
771                            (info.node[e.name].pname, self.domain),),
772                            e.localname)
773                        e.status = info.node[e.name].status
774                        os = info.node[e.name].getOS()
775                        if os: merge_os(os, e)
776                else:
777                    # Simple debugging assignment
778                    add_new(("node%d%s" % (i, self.domain),), e.localname)
779                    e.status = 'active'
780                    add_new(('testop1', 'testop2'), e.operation)
781                    i += 1
782
783
784    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
785        """
786        Store key bits of experiment state in the global repository, including
787        the response that may need to be replayed, and return the response.
788        """
789        i = 0
790        t = topo.clone()
791        self.decorate_topology(starter, t)
792        # Grab the log (this is some anal locking, but better safe than
793        # sorry)
794        self.state_lock.acquire()
795        logv = "".join(self.allocation[aid]['log'])
796        # It's possible that the StartSegment call gets retried (!).
797        # if the 'started' key is in the allocation, we'll return it rather
798        # than redo the setup.
799        self.allocation[aid]['started'] = { 
800                'allocID': alloc_id,
801                'allocationLog': logv,
802                'segmentdescription': { 
803                    'topdldescription': t.to_dict()
804                    },
805                'proof': proof.to_dict(),
806                }
807        self.allocation[aid]['topo'] = t
808        retval = copy.copy(self.allocation[aid]['started'])
809        self.write_state()
810        self.state_lock.release()
811        return retval
812   
813    # End of StartSegment support routines
814
815    def StartSegment(self, req, fid):
816        err = None  # Any service_error generated after tmpdir is created
817        rv = None   # Return value from segment creation
818
819        self.log.info("StartSegment called by %s" % fid)
820        try:
821            req = req['StartSegmentRequestBody']
822            auth_attr = req['allocID']['fedid']
823            topref = req['segmentdescription']['topdldescription']
824        except KeyError:
825            raise service_error(server_error.req, "Badly formed request")
826
827        connInfo = req.get('connection', [])
828        services = req.get('service', [])
829        aid = "%s" % auth_attr
830        attrs = req.get('fedAttr', [])
831
832        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
833                with_proof=True)
834        if not access_ok:
835            self.log.info("StartSegment for %s failed: access denied" % fid)
836            raise service_error(service_error.access, "Access denied")
837        else:
838            # See if this is a replay of an earlier succeeded StartSegment -
839            # sometimes SSL kills 'em.  If so, replay the response rather than
840            # redoing the allocation.
841            self.state_lock.acquire()
842            retval = self.allocation[aid].get('started', None)
843            self.state_lock.release()
844            if retval:
845                self.log.warning("Duplicate StartSegment for %s: " % aid + \
846                        "replaying response")
847                return retval
848
849        # A new request.  Do it.
850
851        if topref: topo = topdl.Topology(**topref)
852        else:
853            raise service_error(service_error.req, 
854                    "Request missing segmentdescription'")
855       
856        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
857        try:
858            tmpdir = tempfile.mkdtemp(prefix="access-")
859            softdir = "%s/software" % tmpdir
860            os.mkdir(softdir)
861        except EnvironmentError:
862            self.log.info("StartSegment for %s failed: internal error" % fid)
863            raise service_error(service_error.internal, "Cannot create tmp dir")
864
865        # Try block alllows us to clean up temporary files.
866        try:
867            self.retrieve_software(topo, certfile, softdir)
868            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
869                alloc_log =  self.initialize_experiment_info(attrs, aid, 
870                        certfile, tmpdir)
871
872            if '/' in proj: proj, gid = proj.split('/')
873            else: gid = None
874
875
876            # Set up userconf and seer if needed
877            self.configure_userconf(services, tmpdir)
878            self.configure_seer_services(services, topo, softdir)
879            # Get and send synch store variables
880            self.export_store_info(certfile, proj, ename, connInfo)
881            self.import_store_info(certfile, connInfo)
882
883            expfile = "%s/experiment.tcl" % tmpdir
884
885            self.generate_portal_configs(topo, pubkey_base, 
886                    secretkey_base, tmpdir, proj, ename, connInfo, services)
887            self.generate_ns2(topo, expfile, 
888                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
889
890            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
891                    debug=self.create_debug, log=alloc_log, boss=self.boss,
892                    ops=self.ops, cert=xmlrpc_cert)
893            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
894        except service_error, e:
895            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
896            err = e
897        except:
898            t, v, st = sys.exc_info()
899            self.log.info("StartSegment for %s failed: unexpected error:  %s" \
900                    % (fid, v, traceback.extract_tb(st)))
901            err = service_error(service_error.internal, "%s: %s" % \
902                    (v, traceback.extract_tb(st)))
903
904        # Walk up tmpdir, deleting as we go
905        if self.cleanup: self.remove_dirs(tmpdir)
906        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
907
908        if rv:
909            self.log.info("StartSegment for %s succeeded" % fid)
910            return self.finalize_experiment(starter, topo, aid, req['allocID'],
911                    proof)
912        elif err:
913            raise service_error(service_error.federant,
914                    "Swapin failed: %s" % err)
915        else:
916            raise service_error(service_error.federant, "Swapin failed")
917
918    def TerminateSegment(self, req, fid):
919        self.log.info("TerminateSegment called by %s" % fid)
920        try:
921            req = req['TerminateSegmentRequestBody']
922        except KeyError:
923            raise service_error(server_error.req, "Badly formed request")
924
925        auth_attr = req['allocID']['fedid']
926        aid = "%s" % auth_attr
927        attrs = req.get('fedAttr', [])
928
929        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
930                with_proof=True)
931        if not access_ok:
932            raise service_error(service_error.access, "Access denied")
933
934        self.state_lock.acquire()
935        if aid in self.allocation:
936            proj = self.allocation[aid].get('project', None)
937            user = self.allocation[aid].get('user', None)
938            xmlrpc_cert = self.allocation[aid].get('cert', None)
939            ename = self.allocation[aid].get('experiment', None)
940            nonce = self.allocation[aid].get('nonce', False)
941        else:
942            proj = None
943            user = None
944            ename = None
945            nonce = False
946            xmlrpc_cert = None
947        self.state_lock.release()
948
949        if not proj:
950            self.log.info("TerminateSegment failed for %s: cannot find project"\
951                    % fid)
952            raise service_error(service_error.internal, 
953                    "Can't find project for %s" % aid)
954        else:
955            if '/' in proj: proj, gid = proj.split('/')
956            else: gid = None
957
958        if not user:
959            self.log.info("TerminateSegment failed for %s: cannot find user"\
960                    % fid)
961            raise service_error(service_error.internal, 
962                    "Can't find creation user for %s" % aid)
963        if not ename:
964            self.log.info(
965                    "TerminateSegment failed for %s: cannot find experiment"\
966                    % fid)
967            raise service_error(service_error.internal, 
968                    "Can't find experiment name for %s" % aid)
969        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
970                debug=self.create_debug, boss=self.boss, ops=self.ops,
971                cert=xmlrpc_cert)
972        stopper(self, user, proj, ename, gid, nonce)
973        self.log.info("TerminateSegment succeeded for %s %s %s" % \
974                (fid, proj, ename))
975        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
976
977    def InfoSegment(self, req, fid):
978        self.log.info("InfoSegment called by %s" % fid)
979        try:
980            req = req['InfoSegmentRequestBody']
981        except KeyError:
982            raise service_error(server_error.req, "Badly formed request")
983
984        auth_attr = req['allocID']['fedid']
985        aid = "%s" % auth_attr
986
987        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
988                with_proof=True)
989        if not access_ok:
990            raise service_error(service_error.access, "Access denied")
991
992        self.state_lock.acquire()
993        if aid in self.allocation:
994            topo = self.allocation[aid].get('topo', None)
995            if topo: topo = topo.clone()
996            proj = self.allocation[aid].get('project', None)
997            user = self.allocation[aid].get('user', None)
998            xmlrpc_cert = self.allocation[aid].get('cert', None)
999            ename = self.allocation[aid].get('experiment', None)
1000        else:
1001            proj = None
1002            user = None
1003            ename = None
1004            topo = None
1005            xmlrpc_cert = None
1006        self.state_lock.release()
1007
1008        if not proj:
1009            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1010            raise service_error(service_error.internal, 
1011                    "Can't find project for %s" % aid)
1012        else:
1013            if '/' in proj: proj, gid = proj.split('/')
1014            else: gid = None
1015
1016        if not user:
1017            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1018            raise service_error(service_error.internal, 
1019                    "Can't find creation user for %s" % aid)
1020        if not ename:
1021            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1022            raise service_error(service_error.internal, 
1023                    "Can't find experiment name for %s" % aid)
1024        info = self.info_segment(keyfile=self.ssh_privkey_file,
1025                debug=self.create_debug, boss=self.boss, ops=self.ops,
1026                cert=xmlrpc_cert)
1027        info(self, user, proj, ename)
1028        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1029                (fid, user, proj, ename))
1030        self.decorate_topology(info, topo)
1031        self.state_lock.acquire()
1032        if aid in self.allocation:
1033            self.allocation[aid]['topo'] = topo
1034            self.write_state()
1035        self.state_lock.release()
1036        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1037                (fid, user, proj, ename))
1038
1039        rv = { 
1040                'allocID': req['allocID'], 
1041                'proof': proof.to_dict(),
1042                }
1043        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1044                (fid, user, proj, ename))
1045        if topo:
1046            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1047        return rv
1048
1049    def OperationSegment(self, req, fid):
1050        def get_pname(e):
1051            """
1052            Get the physical name of a node
1053            """
1054            if e.localname:
1055                return re.sub('\..*','', e.localname[0])
1056            else:
1057                return None
1058
1059        self.log.info("OperationSegment called by %s" % fid)
1060        try:
1061            req = req['OperationSegmentRequestBody']
1062        except KeyError:
1063            raise service_error(server_error.req, "Badly formed request")
1064
1065        auth_attr = req['allocID']['fedid']
1066        aid = "%s" % auth_attr
1067
1068        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1069                with_proof=True)
1070        if not access_ok:
1071            self.log.info("OperationSegment failed for %s: access denied" % fid)
1072            raise service_error(service_error.access, "Access denied")
1073
1074        op = req.get('operation', None)
1075        targets = req.get('target', None)
1076        params = req.get('parameter', None)
1077
1078        if op is None :
1079            self.log.info("OperationSegment failed for %s: no operation" % fid)
1080            raise service_error(service_error.req, "missing operation")
1081        elif targets is None:
1082            self.log.info("OperationSegment failed for %s: no targets" % fid)
1083            raise service_error(service_error.req, "no targets")
1084
1085        self.state_lock.acquire()
1086        if aid in self.allocation:
1087            topo = self.allocation[aid].get('topo', None)
1088            if topo: topo = topo.clone()
1089            xmlrpc_cert = self.allocation[aid].get('cert', None)
1090        else:
1091            topo = None
1092            xmlrpc_cert = None
1093        self.state_lock.release()
1094
1095        targets = copy.copy(targets)
1096        ptargets = { }
1097        for e in topo.elements:
1098            if isinstance(e, topdl.Computer):
1099                if e.name in targets:
1100                    targets.remove(e.name)
1101                    pn = get_pname(e)
1102                    if pn: ptargets[e.name] = pn
1103
1104        status = [ operation_status(t, operation_status.no_target) \
1105                for t in targets]
1106
1107        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1108                debug=self.create_debug, boss=self.boss, ops=self.ops,
1109                cert=xmlrpc_cert)
1110        ops(self, op, ptargets, params, topo)
1111        self.log.info("OperationSegment operated for %s" % fid)
1112       
1113        status.extend(ops.status)
1114        self.log.info("OperationSegment succeed for %s" % fid)
1115
1116        return { 
1117                'allocID': req['allocID'], 
1118                'status': [s.to_dict() for s in status],
1119                'proof': proof.to_dict(),
1120                }
Note: See TracBrowser for help on using the repository browser.