source: fedd/federation/emulab_access.py @ 45016ae

compt_changes
Last change on this file since 45016ae was 6527d60, checked in by Ted Faber <faber@…>, 13 years ago

Improved info gathering

  • Property mode set to 100644
File size: 36.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from deter import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from deter import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        self.log.info("RequestAccess called by %s" % fid)
244        # The dance to get into the request body
245        if req.has_key('RequestAccessRequestBody'):
246            req = req['RequestAccessRequestBody']
247        else:
248            raise service_error(service_error.req, "No request!?")
249
250        # if this includes a project export request, construct a filter such
251        # that only the ABAC attributes mapped to that project are checked for
252        # access.
253        if 'service' in req:
254            ep = get_export_project(req['service'])
255            if ep: pf = lambda(a): a.value[0] == ep
256            else: pf = None
257        else:
258            ep = None
259            pf = None
260
261        if self.auth.import_credentials(
262                data_list=req.get('abac_credential', [])):
263            self.auth.save()
264
265        if self.auth_type == 'abac':
266            found, owners, proof = self.lookup_access(req, fid, filter=pf)
267        else:
268            raise service_error(service_error.internal, 
269                    'Unknown auth_type: %s' % self.auth_type)
270        ap = None
271
272        # keep track of what's been added
273        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
274        aid = unicode(allocID)
275
276        pname, uname = self.save_project_state(aid, found[0], found[1], 
277                found[2], owners)
278
279        services, svc_state = self.export_services(req.get('service',[]),
280                pname, uname)
281        self.state_lock.acquire()
282        # Store services state in global state
283        for k, v in svc_state.items():
284            self.allocation[aid][k] = v
285        self.append_allocation_authorization(aid, 
286                set([(o, allocID) for o in owners]), state_attr='allocation')
287        self.write_state()
288        self.state_lock.release()
289        try:
290            f = open("%s/%s.pem" % (self.certdir, aid), "w")
291            print >>f, alloc_cert
292            f.close()
293        except EnvironmentError, e:
294            self.log.info("RequestAccess failed for by %s: internal error" \
295                    % fid)
296            raise service_error(service_error.internal, 
297                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
298        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
299        resp = self.build_access_response({ 'fedid': allocID } ,
300                pname, services, proof)
301        return resp
302
303    def ReleaseAccess(self, req, fid):
304        self.log.info("ReleaseAccess called by %s" % fid)
305        # The dance to get into the request body
306        if req.has_key('ReleaseAccessRequestBody'):
307            req = req['ReleaseAccessRequestBody']
308        else:
309            raise service_error(service_error.req, "No request!?")
310
311        try:
312            if req['allocID'].has_key('localname'):
313                auth_attr = aid = req['allocID']['localname']
314            elif req['allocID'].has_key('fedid'):
315                aid = unicode(req['allocID']['fedid'])
316                auth_attr = req['allocID']['fedid']
317            else:
318                raise service_error(service_error.req,
319                        "Only localnames and fedids are understood")
320        except KeyError:
321            raise service_error(service_error.req, "Badly formed request")
322
323        self.log.debug("[access] deallocation requested for %s by %s" % \
324                (aid, fid))
325        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
326                with_proof=True)
327        if not access_ok:
328            self.log.debug("[access] deallocation denied for %s", aid)
329            raise service_error(service_error.access, "Access Denied")
330
331        self.state_lock.acquire()
332        if aid in self.allocation:
333            self.log.debug("Found allocation for %s" %aid)
334            self.clear_allocation_authorization(aid, state_attr='allocation')
335            del self.allocation[aid]
336            self.write_state()
337            self.state_lock.release()
338            # Remove the access cert
339            cf = "%s/%s.pem" % (self.certdir, aid)
340            self.log.debug("Removing %s" % cf)
341            os.remove(cf)
342            self.log.info("ReleaseAccess succeeded for %s" % fid)
343            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
344        else:
345            self.state_lock.release()
346            raise service_error(service_error.req, "No such allocation")
347
348    # These are subroutines for StartSegment
349    def generate_ns2(self, topo, expfn, softdir, connInfo):
350        """
351        Convert topo into an ns2 file, decorated with appropriate commands for
352        the particular testbed setup.  Convert all requests for software, etc
353        to point at the staged copies on this testbed and add the federation
354        startcommands.
355        """
356        class dragon_commands:
357            """
358            Functor to spit out approrpiate dragon commands for nodes listed in
359            the connectivity description.  The constructor makes a dict mapping
360            dragon nodes to their parameters and the __call__ checks each
361            element in turn for membership.
362            """
363            def __init__(self, map):
364                self.node_info = map
365
366            def __call__(self, e):
367                s = ""
368                if isinstance(e, topdl.Computer):
369                    if self.node_info.has_key(e.name):
370                        info = self.node_info[e.name]
371                        for ifname, vlan, type in info:
372                            for i in e.interface:
373                                if i.name == ifname:
374                                    addr = i.get_attribute('ip4_address')
375                                    subs = i.substrate[0]
376                                    break
377                            else:
378                                raise service_error(service_error.internal,
379                                        "No interface %s on element %s" % \
380                                                (ifname, e.name))
381                            # XXX: do netmask right
382                            if type =='link':
383                                s = ("tb-allow-external ${%s} " + \
384                                        "dragonportal ip %s vlan %s " + \
385                                        "netmask 255.255.255.0\n") % \
386                                        (topdl.to_tcl_name(e.name), addr, vlan)
387                            elif type =='lan':
388                                s = ("tb-allow-external ${%s} " + \
389                                        "dragonportal " + \
390                                        "ip %s vlan %s usurp %s\n") % \
391                                        (topdl.to_tcl_name(e.name), addr, 
392                                                vlan, subs)
393                            else:
394                                raise service_error(service_error_internal,
395                                        "Unknown DRAGON type %s" % type)
396                return s
397
398        class not_dragon:
399            """
400            Return true if a node is in the given map of dragon nodes.
401            """
402            def __init__(self, map):
403                self.nodes = set(map.keys())
404
405            def __call__(self, e):
406                return e.name not in self.nodes
407
408        # Main line of generate_ns2
409        t = topo.clone()
410
411        # Create the map of nodes that need direct connections (dragon
412        # connections) from the connInfo
413        dragon_map = { }
414        for i in [ i for i in connInfo if i['type'] == 'transit']:
415            for a in i.get('fedAttr', []):
416                if a['attribute'] == 'vlan_id':
417                    vlan = a['value']
418                    break
419            else:
420                raise service_error(service_error.internal, "No vlan tag")
421            members = i.get('member', [])
422            if len(members) > 1: type = 'lan'
423            else: type = 'link'
424
425            try:
426                for m in members:
427                    if m['element'] in dragon_map:
428                        dragon_map[m['element']].append(( m['interface'], 
429                            vlan, type))
430                    else:
431                        dragon_map[m['element']] = [( m['interface'], 
432                            vlan, type),]
433            except KeyError:
434                raise service_error(service_error.req,
435                        "Missing connectivity info")
436
437        # Weed out the things we aren't going to instantiate: Segments, portal
438        # substrates, and portal interfaces.  (The copy in the for loop allows
439        # us to delete from e.elements in side the for loop).  While we're
440        # touching all the elements, we also adjust paths from the original
441        # testbed to local testbed paths and put the federation commands into
442        # the start commands
443        for e in [e for e in t.elements]:
444            if isinstance(e, topdl.Segment):
445                t.elements.remove(e)
446            if isinstance(e, topdl.Computer):
447                self.add_kit(e, self.federation_software)
448                if e.get_attribute('portal') and self.portal_startcommand:
449                    # Add local portal support software
450                    self.add_kit(e, self.portal_software)
451                    # Portals never have a user-specified start command
452                    e.set_attribute('startup', self.portal_startcommand)
453                elif self.node_startcommand:
454                    if e.get_attribute('startup'):
455                        e.set_attribute('startup', "%s \\$USER '%s'" % \
456                                (self.node_startcommand, 
457                                    e.get_attribute('startup')))
458                    else:
459                        e.set_attribute('startup', self.node_startcommand)
460
461                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
462                # Remove portal interfaces that do not connect to DRAGON
463                e.interface = [i for i in e.interface \
464                        if not i.get_attribute('portal') or i.name in dinf ]
465            # Fix software paths
466            for s in getattr(e, 'software', []):
467                s.location = re.sub("^.*/", softdir, s.location)
468
469        t.substrates = [ s.clone() for s in t.substrates ]
470        t.incorporate_elements()
471
472        # Customize the ns2 output for local portal commands and images
473        filters = []
474
475        if self.dragon_endpoint:
476            add_filter = not_dragon(dragon_map)
477            filters.append(dragon_commands(dragon_map))
478        else:
479            add_filter = None
480
481        if self.portal_command:
482            filters.append(topdl.generate_portal_command_filter(
483                self.portal_command, add_filter=add_filter))
484
485        if self.portal_image:
486            filters.append(topdl.generate_portal_image_filter(
487                self.portal_image))
488
489        if self.portal_type:
490            filters.append(topdl.generate_portal_hardware_filter(
491                self.portal_type))
492
493        # Convert to ns and write it out
494        expfile = topdl.topology_to_ns2(t, filters)
495        try:
496            f = open(expfn, "w")
497            print >>f, expfile
498            f.close()
499        except EnvironmentError:
500            raise service_error(service_error.internal,
501                    "Cannot write experiment file %s: %s" % (expfn,e))
502
503    def export_store_info(self, cf, proj, ename, connInfo):
504        """
505        For the export requests in the connection info, install the peer names
506        at the experiment controller via SetValue calls.
507        """
508
509        for c in connInfo:
510            for p in [ p for p in c.get('parameter', []) \
511                    if p.get('type', '') == 'output']:
512
513                if p.get('name', '') == 'peer':
514                    k = p.get('key', None)
515                    surl = p.get('store', None)
516                    if surl and k and k.index('/') != -1:
517                        value = "%s.%s.%s%s" % \
518                                (k[k.index('/')+1:], ename, proj, self.domain)
519                        req = { 'name': k, 'value': value }
520                        self.log.debug("Setting %s to %s on %s" % \
521                                (k, value, surl))
522                        self.call_SetValue(surl, req, cf)
523                    else:
524                        self.log.error("Bad export request: %s" % p)
525                elif p.get('name', '') == 'ssh_port':
526                    k = p.get('key', None)
527                    surl = p.get('store', None)
528                    if surl and k:
529                        req = { 'name': k, 'value': self.ssh_port }
530                        self.log.debug("Setting %s to %s on %s" % \
531                                (k, self.ssh_port, surl))
532                        self.call_SetValue(surl, req, cf)
533                    else:
534                        self.log.error("Bad export request: %s" % p)
535                else:
536                    self.log.error("Unknown export parameter: %s" % \
537                            p.get('name'))
538                    continue
539
540    def add_seer_node(self, topo, name, startup):
541        """
542        Add a seer node to the given topology, with the startup command passed
543        in.  Used by configure seer_services.
544        """
545        c_node = topdl.Computer(
546                name=name, 
547                os= topdl.OperatingSystem(
548                    attribute=[
549                    { 'attribute': 'osid', 
550                        'value': self.local_seer_image },
551                    ]),
552                attribute=[
553                    { 'attribute': 'startup', 'value': startup },
554                    ]
555                )
556        self.add_kit(c_node, self.local_seer_software)
557        topo.elements.append(c_node)
558
559    def configure_seer_services(self, services, topo, softdir):
560        """
561        Make changes to the topology required for the seer requests being made.
562        Specifically, add any control or master nodes required and set up the
563        start commands on the nodes to interconnect them.
564        """
565        local_seer = False      # True if we need to add a control node
566        collect_seer = False    # True if there is a seer-master node
567        seer_master= False      # True if we need to add the seer-master
568        for s in services:
569            s_name = s.get('name', '')
570            s_vis = s.get('visibility','')
571
572            if s_name  == 'local_seer_control' and s_vis == 'export':
573                local_seer = True
574            elif s_name == 'seer_master':
575                if s_vis == 'import':
576                    collect_seer = True
577                elif s_vis == 'export':
578                    seer_master = True
579       
580        # We've got the whole picture now, so add nodes if needed and configure
581        # them to interconnect properly.
582        if local_seer or seer_master:
583            # Copy local seer control node software to the tempdir
584            for l, f in self.local_seer_software:
585                base = os.path.basename(f)
586                copy_file(f, "%s/%s" % (softdir, base))
587        # If we're collecting seers somewhere the controllers need to talk to
588        # the master.  In testbeds that export the service, that will be a
589        # local node that we'll add below.  Elsewhere it will be the control
590        # portal that will port forward to the exporting master.
591        if local_seer:
592            if collect_seer:
593                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
594            else:
595                startup = self.local_seer_start
596            self.add_seer_node(topo, 'control', startup)
597        # If this is the seer master, add that node, too.
598        if seer_master:
599            self.add_seer_node(topo, 'seer-master', 
600                    "%s -R -n -R seer-master -R -A -R sink" % \
601                            self.seer_master_start)
602
603    def retrieve_software(self, topo, certfile, softdir):
604        """
605        Collect the software that nodes in the topology need loaded and stage
606        it locally.  This implies retrieving it from the experiment_controller
607        and placing it into softdir.  Certfile is used to prove that this node
608        has access to that data (it's the allocation/segment fedid).  Finally
609        local portal and federation software is also copied to the same staging
610        directory for simplicity - all software needed for experiment creation
611        is in softdir.
612        """
613        sw = set()
614        for e in topo.elements:
615            for s in getattr(e, 'software', []):
616                sw.add(s.location)
617        for s in sw:
618            self.log.debug("Retrieving %s" % s)
619            try:
620                get_url(s, certfile, softdir)
621            except:
622                t, v, st = sys.exc_info()
623                raise service_error(service_error.internal,
624                        "Error retrieving %s: %s" % (s, v))
625
626        # Copy local federation and portal node software to the tempdir
627        for s in (self.federation_software, self.portal_software):
628            for l, f in s:
629                base = os.path.basename(f)
630                copy_file(f, "%s/%s" % (softdir, base))
631
632
633    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
634        """
635        Gather common configuration files, retrieve or create an experiment
636        name and project name, and return the ssh_key filenames.  Create an
637        allocation log bound to the state log variable as well.
638        """
639        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
640                'seer_ca_pem', 'seer_node_pem')
641        ename = None
642        pubkey_base = None
643        secretkey_base = None
644        proj = None
645        user = None
646        alloc_log = None
647        nonce_experiment = False
648        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
649
650        self.state_lock.acquire()
651        if aid in self.allocation:
652            proj = self.allocation[aid].get('project', None)
653        self.state_lock.release()
654
655        if not proj:
656            raise service_error(service_error.internal, 
657                    "Can't find project for %s" %aid)
658
659        for a in attrs:
660            if a['attribute'] in configs:
661                try:
662                    self.log.debug("Retrieving %s from %s" % \
663                            (a['attribute'], a['value']))
664                    get_url(a['value'], certfile, tmpdir)
665                except:
666                    t, v, st = sys.exc_info()
667                    raise service_error(service_error.internal,
668                            "Error retrieving %s: %s" % (a.get('value', ""), v))
669            if a['attribute'] == 'ssh_pubkey':
670                pubkey_base = a['value'].rpartition('/')[2]
671            if a['attribute'] == 'ssh_secretkey':
672                secretkey_base = a['value'].rpartition('/')[2]
673            if a['attribute'] == 'experiment_name':
674                ename = a['value']
675
676        # Names longer than the emulab max are discarded
677        if ename and len(ename) <= self.max_name_len:
678            # Clean up the experiment name so that emulab will accept it.
679            ename = re.sub(vchars_re, '-', ename)
680
681        else:
682            ename = ""
683            for i in range(0,5):
684                ename += random.choice(string.ascii_letters)
685            nonce_experiment = True
686            self.log.warn("No experiment name or suggestion too long: " + \
687                    "picked one randomly: %s" % ename)
688
689        if not pubkey_base:
690            raise service_error(service_error.req, 
691                    "No public key attribute")
692
693        if not secretkey_base:
694            raise service_error(service_error.req, 
695                    "No secret key attribute")
696
697        self.state_lock.acquire()
698        if aid in self.allocation:
699            user = self.allocation[aid].get('user', None)
700            cert = self.allocation[aid].get('cert', None)
701            self.allocation[aid]['experiment'] = ename
702            self.allocation[aid]['nonce'] = nonce_experiment
703            self.allocation[aid]['log'] = [ ]
704            # Create a logger that logs to the experiment's state object as
705            # well as to the main log file.
706            alloc_log = logging.getLogger('fedd.access.%s' % ename)
707            h = logging.StreamHandler(
708                    list_log.list_log(self.allocation[aid]['log']))
709            # XXX: there should be a global one of these rather than
710            # repeating the code.
711            h.setFormatter(logging.Formatter(
712                "%(asctime)s %(name)s %(message)s",
713                        '%d %b %y %H:%M:%S'))
714            alloc_log.addHandler(h)
715            self.write_state()
716        self.state_lock.release()
717
718        if not user:
719            raise service_error(service_error.internal, 
720                    "Can't find creation user for %s" %aid)
721
722        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
723
724    def decorate_topology(self, info, t):
725        """
726        Copy the physical mapping and status onto the topology.  Used by
727        StartSegment and InfoSegment
728        """
729        def add_new(ann, attr):
730            for a in ann:
731                if a not in attr: attr.append(a)
732
733        def merge_os(os, e):
734            if len(e.os) == 0:
735                # No OS at all:
736                if os.get_attribute('emulab_access:image'):
737                    os.set_attribute('emulab_access:initial_image', 
738                            os.get_attribute('emulab_access:image'))
739                e.os = [ os ]
740            elif len(e.os) == 1:
741                # There's one OS, copy the initial image and replace
742                eos = e.os[0]
743                initial = eos.get_attribute('emulab_access:initial_image')
744                if initial:
745                    os.set_attribute('emulab_access:initial_image', initial)
746                e.os = [ os] 
747            else:
748                # Multiple OSes, replace or append
749                for eos in e.os:
750                    if os.name == eos.name:
751                        eos.version = os.version
752                        eos.version = os.distribution
753                        eos.version = os.distributionversion
754                        for a in os.attribute:
755                            if eos.get_attribute(a.attribute):
756                                eos.remove_attribute(a.attribute)
757                            eos.set_attribute(a.attribute, a.value)
758                        break
759                else:
760                    e.os.append(os)
761
762
763        if t is None: return
764        i = 0 # For fake debugging instantiation
765        # Copy the assigned names into the return topology
766        for e in t.elements:
767            if isinstance(e, topdl.Computer):
768                if not self.create_debug:
769                    if e.name in info.node:
770                        add_new(("%s%s" % 
771                            (info.node[e.name].pname, self.domain),),
772                            e.localname)
773                        add_new(("%s%s" % 
774                            (info.node[e.name].lname, self.domain),),
775                            e.localname)
776                        e.status = info.node[e.name].status
777                        os = info.node[e.name].getOS()
778                        if os: merge_os(os, e)
779                else:
780                    # Simple debugging assignment
781                    add_new(("node%d%s" % (i, self.domain),), e.localname)
782                    e.status = 'active'
783                    add_new(('testop1', 'testop2'), e.operation)
784                    i += 1
785
786        for s in t.substrates:
787            if s.name in info.subs:
788                sub = info.subs[s.name]
789                if sub.cap is not None:
790                    s.capacity = topdl.Capacity(sub.cap, 'max')
791                if sub.delay is not None:
792                    s.delay = topdl.Latency(sub.delay, 'max')
793        # XXX interfaces
794
795
796    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
797        """
798        Store key bits of experiment state in the global repository, including
799        the response that may need to be replayed, and return the response.
800        """
801        i = 0
802        t = topo.clone()
803        self.decorate_topology(starter, t)
804        # Grab the log (this is some anal locking, but better safe than
805        # sorry)
806        self.state_lock.acquire()
807        logv = "".join(self.allocation[aid]['log'])
808        # It's possible that the StartSegment call gets retried (!).
809        # if the 'started' key is in the allocation, we'll return it rather
810        # than redo the setup.
811        self.allocation[aid]['started'] = { 
812                'allocID': alloc_id,
813                'allocationLog': logv,
814                'segmentdescription': { 
815                    'topdldescription': t.to_dict()
816                    },
817                'proof': proof.to_dict(),
818                }
819        self.allocation[aid]['topo'] = t
820        retval = copy.copy(self.allocation[aid]['started'])
821        self.write_state()
822        self.state_lock.release()
823        return retval
824   
825    # End of StartSegment support routines
826
827    def StartSegment(self, req, fid):
828        err = None  # Any service_error generated after tmpdir is created
829        rv = None   # Return value from segment creation
830
831        self.log.info("StartSegment called by %s" % fid)
832        try:
833            req = req['StartSegmentRequestBody']
834            auth_attr = req['allocID']['fedid']
835            topref = req['segmentdescription']['topdldescription']
836        except KeyError:
837            raise service_error(server_error.req, "Badly formed request")
838
839        connInfo = req.get('connection', [])
840        services = req.get('service', [])
841        aid = "%s" % auth_attr
842        attrs = req.get('fedAttr', [])
843
844        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
845                with_proof=True)
846        if not access_ok:
847            self.log.info("StartSegment for %s failed: access denied" % fid)
848            raise service_error(service_error.access, "Access denied")
849        else:
850            # See if this is a replay of an earlier succeeded StartSegment -
851            # sometimes SSL kills 'em.  If so, replay the response rather than
852            # redoing the allocation.
853            self.state_lock.acquire()
854            retval = self.allocation[aid].get('started', None)
855            self.state_lock.release()
856            if retval:
857                self.log.warning("Duplicate StartSegment for %s: " % aid + \
858                        "replaying response")
859                return retval
860
861        # A new request.  Do it.
862
863        if topref: topo = topdl.Topology(**topref)
864        else:
865            raise service_error(service_error.req, 
866                    "Request missing segmentdescription'")
867       
868        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
869        try:
870            tmpdir = tempfile.mkdtemp(prefix="access-")
871            softdir = "%s/software" % tmpdir
872            os.mkdir(softdir)
873        except EnvironmentError:
874            self.log.info("StartSegment for %s failed: internal error" % fid)
875            raise service_error(service_error.internal, "Cannot create tmp dir")
876
877        # Try block alllows us to clean up temporary files.
878        try:
879            self.retrieve_software(topo, certfile, softdir)
880            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
881                alloc_log =  self.initialize_experiment_info(attrs, aid, 
882                        certfile, tmpdir)
883
884            if '/' in proj: proj, gid = proj.split('/')
885            else: gid = None
886
887
888            # Set up userconf and seer if needed
889            self.configure_userconf(services, tmpdir)
890            self.configure_seer_services(services, topo, softdir)
891            # Get and send synch store variables
892            self.export_store_info(certfile, proj, ename, connInfo)
893            self.import_store_info(certfile, connInfo)
894
895            expfile = "%s/experiment.tcl" % tmpdir
896
897            self.generate_portal_configs(topo, pubkey_base, 
898                    secretkey_base, tmpdir, proj, ename, connInfo, services)
899            self.generate_ns2(topo, expfile, 
900                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
901
902            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
903                    debug=self.create_debug, log=alloc_log, boss=self.boss,
904                    ops=self.ops, cert=xmlrpc_cert)
905            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
906        except service_error, e:
907            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
908            err = e
909        except:
910            t, v, st = sys.exc_info()
911            self.log.info("StartSegment for %s failed: unexpected error:  %s" \
912                    % (fid, v, traceback.extract_tb(st)))
913            err = service_error(service_error.internal, "%s: %s" % \
914                    (v, traceback.extract_tb(st)))
915
916        # Walk up tmpdir, deleting as we go
917        if self.cleanup: self.remove_dirs(tmpdir)
918        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
919
920        if rv:
921            self.log.info("StartSegment for %s succeeded" % fid)
922            return self.finalize_experiment(starter, topo, aid, req['allocID'],
923                    proof)
924        elif err:
925            raise service_error(service_error.federant,
926                    "Swapin failed: %s" % err)
927        else:
928            raise service_error(service_error.federant, "Swapin failed")
929
930    def TerminateSegment(self, req, fid):
931        self.log.info("TerminateSegment called by %s" % fid)
932        try:
933            req = req['TerminateSegmentRequestBody']
934        except KeyError:
935            raise service_error(server_error.req, "Badly formed request")
936
937        auth_attr = req['allocID']['fedid']
938        aid = "%s" % auth_attr
939        attrs = req.get('fedAttr', [])
940
941        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
942                with_proof=True)
943        if not access_ok:
944            raise service_error(service_error.access, "Access denied")
945
946        self.state_lock.acquire()
947        if aid in self.allocation:
948            proj = self.allocation[aid].get('project', None)
949            user = self.allocation[aid].get('user', None)
950            xmlrpc_cert = self.allocation[aid].get('cert', None)
951            ename = self.allocation[aid].get('experiment', None)
952            nonce = self.allocation[aid].get('nonce', False)
953        else:
954            proj = None
955            user = None
956            ename = None
957            nonce = False
958            xmlrpc_cert = None
959        self.state_lock.release()
960
961        if not proj:
962            self.log.info("TerminateSegment failed for %s: cannot find project"\
963                    % fid)
964            raise service_error(service_error.internal, 
965                    "Can't find project for %s" % aid)
966        else:
967            if '/' in proj: proj, gid = proj.split('/')
968            else: gid = None
969
970        if not user:
971            self.log.info("TerminateSegment failed for %s: cannot find user"\
972                    % fid)
973            raise service_error(service_error.internal, 
974                    "Can't find creation user for %s" % aid)
975        if not ename:
976            self.log.info(
977                    "TerminateSegment failed for %s: cannot find experiment"\
978                    % fid)
979            raise service_error(service_error.internal, 
980                    "Can't find experiment name for %s" % aid)
981        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
982                debug=self.create_debug, boss=self.boss, ops=self.ops,
983                cert=xmlrpc_cert)
984        stopper(self, user, proj, ename, gid, nonce)
985        self.log.info("TerminateSegment succeeded for %s %s %s" % \
986                (fid, proj, ename))
987        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
988
989    def InfoSegment(self, req, fid):
990        self.log.info("InfoSegment called by %s" % fid)
991        try:
992            req = req['InfoSegmentRequestBody']
993        except KeyError:
994            raise service_error(server_error.req, "Badly formed request")
995
996        auth_attr = req['allocID']['fedid']
997        aid = "%s" % auth_attr
998
999        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1000                with_proof=True)
1001        if not access_ok:
1002            raise service_error(service_error.access, "Access denied")
1003
1004        self.state_lock.acquire()
1005        if aid in self.allocation:
1006            topo = self.allocation[aid].get('topo', None)
1007            if topo: topo = topo.clone()
1008            proj = self.allocation[aid].get('project', None)
1009            user = self.allocation[aid].get('user', None)
1010            xmlrpc_cert = self.allocation[aid].get('cert', None)
1011            ename = self.allocation[aid].get('experiment', None)
1012        else:
1013            proj = None
1014            user = None
1015            ename = None
1016            topo = None
1017            xmlrpc_cert = None
1018        self.state_lock.release()
1019
1020        if not proj:
1021            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1022            raise service_error(service_error.internal, 
1023                    "Can't find project for %s" % aid)
1024        else:
1025            if '/' in proj: proj, gid = proj.split('/')
1026            else: gid = None
1027
1028        if not user:
1029            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1030            raise service_error(service_error.internal, 
1031                    "Can't find creation user for %s" % aid)
1032        if not ename:
1033            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1034            raise service_error(service_error.internal, 
1035                    "Can't find experiment name for %s" % aid)
1036        info = self.info_segment(keyfile=self.ssh_privkey_file,
1037                debug=self.create_debug, boss=self.boss, ops=self.ops,
1038                cert=xmlrpc_cert)
1039        info(self, user, proj, ename)
1040        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1041                (fid, user, proj, ename))
1042        self.decorate_topology(info, topo)
1043        self.state_lock.acquire()
1044        if aid in self.allocation:
1045            self.allocation[aid]['topo'] = topo
1046            self.write_state()
1047        self.state_lock.release()
1048        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1049                (fid, user, proj, ename))
1050
1051        rv = { 
1052                'allocID': req['allocID'], 
1053                'proof': proof.to_dict(),
1054                }
1055        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1056                (fid, user, proj, ename))
1057        if topo:
1058            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1059        return rv
1060
1061    def OperationSegment(self, req, fid):
1062        def get_pname(e):
1063            """
1064            Get the physical name of a node
1065            """
1066            if e.localname:
1067                return re.sub('\..*','', e.localname[0])
1068            else:
1069                return None
1070
1071        self.log.info("OperationSegment called by %s" % fid)
1072        try:
1073            req = req['OperationSegmentRequestBody']
1074        except KeyError:
1075            raise service_error(server_error.req, "Badly formed request")
1076
1077        auth_attr = req['allocID']['fedid']
1078        aid = "%s" % auth_attr
1079
1080        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1081                with_proof=True)
1082        if not access_ok:
1083            self.log.info("OperationSegment failed for %s: access denied" % fid)
1084            raise service_error(service_error.access, "Access denied")
1085
1086        op = req.get('operation', None)
1087        targets = req.get('target', None)
1088        params = req.get('parameter', None)
1089
1090        if op is None :
1091            self.log.info("OperationSegment failed for %s: no operation" % fid)
1092            raise service_error(service_error.req, "missing operation")
1093        elif targets is None:
1094            self.log.info("OperationSegment failed for %s: no targets" % fid)
1095            raise service_error(service_error.req, "no targets")
1096
1097        self.state_lock.acquire()
1098        if aid in self.allocation:
1099            topo = self.allocation[aid].get('topo', None)
1100            if topo: topo = topo.clone()
1101            xmlrpc_cert = self.allocation[aid].get('cert', None)
1102        else:
1103            topo = None
1104            xmlrpc_cert = None
1105        self.state_lock.release()
1106
1107        targets = copy.copy(targets)
1108        ptargets = { }
1109        for e in topo.elements:
1110            if isinstance(e, topdl.Computer):
1111                if e.name in targets:
1112                    targets.remove(e.name)
1113                    pn = get_pname(e)
1114                    if pn: ptargets[e.name] = pn
1115
1116        status = [ operation_status(t, operation_status.no_target) \
1117                for t in targets]
1118
1119        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1120                debug=self.create_debug, boss=self.boss, ops=self.ops,
1121                cert=xmlrpc_cert)
1122        ops(self, op, ptargets, params, topo)
1123        self.log.info("OperationSegment operated for %s" % fid)
1124       
1125        status.extend(ops.status)
1126        self.log.info("OperationSegment succeed for %s" % fid)
1127
1128        return { 
1129                'allocID': req['allocID'], 
1130                'status': [s.to_dict() for s in status],
1131                'proof': proof.to_dict(),
1132                }
Note: See TracBrowser for help on using the repository browser.