source: fedd/federation/emulab_access.py @ cde9b98

compt_changes
Last change on this file since cde9b98 was cde9b98, checked in by Ted Faber <faber@…>, 12 years ago

More debug logging

  • Property mode set to 100644
File size: 37.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from deter import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from deter import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        self.log.info("RequestAccess called by %s" % fid)
244        # The dance to get into the request body
245        if req.has_key('RequestAccessRequestBody'):
246            req = req['RequestAccessRequestBody']
247        else:
248            raise service_error(service_error.req, "No request!?")
249
250        # if this includes a project export request, construct a filter such
251        # that only the ABAC attributes mapped to that project are checked for
252        # access.
253        if 'service' in req:
254            ep = get_export_project(req['service'])
255            if ep: pf = lambda(a): a.value[0] == ep
256            else: pf = None
257        else:
258            ep = None
259            pf = None
260
261        if self.auth.import_credentials(
262                data_list=req.get('abac_credential', [])):
263            self.auth.save()
264        else:
265            self.log.debug('failed to import incoming credentials')
266
267        if self.auth_type == 'abac':
268            found, owners, proof = self.lookup_access(req, fid, filter=pf)
269        else:
270            raise service_error(service_error.internal, 
271                    'Unknown auth_type: %s' % self.auth_type)
272        ap = None
273
274        # keep track of what's been added
275        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
276        aid = unicode(allocID)
277
278        pname, uname = self.save_project_state(aid, found[0], found[1], 
279                found[2], owners)
280
281        services, svc_state = self.export_services(req.get('service',[]),
282                pname, uname)
283        self.state_lock.acquire()
284        # Store services state in global state
285        for k, v in svc_state.items():
286            self.allocation[aid][k] = v
287        self.append_allocation_authorization(aid, 
288                set([(o, allocID) for o in owners]), state_attr='allocation')
289        self.write_state()
290        self.state_lock.release()
291        try:
292            f = open("%s/%s.pem" % (self.certdir, aid), "w")
293            print >>f, alloc_cert
294            f.close()
295        except EnvironmentError, e:
296            self.log.info("RequestAccess failed for by %s: internal error" \
297                    % fid)
298            raise service_error(service_error.internal, 
299                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
300        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
301        resp = self.build_access_response({ 'fedid': allocID } ,
302                pname, services, proof)
303        return resp
304
305    def ReleaseAccess(self, req, fid):
306        self.log.info("ReleaseAccess called by %s" % fid)
307        # The dance to get into the request body
308        if req.has_key('ReleaseAccessRequestBody'):
309            req = req['ReleaseAccessRequestBody']
310        else:
311            raise service_error(service_error.req, "No request!?")
312
313        try:
314            if req['allocID'].has_key('localname'):
315                auth_attr = aid = req['allocID']['localname']
316            elif req['allocID'].has_key('fedid'):
317                aid = unicode(req['allocID']['fedid'])
318                auth_attr = req['allocID']['fedid']
319            else:
320                raise service_error(service_error.req,
321                        "Only localnames and fedids are understood")
322        except KeyError:
323            raise service_error(service_error.req, "Badly formed request")
324
325        self.log.debug("[access] deallocation requested for %s by %s" % \
326                (aid, fid))
327        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
328                with_proof=True)
329        if not access_ok:
330            self.log.debug("[access] deallocation denied for %s", aid)
331            raise service_error(service_error.access, "Access Denied")
332
333        self.state_lock.acquire()
334        if aid in self.allocation:
335            self.log.debug("Found allocation for %s" %aid)
336            self.clear_allocation_authorization(aid, state_attr='allocation')
337            del self.allocation[aid]
338            self.write_state()
339            self.state_lock.release()
340            # Remove the access cert
341            cf = "%s/%s.pem" % (self.certdir, aid)
342            self.log.debug("Removing %s" % cf)
343            os.remove(cf)
344            self.log.info("ReleaseAccess succeeded for %s" % fid)
345            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
346        else:
347            self.state_lock.release()
348            raise service_error(service_error.req, "No such allocation")
349
350    # These are subroutines for StartSegment
351    def generate_ns2(self, topo, expfn, softdir, connInfo):
352        """
353        Convert topo into an ns2 file, decorated with appropriate commands for
354        the particular testbed setup.  Convert all requests for software, etc
355        to point at the staged copies on this testbed and add the federation
356        startcommands.
357        """
358        class dragon_commands:
359            """
360            Functor to spit out approrpiate dragon commands for nodes listed in
361            the connectivity description.  The constructor makes a dict mapping
362            dragon nodes to their parameters and the __call__ checks each
363            element in turn for membership.
364            """
365            def __init__(self, map):
366                self.node_info = map
367
368            def __call__(self, e):
369                s = ""
370                if isinstance(e, topdl.Computer):
371                    if self.node_info.has_key(e.name):
372                        info = self.node_info[e.name]
373                        for ifname, vlan, type in info:
374                            for i in e.interface:
375                                if i.name == ifname:
376                                    addr = i.get_attribute('ip4_address')
377                                    subs = i.substrate[0]
378                                    break
379                            else:
380                                raise service_error(service_error.internal,
381                                        "No interface %s on element %s" % \
382                                                (ifname, e.name))
383                            # XXX: do netmask right
384                            if type =='link':
385                                s = ("tb-allow-external ${%s} " + \
386                                        "dragonportal ip %s vlan %s " + \
387                                        "netmask 255.255.255.0\n") % \
388                                        (topdl.to_tcl_name(e.name), addr, vlan)
389                            elif type =='lan':
390                                s = ("tb-allow-external ${%s} " + \
391                                        "dragonportal " + \
392                                        "ip %s vlan %s usurp %s\n") % \
393                                        (topdl.to_tcl_name(e.name), addr, 
394                                                vlan, subs)
395                            else:
396                                raise service_error(service_error_internal,
397                                        "Unknown DRAGON type %s" % type)
398                return s
399
400        class not_dragon:
401            """
402            Return true if a node is in the given map of dragon nodes.
403            """
404            def __init__(self, map):
405                self.nodes = set(map.keys())
406
407            def __call__(self, e):
408                return e.name not in self.nodes
409
410        # Main line of generate_ns2
411        t = topo.clone()
412
413        # Create the map of nodes that need direct connections (dragon
414        # connections) from the connInfo
415        dragon_map = { }
416        for i in [ i for i in connInfo if i['type'] == 'transit']:
417            for a in i.get('fedAttr', []):
418                if a['attribute'] == 'vlan_id':
419                    vlan = a['value']
420                    break
421            else:
422                raise service_error(service_error.internal, "No vlan tag")
423            members = i.get('member', [])
424            if len(members) > 1: type = 'lan'
425            else: type = 'link'
426
427            try:
428                for m in members:
429                    if m['element'] in dragon_map:
430                        dragon_map[m['element']].append(( m['interface'], 
431                            vlan, type))
432                    else:
433                        dragon_map[m['element']] = [( m['interface'], 
434                            vlan, type),]
435            except KeyError:
436                raise service_error(service_error.req,
437                        "Missing connectivity info")
438
439        # Weed out the things we aren't going to instantiate: Segments, portal
440        # substrates, and portal interfaces.  (The copy in the for loop allows
441        # us to delete from e.elements in side the for loop).  While we're
442        # touching all the elements, we also adjust paths from the original
443        # testbed to local testbed paths and put the federation commands into
444        # the start commands
445        for e in [e for e in t.elements]:
446            if isinstance(e, topdl.Segment):
447                t.elements.remove(e)
448            if isinstance(e, topdl.Computer):
449                self.add_kit(e, self.federation_software)
450                if e.get_attribute('portal') and self.portal_startcommand:
451                    # Add local portal support software
452                    self.add_kit(e, self.portal_software)
453                    # Portals never have a user-specified start command
454                    e.set_attribute('startup', self.portal_startcommand)
455                elif self.node_startcommand:
456                    if e.get_attribute('startup'):
457                        e.set_attribute('startup', "%s \\$USER '%s'" % \
458                                (self.node_startcommand, 
459                                    e.get_attribute('startup')))
460                    else:
461                        e.set_attribute('startup', self.node_startcommand)
462
463                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
464                # Remove portal interfaces that do not connect to DRAGON
465                e.interface = [i for i in e.interface \
466                        if not i.get_attribute('portal') or i.name in dinf ]
467            # Fix software paths
468            for s in getattr(e, 'software', []):
469                s.location = re.sub("^.*/", softdir, s.location)
470
471        t.substrates = [ s.clone() for s in t.substrates ]
472        t.incorporate_elements()
473
474        # Customize the ns2 output for local portal commands and images
475        filters = []
476
477        if self.dragon_endpoint:
478            add_filter = not_dragon(dragon_map)
479            filters.append(dragon_commands(dragon_map))
480        else:
481            add_filter = None
482
483        if self.portal_command:
484            filters.append(topdl.generate_portal_command_filter(
485                self.portal_command, add_filter=add_filter))
486
487        if self.portal_image:
488            filters.append(topdl.generate_portal_image_filter(
489                self.portal_image))
490
491        if self.portal_type:
492            filters.append(topdl.generate_portal_hardware_filter(
493                self.portal_type))
494
495        # Convert to ns and write it out
496        expfile = topdl.topology_to_ns2(t, filters)
497        try:
498            f = open(expfn, "w")
499            print >>f, expfile
500            f.close()
501        except EnvironmentError:
502            raise service_error(service_error.internal,
503                    "Cannot write experiment file %s: %s" % (expfn,e))
504
505    def export_store_info(self, cf, proj, ename, connInfo):
506        """
507        For the export requests in the connection info, install the peer names
508        at the experiment controller via SetValue calls.
509        """
510
511        for c in connInfo:
512            for p in [ p for p in c.get('parameter', []) \
513                    if p.get('type', '') == 'output']:
514
515                if p.get('name', '') == 'peer':
516                    k = p.get('key', None)
517                    surl = p.get('store', None)
518                    if surl and k and k.index('/') != -1:
519                        value = "%s.%s.%s%s" % \
520                                (k[k.index('/')+1:], ename, proj, self.domain)
521                        req = { 'name': k, 'value': value }
522                        self.log.debug("Setting %s to %s on %s" % \
523                                (k, value, surl))
524                        self.call_SetValue(surl, req, cf)
525                    else:
526                        self.log.error("Bad export request: %s" % p)
527                elif p.get('name', '') == 'ssh_port':
528                    k = p.get('key', None)
529                    surl = p.get('store', None)
530                    if surl and k:
531                        req = { 'name': k, 'value': self.ssh_port }
532                        self.log.debug("Setting %s to %s on %s" % \
533                                (k, self.ssh_port, surl))
534                        self.call_SetValue(surl, req, cf)
535                    else:
536                        self.log.error("Bad export request: %s" % p)
537                else:
538                    self.log.error("Unknown export parameter: %s" % \
539                            p.get('name'))
540                    continue
541
542    def add_seer_node(self, topo, name, startup):
543        """
544        Add a seer node to the given topology, with the startup command passed
545        in.  Used by configure seer_services.
546        """
547        c_node = topdl.Computer(
548                name=name, 
549                os= topdl.OperatingSystem(
550                    attribute=[
551                    { 'attribute': 'osid', 
552                        'value': self.local_seer_image },
553                    ]),
554                attribute=[
555                    { 'attribute': 'startup', 'value': startup },
556                    ]
557                )
558        self.add_kit(c_node, self.local_seer_software)
559        topo.elements.append(c_node)
560
561    def configure_seer_services(self, services, topo, softdir):
562        """
563        Make changes to the topology required for the seer requests being made.
564        Specifically, add any control or master nodes required and set up the
565        start commands on the nodes to interconnect them.
566        """
567        local_seer = False      # True if we need to add a control node
568        collect_seer = False    # True if there is a seer-master node
569        seer_master= False      # True if we need to add the seer-master
570        for s in services:
571            s_name = s.get('name', '')
572            s_vis = s.get('visibility','')
573
574            if s_name  == 'local_seer_control' and s_vis == 'export':
575                local_seer = True
576            elif s_name == 'seer_master':
577                if s_vis == 'import':
578                    collect_seer = True
579                elif s_vis == 'export':
580                    seer_master = True
581       
582        # We've got the whole picture now, so add nodes if needed and configure
583        # them to interconnect properly.
584        if local_seer or seer_master:
585            # Copy local seer control node software to the tempdir
586            for l, f in self.local_seer_software:
587                base = os.path.basename(f)
588                copy_file(f, "%s/%s" % (softdir, base))
589        # If we're collecting seers somewhere the controllers need to talk to
590        # the master.  In testbeds that export the service, that will be a
591        # local node that we'll add below.  Elsewhere it will be the control
592        # portal that will port forward to the exporting master.
593        if local_seer:
594            if collect_seer:
595                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
596            else:
597                startup = self.local_seer_start
598            self.add_seer_node(topo, 'control', startup)
599        # If this is the seer master, add that node, too.
600        if seer_master:
601            self.add_seer_node(topo, 'seer-master', 
602                    "%s -R -n -R seer-master -R -A -R sink" % \
603                            self.seer_master_start)
604
605    def retrieve_software(self, topo, certfile, softdir):
606        """
607        Collect the software that nodes in the topology need loaded and stage
608        it locally.  This implies retrieving it from the experiment_controller
609        and placing it into softdir.  Certfile is used to prove that this node
610        has access to that data (it's the allocation/segment fedid).  Finally
611        local portal and federation software is also copied to the same staging
612        directory for simplicity - all software needed for experiment creation
613        is in softdir.
614        """
615        sw = set()
616        for e in topo.elements:
617            for s in getattr(e, 'software', []):
618                sw.add(s.location)
619        for s in sw:
620            self.log.debug("Retrieving %s" % s)
621            try:
622                get_url(s, certfile, softdir)
623            except:
624                t, v, st = sys.exc_info()
625                raise service_error(service_error.internal,
626                        "Error retrieving %s: %s" % (s, v))
627
628        # Copy local federation and portal node software to the tempdir
629        for s in (self.federation_software, self.portal_software):
630            for l, f in s:
631                base = os.path.basename(f)
632                copy_file(f, "%s/%s" % (softdir, base))
633
634
635    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
636        """
637        Gather common configuration files, retrieve or create an experiment
638        name and project name, and return the ssh_key filenames.  Create an
639        allocation log bound to the state log variable as well.
640        """
641        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
642                'seer_ca_pem', 'seer_node_pem')
643        ename = None
644        pubkey_base = None
645        secretkey_base = None
646        proj = None
647        user = None
648        alloc_log = None
649        nonce_experiment = False
650        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
651
652        self.state_lock.acquire()
653        if aid in self.allocation:
654            proj = self.allocation[aid].get('project', None)
655        self.state_lock.release()
656
657        if not proj:
658            raise service_error(service_error.internal, 
659                    "Can't find project for %s" %aid)
660
661        for a in attrs:
662            if a['attribute'] in configs:
663                try:
664                    self.log.debug("Retrieving %s from %s" % \
665                            (a['attribute'], a['value']))
666                    get_url(a['value'], certfile, tmpdir)
667                except:
668                    t, v, st = sys.exc_info()
669                    raise service_error(service_error.internal,
670                            "Error retrieving %s: %s" % (a.get('value', ""), v))
671            if a['attribute'] == 'ssh_pubkey':
672                pubkey_base = a['value'].rpartition('/')[2]
673            if a['attribute'] == 'ssh_secretkey':
674                secretkey_base = a['value'].rpartition('/')[2]
675            if a['attribute'] == 'experiment_name':
676                ename = a['value']
677
678        # Names longer than the emulab max are discarded
679        if ename and len(ename) <= self.max_name_len:
680            # Clean up the experiment name so that emulab will accept it.
681            ename = re.sub(vchars_re, '-', ename)
682
683        else:
684            ename = ""
685            for i in range(0,5):
686                ename += random.choice(string.ascii_letters)
687            nonce_experiment = True
688            self.log.warn("No experiment name or suggestion too long: " + \
689                    "picked one randomly: %s" % ename)
690
691        if not pubkey_base:
692            raise service_error(service_error.req, 
693                    "No public key attribute")
694
695        if not secretkey_base:
696            raise service_error(service_error.req, 
697                    "No secret key attribute")
698
699        self.state_lock.acquire()
700        if aid in self.allocation:
701            user = self.allocation[aid].get('user', None)
702            cert = self.allocation[aid].get('cert', None)
703            self.allocation[aid]['experiment'] = ename
704            self.allocation[aid]['nonce'] = nonce_experiment
705            self.allocation[aid]['log'] = [ ]
706            # Create a logger that logs to the experiment's state object as
707            # well as to the main log file.
708            alloc_log = logging.getLogger('fedd.access.%s' % ename)
709            h = logging.StreamHandler(
710                    list_log.list_log(self.allocation[aid]['log']))
711            # XXX: there should be a global one of these rather than
712            # repeating the code.
713            h.setFormatter(logging.Formatter(
714                "%(asctime)s %(name)s %(message)s",
715                        '%d %b %y %H:%M:%S'))
716            alloc_log.addHandler(h)
717            self.write_state()
718        self.state_lock.release()
719
720        if not user:
721            raise service_error(service_error.internal, 
722                    "Can't find creation user for %s" %aid)
723
724        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
725
726    def decorate_topology(self, info, t):
727        """
728        Copy the physical mapping and status onto the topology.  Used by
729        StartSegment and InfoSegment
730        """
731        def add_new(ann, attr):
732            for a in ann:
733                if a not in attr: attr.append(a)
734
735        def merge_os(os, e):
736            if len(e.os) == 0:
737                # No OS at all:
738                if os.get_attribute('emulab_access:image'):
739                    os.set_attribute('emulab_access:initial_image', 
740                            os.get_attribute('emulab_access:image'))
741                e.os = [ os ]
742            elif len(e.os) == 1:
743                # There's one OS, copy the initial image and replace
744                eos = e.os[0]
745                initial = eos.get_attribute('emulab_access:initial_image')
746                if initial:
747                    os.set_attribute('emulab_access:initial_image', initial)
748                e.os = [ os] 
749            else:
750                # Multiple OSes, replace or append
751                for eos in e.os:
752                    if os.name == eos.name:
753                        eos.version = os.version
754                        eos.version = os.distribution
755                        eos.version = os.distributionversion
756                        for a in os.attribute:
757                            if eos.get_attribute(a.attribute):
758                                eos.remove_attribute(a.attribute)
759                            eos.set_attribute(a.attribute, a.value)
760                        break
761                else:
762                    e.os.append(os)
763
764
765        if t is None: return
766        i = 0 # For fake debugging instantiation
767        # Copy the assigned names into the return topology
768        for e in t.elements:
769            if isinstance(e, topdl.Computer):
770                if not self.create_debug:
771                    if e.name in info.node:
772                        add_new(("%s%s" % 
773                            (info.node[e.name].pname, self.domain),),
774                            e.localname)
775                        add_new(("%s%s" % 
776                            (info.node[e.name].lname, self.domain),),
777                            e.localname)
778                        e.status = info.node[e.name].status
779                        os = info.node[e.name].getOS()
780                        if os: merge_os(os, e)
781                else:
782                    # Simple debugging assignment
783                    add_new(("node%d%s" % (i, self.domain),), e.localname)
784                    e.status = 'active'
785                    add_new(('testop1', 'testop2'), e.operation)
786                    i += 1
787
788        for s in t.substrates:
789            if s.name in info.subs:
790                sub = info.subs[s.name]
791                if sub.cap is not None:
792                    s.capacity = topdl.Capacity(sub.cap, 'max')
793                if sub.delay is not None:
794                    s.delay = topdl.Latency(sub.delay, 'max')
795        # XXX interfaces
796
797
798    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
799        """
800        Store key bits of experiment state in the global repository, including
801        the response that may need to be replayed, and return the response.
802        """
803        i = 0
804        t = topo.clone()
805        self.decorate_topology(starter, t)
806        # Grab the log (this is some anal locking, but better safe than
807        # sorry)
808        self.state_lock.acquire()
809        logv = "".join(self.allocation[aid]['log'])
810        # It's possible that the StartSegment call gets retried (!).
811        # if the 'started' key is in the allocation, we'll return it rather
812        # than redo the setup.
813        self.allocation[aid]['started'] = { 
814                'allocID': alloc_id,
815                'allocationLog': logv,
816                'segmentdescription': { 
817                    'topdldescription': t.to_dict()
818                    },
819                'proof': proof.to_dict(),
820                }
821        self.allocation[aid]['topo'] = t
822        retval = copy.copy(self.allocation[aid]['started'])
823        self.write_state()
824        self.state_lock.release()
825        return retval
826   
827    # End of StartSegment support routines
828
829    def StartSegment(self, req, fid):
830        err = None  # Any service_error generated after tmpdir is created
831        rv = None   # Return value from segment creation
832
833        self.log.info("StartSegment called by %s" % fid)
834        try:
835            req = req['StartSegmentRequestBody']
836            auth_attr = req['allocID']['fedid']
837            topref = req['segmentdescription']['topdldescription']
838        except KeyError:
839            raise service_error(server_error.req, "Badly formed request")
840
841        connInfo = req.get('connection', [])
842        services = req.get('service', [])
843        aid = "%s" % auth_attr
844        attrs = req.get('fedAttr', [])
845
846        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
847                with_proof=True)
848        if not access_ok:
849            self.log.info("StartSegment for %s failed: access denied" % fid)
850            raise service_error(service_error.access, "Access denied")
851        else:
852            # See if this is a replay of an earlier succeeded StartSegment -
853            # sometimes SSL kills 'em.  If so, replay the response rather than
854            # redoing the allocation.
855            self.state_lock.acquire()
856            retval = self.allocation[aid].get('started', None)
857            self.state_lock.release()
858            if retval:
859                self.log.warning("Duplicate StartSegment for %s: " % aid + \
860                        "replaying response")
861                return retval
862
863        # A new request.  Do it.
864
865        if topref: topo = topdl.Topology(**topref)
866        else:
867            raise service_error(service_error.req, 
868                    "Request missing segmentdescription'")
869       
870        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
871        try:
872            tmpdir = tempfile.mkdtemp(prefix="access-")
873            softdir = "%s/software" % tmpdir
874            os.mkdir(softdir)
875        except EnvironmentError:
876            self.log.info("StartSegment for %s failed: internal error" % fid)
877            raise service_error(service_error.internal, "Cannot create tmp dir")
878
879        # Try block alllows us to clean up temporary files.
880        try:
881            self.retrieve_software(topo, certfile, softdir)
882            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
883                alloc_log =  self.initialize_experiment_info(attrs, aid, 
884                        certfile, tmpdir)
885
886            if '/' in proj: proj, gid = proj.split('/')
887            else: gid = None
888
889
890            # Set up userconf and seer if needed
891            self.configure_userconf(services, tmpdir)
892            self.configure_seer_services(services, topo, softdir)
893            # Get and send synch store variables
894            self.export_store_info(certfile, proj, ename, connInfo)
895            self.import_store_info(certfile, connInfo)
896
897            expfile = "%s/experiment.tcl" % tmpdir
898
899            self.generate_portal_configs(topo, pubkey_base, 
900                    secretkey_base, tmpdir, proj, ename, connInfo, services)
901            self.generate_ns2(topo, expfile, 
902                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
903
904            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
905                    debug=self.create_debug, log=alloc_log, boss=self.boss,
906                    ops=self.ops, cert=xmlrpc_cert)
907            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
908        except service_error, e:
909            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
910            err = e
911        except:
912            t, v, st = sys.exc_info()
913            self.log.info("StartSegment for %s failed: unexpected error:  %s" \
914                    % (fid, v, traceback.extract_tb(st)))
915            err = service_error(service_error.internal, "%s: %s" % \
916                    (v, traceback.extract_tb(st)))
917
918        # Walk up tmpdir, deleting as we go
919        if self.cleanup: self.remove_dirs(tmpdir)
920        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
921
922        if rv:
923            self.log.info("StartSegment for %s succeeded" % fid)
924            return self.finalize_experiment(starter, topo, aid, req['allocID'],
925                    proof)
926        elif err:
927            raise service_error(service_error.federant,
928                    "Swapin failed: %s" % err)
929        else:
930            raise service_error(service_error.federant, "Swapin failed")
931
932    def TerminateSegment(self, req, fid):
933        self.log.info("TerminateSegment called by %s" % fid)
934        try:
935            req = req['TerminateSegmentRequestBody']
936        except KeyError:
937            raise service_error(server_error.req, "Badly formed request")
938
939        auth_attr = req['allocID']['fedid']
940        aid = "%s" % auth_attr
941        attrs = req.get('fedAttr', [])
942
943        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
944                with_proof=True)
945        if not access_ok:
946            raise service_error(service_error.access, "Access denied")
947
948        self.state_lock.acquire()
949        if aid in self.allocation:
950            proj = self.allocation[aid].get('project', None)
951            user = self.allocation[aid].get('user', None)
952            xmlrpc_cert = self.allocation[aid].get('cert', None)
953            ename = self.allocation[aid].get('experiment', None)
954            nonce = self.allocation[aid].get('nonce', False)
955        else:
956            proj = None
957            user = None
958            ename = None
959            nonce = False
960            xmlrpc_cert = None
961        self.state_lock.release()
962
963        if not proj:
964            self.log.info("TerminateSegment failed for %s: cannot find project"\
965                    % fid)
966            raise service_error(service_error.internal, 
967                    "Can't find project for %s" % aid)
968        else:
969            if '/' in proj: proj, gid = proj.split('/')
970            else: gid = None
971
972        if not user:
973            self.log.info("TerminateSegment failed for %s: cannot find user"\
974                    % fid)
975            raise service_error(service_error.internal, 
976                    "Can't find creation user for %s" % aid)
977        if not ename:
978            self.log.info(
979                    "TerminateSegment failed for %s: cannot find experiment"\
980                    % fid)
981            raise service_error(service_error.internal, 
982                    "Can't find experiment name for %s" % aid)
983        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
984                debug=self.create_debug, boss=self.boss, ops=self.ops,
985                cert=xmlrpc_cert)
986        stopper(self, user, proj, ename, gid, nonce)
987        self.log.info("TerminateSegment succeeded for %s %s %s" % \
988                (fid, proj, ename))
989        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
990
991    def InfoSegment(self, req, fid):
992        self.log.info("InfoSegment called by %s" % fid)
993        try:
994            req = req['InfoSegmentRequestBody']
995        except KeyError:
996            raise service_error(server_error.req, "Badly formed request")
997
998        auth_attr = req['allocID']['fedid']
999        aid = "%s" % auth_attr
1000
1001        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1002                with_proof=True)
1003        if not access_ok:
1004            raise service_error(service_error.access, "Access denied")
1005
1006        self.state_lock.acquire()
1007        if aid in self.allocation:
1008            topo = self.allocation[aid].get('topo', None)
1009            if topo: topo = topo.clone()
1010            proj = self.allocation[aid].get('project', None)
1011            user = self.allocation[aid].get('user', None)
1012            xmlrpc_cert = self.allocation[aid].get('cert', None)
1013            ename = self.allocation[aid].get('experiment', None)
1014        else:
1015            proj = None
1016            user = None
1017            ename = None
1018            topo = None
1019            xmlrpc_cert = None
1020        self.state_lock.release()
1021
1022        if not proj:
1023            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1024            raise service_error(service_error.internal, 
1025                    "Can't find project for %s" % aid)
1026        else:
1027            if '/' in proj: proj, gid = proj.split('/')
1028            else: gid = None
1029
1030        if not user:
1031            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1032            raise service_error(service_error.internal, 
1033                    "Can't find creation user for %s" % aid)
1034        if not ename:
1035            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1036            raise service_error(service_error.internal, 
1037                    "Can't find experiment name for %s" % aid)
1038        info = self.info_segment(keyfile=self.ssh_privkey_file,
1039                debug=self.create_debug, boss=self.boss, ops=self.ops,
1040                cert=xmlrpc_cert)
1041        info(self, user, proj, ename)
1042        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1043                (fid, user, proj, ename))
1044        self.decorate_topology(info, topo)
1045        self.state_lock.acquire()
1046        if aid in self.allocation:
1047            self.allocation[aid]['topo'] = topo
1048            self.write_state()
1049        self.state_lock.release()
1050        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1051                (fid, user, proj, ename))
1052
1053        rv = { 
1054                'allocID': req['allocID'], 
1055                'proof': proof.to_dict(),
1056                }
1057        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1058                (fid, user, proj, ename))
1059        if topo:
1060            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1061        return rv
1062
1063    def OperationSegment(self, req, fid):
1064        def get_pname(e):
1065            """
1066            Get the physical name of a node
1067            """
1068            if e.localname:
1069                return re.sub('\..*','', e.localname[0])
1070            else:
1071                return None
1072
1073        self.log.info("OperationSegment called by %s" % fid)
1074        try:
1075            req = req['OperationSegmentRequestBody']
1076        except KeyError:
1077            raise service_error(server_error.req, "Badly formed request")
1078
1079        auth_attr = req['allocID']['fedid']
1080        aid = "%s" % auth_attr
1081
1082        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1083                with_proof=True)
1084        if not access_ok:
1085            self.log.info("OperationSegment failed for %s: access denied" % fid)
1086            raise service_error(service_error.access, "Access denied")
1087
1088        op = req.get('operation', None)
1089        targets = req.get('target', None)
1090        params = req.get('parameter', None)
1091
1092        if op is None :
1093            self.log.info("OperationSegment failed for %s: no operation" % fid)
1094            raise service_error(service_error.req, "missing operation")
1095        elif targets is None:
1096            self.log.info("OperationSegment failed for %s: no targets" % fid)
1097            raise service_error(service_error.req, "no targets")
1098
1099        self.state_lock.acquire()
1100        if aid in self.allocation:
1101            topo = self.allocation[aid].get('topo', None)
1102            if topo: topo = topo.clone()
1103            xmlrpc_cert = self.allocation[aid].get('cert', None)
1104        else:
1105            topo = None
1106            xmlrpc_cert = None
1107        self.state_lock.release()
1108
1109        targets = copy.copy(targets)
1110        ptargets = { }
1111        for e in topo.elements:
1112            if isinstance(e, topdl.Computer):
1113                if e.name in targets:
1114                    targets.remove(e.name)
1115                    pn = get_pname(e)
1116                    if pn: ptargets[e.name] = pn
1117
1118        status = [ operation_status(t, operation_status.no_target) \
1119                for t in targets]
1120
1121        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1122                debug=self.create_debug, boss=self.boss, ops=self.ops,
1123                cert=xmlrpc_cert)
1124        ops(self, op, ptargets, params, topo)
1125        self.log.info("OperationSegment operated for %s" % fid)
1126       
1127        status.extend(ops.status)
1128        self.log.info("OperationSegment succeed for %s" % fid)
1129
1130        return { 
1131                'allocID': req['allocID'], 
1132                'status': [s.to_dict() for s in status],
1133                'proof': proof.to_dict(),
1134                }
Note: See TracBrowser for help on using the repository browser.