source: fedd/federation/emulab_access.py @ c0f409a

compt_changes
Last change on this file since c0f409a was c0f409a, checked in by Ted Faber <faber@…>, 12 years ago

Log and report misconfigured user certs

  • Property mode set to 100644
File size: 37.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from deter import fedid, generate_fedid
21from authorizer import authorizer, abac_authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from proof import proof as access_proof
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from deter import topdl
31import list_log
32import emulab_segment
33
34
35# Make log messages disappear if noone configures a fedd logger
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.access")
40fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    max_name_len = 19
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.max_name_len = access.max_name_len
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.domain = config.get("access", "domain")
64        self.userconfdir = config.get("access","userconfdir")
65        self.userconfcmd = config.get("access","userconfcmd")
66        self.userconfurl = config.get("access","userconfurl")
67        self.federation_software = config.get("access", "federation_software")
68        self.portal_software = config.get("access", "portal_software")
69        self.local_seer_software = config.get("access", "local_seer_software")
70        self.local_seer_image = config.get("access", "local_seer_image")
71        self.local_seer_start = config.get("access", "local_seer_start")
72        self.seer_master_start = config.get("access", "seer_master_start")
73        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
74        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
75        self.ssh_port = config.get("access","ssh_port") or "22"
76        self.boss = config.get("access", "boss")
77        self.ops = config.get("access", "ops")
78        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
79        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        self.start_segment = emulab_segment.start_segment
98        self.stop_segment = emulab_segment.stop_segment
99        self.info_segment = emulab_segment.info_segment
100        self.operation_segment = emulab_segment.operation_segment
101
102        self.restricted = [ ]
103        tb = config.get('access', 'testbed')
104        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
105        else: self.testbed = [ ]
106
107        # authorization information
108        self.auth_type = config.get('access', 'auth_type') \
109                or 'abac'
110        self.auth_dir = config.get('access', 'auth_dir')
111        accessdb = config.get("access", "accessdb")
112        # initialize the authorization system
113        if self.auth_type == 'abac':
114            self.auth = abac_authorizer(load=self.auth_dir)
115            self.access = [ ]
116            if accessdb:
117                self.read_access(accessdb, self.access_tuple)
118        else:
119            raise service_error(service_error.internal, 
120                    "Unknown auth_type: %s" % self.auth_type)
121
122        # read_state in the base_class
123        self.state_lock.acquire()
124        if 'allocation' not in self.state: self.state['allocation']= { }
125        self.allocation = self.state['allocation']
126        self.state_lock.release()
127        self.exports = {
128                'SMB': self.export_SMB,
129                'seer': self.export_seer,
130                'tmcd': self.export_tmcd,
131                'userconfig': self.export_userconfig,
132                'project_export': self.export_project_export,
133                'local_seer_control': self.export_local_seer,
134                'seer_master': self.export_seer_master,
135                'hide_hosts': self.export_hide_hosts,
136                }
137
138        if not self.local_seer_image or not self.local_seer_software or \
139                not self.local_seer_start:
140            if 'local_seer_control' in self.exports:
141                del self.exports['local_seer_control']
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.seer_master_start:
145            if 'seer_master' in self.exports:
146                del self.exports['seer_master']
147
148
149        self.soap_services = {\
150            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
151            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
152            'StartSegment': soap_handler("StartSegment", self.StartSegment),
153            'TerminateSegment': soap_handler("TerminateSegment",
154                self.TerminateSegment),
155            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
156            'OperationSegment': soap_handler("OperationSegment",
157                self.OperationSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
168            'OperationSegment': xmlrpc_handler("OperationSegment",
169                self.OperationSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175    @staticmethod
176    def access_tuple(str):
177        """
178        Convert a string of the form (id, id, id) into an access_project.  This
179        is called by read_access to convert to local attributes.  It returns a
180        tuple of the form (project, user, certificate_file).
181        """
182
183        str = str.strip()
184        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
185            # The slice takes the parens off the string.
186            proj, user, cert = str[1:-1].split(',')
187            return (proj.strip(), user.strip(), cert.strip())
188        else:
189            raise self.parse_error(
190                    'Bad mapping (unbalanced parens or more than 2 commas)')
191
192    # RequestAccess support routines
193
194    def save_project_state(self, aid, pname, uname, certf, owners):
195        """
196        Save the project, user, and owners associated with this allocation.
197        This creates the allocation entry.
198        """
199        self.state_lock.acquire()
200        self.allocation[aid] = { }
201        self.allocation[aid]['project'] = pname
202        self.allocation[aid]['user'] = uname
203        self.allocation[aid]['cert'] = certf
204        self.allocation[aid]['owners'] = owners
205        self.write_state()
206        self.state_lock.release()
207        return (pname, uname)
208
209    # End of RequestAccess support routines
210
211    def RequestAccess(self, req, fid):
212        """
213        Handle the access request.  Proxy if not for us.
214
215        Parse out the fields and make the allocations or rejections if for us,
216        otherwise, assuming we're willing to proxy, proxy the request out.
217        """
218
219        def gateway_hardware(h):
220            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
221            else: return h
222
223        def get_export_project(svcs):
224            """
225            if the service requests includes one to export a project, return
226            that project.
227            """
228            rv = None
229            for s in svcs:
230                if s.get('name', '') == 'project_export' and \
231                        s.get('visibility', '') == 'export':
232                    if not rv: 
233                        for a in s.get('fedAttr', []):
234                            if a.get('attribute', '') == 'project' \
235                                    and 'value' in a:
236                                rv = a['value']
237                    else:
238                        raise service_error(service_error, access, 
239                                'Requesting multiple project exports is ' + \
240                                        'not supported');
241            return rv
242
243        self.log.info("RequestAccess called by %s" % fid)
244        # The dance to get into the request body
245        if req.has_key('RequestAccessRequestBody'):
246            req = req['RequestAccessRequestBody']
247        else:
248            raise service_error(service_error.req, "No request!?")
249
250        # if this includes a project export request, construct a filter such
251        # that only the ABAC attributes mapped to that project are checked for
252        # access.
253        if 'service' in req:
254            ep = get_export_project(req['service'])
255            if ep: pf = lambda(a): a.value[0] == ep
256            else: pf = None
257        else:
258            ep = None
259            pf = None
260
261        if self.auth.import_credentials(
262                data_list=req.get('abac_credential', [])):
263            self.auth.save()
264        else:
265            self.log.debug('failed to import incoming credentials')
266
267        if self.auth_type == 'abac':
268            found, owners, proof = self.lookup_access(req, fid, filter=pf)
269        else:
270            raise service_error(service_error.internal, 
271                    'Unknown auth_type: %s' % self.auth_type)
272        ap = None
273
274        # keep track of what's been added
275        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
276        aid = unicode(allocID)
277
278        pname, uname = self.save_project_state(aid, found[0], found[1], 
279                found[2], owners)
280
281        services, svc_state = self.export_services(req.get('service',[]),
282                pname, uname)
283        self.state_lock.acquire()
284        # Store services state in global state
285        for k, v in svc_state.items():
286            self.allocation[aid][k] = v
287        self.append_allocation_authorization(aid, 
288                set([(o, allocID) for o in owners]), state_attr='allocation')
289        self.write_state()
290        self.state_lock.release()
291        try:
292            f = open("%s/%s.pem" % (self.certdir, aid), "w")
293            print >>f, alloc_cert
294            f.close()
295        except EnvironmentError, e:
296            self.log.info("RequestAccess failed for by %s: internal error" \
297                    % fid)
298            raise service_error(service_error.internal, 
299                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
300        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
301        resp = self.build_access_response({ 'fedid': allocID } ,
302                pname, services, proof)
303        return resp
304
305    def ReleaseAccess(self, req, fid):
306        self.log.info("ReleaseAccess called by %s" % fid)
307        # The dance to get into the request body
308        if req.has_key('ReleaseAccessRequestBody'):
309            req = req['ReleaseAccessRequestBody']
310        else:
311            raise service_error(service_error.req, "No request!?")
312
313        try:
314            if req['allocID'].has_key('localname'):
315                auth_attr = aid = req['allocID']['localname']
316            elif req['allocID'].has_key('fedid'):
317                aid = unicode(req['allocID']['fedid'])
318                auth_attr = req['allocID']['fedid']
319            else:
320                raise service_error(service_error.req,
321                        "Only localnames and fedids are understood")
322        except KeyError:
323            raise service_error(service_error.req, "Badly formed request")
324
325        self.log.debug("[access] deallocation requested for %s by %s" % \
326                (aid, fid))
327        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
328                with_proof=True)
329        if not access_ok:
330            self.log.debug("[access] deallocation denied for %s", aid)
331            raise service_error(service_error.access, "Access Denied")
332
333        self.state_lock.acquire()
334        if aid in self.allocation:
335            self.log.debug("Found allocation for %s" %aid)
336            self.clear_allocation_authorization(aid, state_attr='allocation')
337            del self.allocation[aid]
338            self.write_state()
339            self.state_lock.release()
340            # Remove the access cert
341            cf = "%s/%s.pem" % (self.certdir, aid)
342            self.log.debug("Removing %s" % cf)
343            os.remove(cf)
344            self.log.info("ReleaseAccess succeeded for %s" % fid)
345            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
346        else:
347            self.state_lock.release()
348            raise service_error(service_error.req, "No such allocation")
349
350    # These are subroutines for StartSegment
351    def generate_ns2(self, topo, expfn, softdir, connInfo):
352        """
353        Convert topo into an ns2 file, decorated with appropriate commands for
354        the particular testbed setup.  Convert all requests for software, etc
355        to point at the staged copies on this testbed and add the federation
356        startcommands.
357        """
358        class dragon_commands:
359            """
360            Functor to spit out approrpiate dragon commands for nodes listed in
361            the connectivity description.  The constructor makes a dict mapping
362            dragon nodes to their parameters and the __call__ checks each
363            element in turn for membership.
364            """
365            def __init__(self, map):
366                self.node_info = map
367
368            def __call__(self, e):
369                s = ""
370                if isinstance(e, topdl.Computer):
371                    if self.node_info.has_key(e.name):
372                        info = self.node_info[e.name]
373                        for ifname, vlan, type in info:
374                            for i in e.interface:
375                                if i.name == ifname:
376                                    addr = i.get_attribute('ip4_address')
377                                    subs = i.substrate[0]
378                                    break
379                            else:
380                                raise service_error(service_error.internal,
381                                        "No interface %s on element %s" % \
382                                                (ifname, e.name))
383                            # XXX: do netmask right
384                            if type =='link':
385                                s = ("tb-allow-external ${%s} " + \
386                                        "dragonportal ip %s vlan %s " + \
387                                        "netmask 255.255.255.0\n") % \
388                                        (topdl.to_tcl_name(e.name), addr, vlan)
389                            elif type =='lan':
390                                s = ("tb-allow-external ${%s} " + \
391                                        "dragonportal " + \
392                                        "ip %s vlan %s usurp %s\n") % \
393                                        (topdl.to_tcl_name(e.name), addr, 
394                                                vlan, subs)
395                            else:
396                                raise service_error(service_error_internal,
397                                        "Unknown DRAGON type %s" % type)
398                return s
399
400        class not_dragon:
401            """
402            Return true if a node is in the given map of dragon nodes.
403            """
404            def __init__(self, map):
405                self.nodes = set(map.keys())
406
407            def __call__(self, e):
408                return e.name not in self.nodes
409
410        # Main line of generate_ns2
411        t = topo.clone()
412
413        # Create the map of nodes that need direct connections (dragon
414        # connections) from the connInfo
415        dragon_map = { }
416        for i in [ i for i in connInfo if i['type'] == 'transit']:
417            for a in i.get('fedAttr', []):
418                if a['attribute'] == 'vlan_id':
419                    vlan = a['value']
420                    break
421            else:
422                raise service_error(service_error.internal, "No vlan tag")
423            members = i.get('member', [])
424            if len(members) > 1: type = 'lan'
425            else: type = 'link'
426
427            try:
428                for m in members:
429                    if m['element'] in dragon_map:
430                        dragon_map[m['element']].append(( m['interface'], 
431                            vlan, type))
432                    else:
433                        dragon_map[m['element']] = [( m['interface'], 
434                            vlan, type),]
435            except KeyError:
436                raise service_error(service_error.req,
437                        "Missing connectivity info")
438
439        # Weed out the things we aren't going to instantiate: Segments, portal
440        # substrates, and portal interfaces.  (The copy in the for loop allows
441        # us to delete from e.elements in side the for loop).  While we're
442        # touching all the elements, we also adjust paths from the original
443        # testbed to local testbed paths and put the federation commands into
444        # the start commands
445        for e in [e for e in t.elements]:
446            if isinstance(e, topdl.Segment):
447                t.elements.remove(e)
448            if isinstance(e, topdl.Computer):
449                self.add_kit(e, self.federation_software)
450                if e.get_attribute('portal') and self.portal_startcommand:
451                    # Add local portal support software
452                    self.add_kit(e, self.portal_software)
453                    # Portals never have a user-specified start command
454                    e.set_attribute('startup', self.portal_startcommand)
455                elif self.node_startcommand:
456                    if e.get_attribute('startup'):
457                        e.set_attribute('startup', "%s \\$USER '%s'" % \
458                                (self.node_startcommand, 
459                                    e.get_attribute('startup')))
460                    else:
461                        e.set_attribute('startup', self.node_startcommand)
462
463                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
464                # Remove portal interfaces that do not connect to DRAGON
465                e.interface = [i for i in e.interface \
466                        if not i.get_attribute('portal') or i.name in dinf ]
467            # Fix software paths
468            for s in getattr(e, 'software', []):
469                s.location = re.sub("^.*/", softdir, s.location)
470
471        t.substrates = [ s.clone() for s in t.substrates ]
472        t.incorporate_elements()
473
474        # Customize the ns2 output for local portal commands and images
475        filters = []
476
477        if self.dragon_endpoint:
478            add_filter = not_dragon(dragon_map)
479            filters.append(dragon_commands(dragon_map))
480        else:
481            add_filter = None
482
483        if self.portal_command:
484            filters.append(topdl.generate_portal_command_filter(
485                self.portal_command, add_filter=add_filter))
486
487        if self.portal_image:
488            filters.append(topdl.generate_portal_image_filter(
489                self.portal_image))
490
491        if self.portal_type:
492            filters.append(topdl.generate_portal_hardware_filter(
493                self.portal_type))
494
495        # Convert to ns and write it out
496        expfile = topdl.topology_to_ns2(t, filters)
497        try:
498            f = open(expfn, "w")
499            print >>f, expfile
500            f.close()
501        except EnvironmentError:
502            raise service_error(service_error.internal,
503                    "Cannot write experiment file %s: %s" % (expfn,e))
504
505    def export_store_info(self, cf, proj, ename, connInfo):
506        """
507        For the export requests in the connection info, install the peer names
508        at the experiment controller via SetValue calls.
509        """
510
511        for c in connInfo:
512            for p in [ p for p in c.get('parameter', []) \
513                    if p.get('type', '') == 'output']:
514
515                if p.get('name', '') == 'peer':
516                    k = p.get('key', None)
517                    surl = p.get('store', None)
518                    if surl and k and k.index('/') != -1:
519                        value = "%s.%s.%s%s" % \
520                                (k[k.index('/')+1:], ename, proj, self.domain)
521                        req = { 'name': k, 'value': value }
522                        self.log.debug("Setting %s to %s on %s" % \
523                                (k, value, surl))
524                        self.call_SetValue(surl, req, cf)
525                    else:
526                        self.log.error("Bad export request: %s" % p)
527                elif p.get('name', '') == 'ssh_port':
528                    k = p.get('key', None)
529                    surl = p.get('store', None)
530                    if surl and k:
531                        req = { 'name': k, 'value': self.ssh_port }
532                        self.log.debug("Setting %s to %s on %s" % \
533                                (k, self.ssh_port, surl))
534                        self.call_SetValue(surl, req, cf)
535                    else:
536                        self.log.error("Bad export request: %s" % p)
537                else:
538                    self.log.error("Unknown export parameter: %s" % \
539                            p.get('name'))
540                    continue
541
542    def add_seer_node(self, topo, name, startup):
543        """
544        Add a seer node to the given topology, with the startup command passed
545        in.  Used by configure seer_services.
546        """
547        c_node = topdl.Computer(
548                name=name, 
549                os= topdl.OperatingSystem(
550                    attribute=[
551                    { 'attribute': 'osid', 
552                        'value': self.local_seer_image },
553                    ]),
554                attribute=[
555                    { 'attribute': 'startup', 'value': startup },
556                    ]
557                )
558        self.add_kit(c_node, self.local_seer_software)
559        topo.elements.append(c_node)
560
561    def configure_seer_services(self, services, topo, softdir):
562        """
563        Make changes to the topology required for the seer requests being made.
564        Specifically, add any control or master nodes required and set up the
565        start commands on the nodes to interconnect them.
566        """
567        local_seer = False      # True if we need to add a control node
568        collect_seer = False    # True if there is a seer-master node
569        seer_master= False      # True if we need to add the seer-master
570        for s in services:
571            s_name = s.get('name', '')
572            s_vis = s.get('visibility','')
573
574            if s_name  == 'local_seer_control' and s_vis == 'export':
575                local_seer = True
576            elif s_name == 'seer_master':
577                if s_vis == 'import':
578                    collect_seer = True
579                elif s_vis == 'export':
580                    seer_master = True
581       
582        # We've got the whole picture now, so add nodes if needed and configure
583        # them to interconnect properly.
584        if local_seer or seer_master:
585            # Copy local seer control node software to the tempdir
586            for l, f in self.local_seer_software:
587                base = os.path.basename(f)
588                copy_file(f, "%s/%s" % (softdir, base))
589        # If we're collecting seers somewhere the controllers need to talk to
590        # the master.  In testbeds that export the service, that will be a
591        # local node that we'll add below.  Elsewhere it will be the control
592        # portal that will port forward to the exporting master.
593        if local_seer:
594            if collect_seer:
595                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
596            else:
597                startup = self.local_seer_start
598            self.add_seer_node(topo, 'control', startup)
599        # If this is the seer master, add that node, too.
600        if seer_master:
601            self.add_seer_node(topo, 'seer-master', 
602                    "%s -R -n -R seer-master -R -A -R sink" % \
603                            self.seer_master_start)
604
605    def retrieve_software(self, topo, certfile, softdir):
606        """
607        Collect the software that nodes in the topology need loaded and stage
608        it locally.  This implies retrieving it from the experiment_controller
609        and placing it into softdir.  Certfile is used to prove that this node
610        has access to that data (it's the allocation/segment fedid).  Finally
611        local portal and federation software is also copied to the same staging
612        directory for simplicity - all software needed for experiment creation
613        is in softdir.
614        """
615        sw = set()
616        for e in topo.elements:
617            for s in getattr(e, 'software', []):
618                sw.add(s.location)
619        for s in sw:
620            self.log.debug("Retrieving %s" % s)
621            try:
622                get_url(s, certfile, softdir)
623            except:
624                t, v, st = sys.exc_info()
625                raise service_error(service_error.internal,
626                        "Error retrieving %s: %s" % (s, v))
627
628        # Copy local federation and portal node software to the tempdir
629        for s in (self.federation_software, self.portal_software):
630            for l, f in s:
631                base = os.path.basename(f)
632                copy_file(f, "%s/%s" % (softdir, base))
633
634
635    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
636        """
637        Gather common configuration files, retrieve or create an experiment
638        name and project name, and return the ssh_key filenames.  Create an
639        allocation log bound to the state log variable as well.
640        """
641        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
642                'seer_ca_pem', 'seer_node_pem')
643        ename = None
644        pubkey_base = None
645        secretkey_base = None
646        proj = None
647        user = None
648        alloc_log = None
649        nonce_experiment = False
650        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
651
652        self.state_lock.acquire()
653        if aid in self.allocation:
654            proj = self.allocation[aid].get('project', None)
655        self.state_lock.release()
656
657        if not proj:
658            raise service_error(service_error.internal, 
659                    "Can't find project for %s" %aid)
660
661        for a in attrs:
662            if a['attribute'] in configs:
663                try:
664                    self.log.debug("Retrieving %s from %s" % \
665                            (a['attribute'], a['value']))
666                    get_url(a['value'], certfile, tmpdir)
667                except:
668                    t, v, st = sys.exc_info()
669                    raise service_error(service_error.internal,
670                            "Error retrieving %s: %s" % (a.get('value', ""), v))
671            if a['attribute'] == 'ssh_pubkey':
672                pubkey_base = a['value'].rpartition('/')[2]
673            if a['attribute'] == 'ssh_secretkey':
674                secretkey_base = a['value'].rpartition('/')[2]
675            if a['attribute'] == 'experiment_name':
676                ename = a['value']
677
678        # Names longer than the emulab max are discarded
679        if ename and len(ename) <= self.max_name_len:
680            # Clean up the experiment name so that emulab will accept it.
681            ename = re.sub(vchars_re, '-', ename)
682
683        else:
684            ename = ""
685            for i in range(0,5):
686                ename += random.choice(string.ascii_letters)
687            nonce_experiment = True
688            self.log.warn("No experiment name or suggestion too long: " + \
689                    "picked one randomly: %s" % ename)
690
691        if not pubkey_base:
692            raise service_error(service_error.req, 
693                    "No public key attribute")
694
695        if not secretkey_base:
696            raise service_error(service_error.req, 
697                    "No secret key attribute")
698
699        self.state_lock.acquire()
700        if aid in self.allocation:
701            user = self.allocation[aid].get('user', None)
702            cert = self.allocation[aid].get('cert', None)
703            self.allocation[aid]['experiment'] = ename
704            self.allocation[aid]['nonce'] = nonce_experiment
705            self.allocation[aid]['log'] = [ ]
706            # Create a logger that logs to the experiment's state object as
707            # well as to the main log file.
708            alloc_log = logging.getLogger('fedd.access.%s' % ename)
709            h = logging.StreamHandler(
710                    list_log.list_log(self.allocation[aid]['log']))
711            # XXX: there should be a global one of these rather than
712            # repeating the code.
713            h.setFormatter(logging.Formatter(
714                "%(asctime)s %(name)s %(message)s",
715                        '%d %b %y %H:%M:%S'))
716            alloc_log.addHandler(h)
717            self.write_state()
718        self.state_lock.release()
719
720        if not user:
721            raise service_error(service_error.internal, 
722                    "Can't find creation user for %s" %aid)
723
724        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
725
726    def decorate_topology(self, info, t):
727        """
728        Copy the physical mapping and status onto the topology.  Used by
729        StartSegment and InfoSegment
730        """
731        def add_new(ann, attr):
732            for a in ann:
733                if a not in attr: attr.append(a)
734
735        def merge_os(os, e):
736            if len(e.os) == 0:
737                # No OS at all:
738                if os.get_attribute('emulab_access:image'):
739                    os.set_attribute('emulab_access:initial_image', 
740                            os.get_attribute('emulab_access:image'))
741                e.os = [ os ]
742            elif len(e.os) == 1:
743                # There's one OS, copy the initial image and replace
744                eos = e.os[0]
745                initial = eos.get_attribute('emulab_access:initial_image')
746                if initial:
747                    os.set_attribute('emulab_access:initial_image', initial)
748                e.os = [ os] 
749            else:
750                # Multiple OSes, replace or append
751                for eos in e.os:
752                    if os.name == eos.name:
753                        eos.version = os.version
754                        eos.version = os.distribution
755                        eos.version = os.distributionversion
756                        for a in os.attribute:
757                            if eos.get_attribute(a.attribute):
758                                eos.remove_attribute(a.attribute)
759                            eos.set_attribute(a.attribute, a.value)
760                        break
761                else:
762                    e.os.append(os)
763
764
765        if t is None: return
766        i = 0 # For fake debugging instantiation
767        # Copy the assigned names into the return topology
768        for e in t.elements:
769            if isinstance(e, topdl.Computer):
770                if not self.create_debug:
771                    if e.name in info.node:
772                        add_new(("%s%s" % 
773                            (info.node[e.name].pname, self.domain),),
774                            e.localname)
775                        add_new(("%s%s" % 
776                            (info.node[e.name].lname, self.domain),),
777                            e.localname)
778                        e.status = info.node[e.name].status
779                        os = info.node[e.name].getOS()
780                        if os: merge_os(os, e)
781                else:
782                    # Simple debugging assignment
783                    add_new(("node%d%s" % (i, self.domain),), e.localname)
784                    e.status = 'active'
785                    add_new(('testop1', 'testop2'), e.operation)
786                    i += 1
787
788        for s in t.substrates:
789            if s.name in info.subs:
790                sub = info.subs[s.name]
791                if sub.cap is not None:
792                    s.capacity = topdl.Capacity(sub.cap, 'max')
793                if sub.delay is not None:
794                    s.delay = topdl.Latency(sub.delay, 'max')
795        # XXX interfaces
796
797
798    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
799        """
800        Store key bits of experiment state in the global repository, including
801        the response that may need to be replayed, and return the response.
802        """
803        i = 0
804        t = topo.clone()
805        self.decorate_topology(starter, t)
806        # Grab the log (this is some anal locking, but better safe than
807        # sorry)
808        self.state_lock.acquire()
809        logv = "".join(self.allocation[aid]['log'])
810        # It's possible that the StartSegment call gets retried (!).
811        # if the 'started' key is in the allocation, we'll return it rather
812        # than redo the setup.
813        self.allocation[aid]['started'] = { 
814                'allocID': alloc_id,
815                'allocationLog': logv,
816                'segmentdescription': { 
817                    'topdldescription': t.to_dict()
818                    },
819                'proof': proof.to_dict(),
820                }
821        self.allocation[aid]['topo'] = t
822        retval = copy.copy(self.allocation[aid]['started'])
823        self.write_state()
824        self.state_lock.release()
825        return retval
826   
827    # End of StartSegment support routines
828
829    def StartSegment(self, req, fid):
830        err = None  # Any service_error generated after tmpdir is created
831        rv = None   # Return value from segment creation
832
833        self.log.info("StartSegment called by %s" % fid)
834        try:
835            req = req['StartSegmentRequestBody']
836            auth_attr = req['allocID']['fedid']
837            topref = req['segmentdescription']['topdldescription']
838        except KeyError:
839            raise service_error(server_error.req, "Badly formed request")
840
841        connInfo = req.get('connection', [])
842        services = req.get('service', [])
843        aid = "%s" % auth_attr
844        attrs = req.get('fedAttr', [])
845
846        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
847                with_proof=True)
848        if not access_ok:
849            self.log.info("StartSegment for %s failed: access denied" % fid)
850            raise service_error(service_error.access, "Access denied")
851        else:
852            # See if this is a replay of an earlier succeeded StartSegment -
853            # sometimes SSL kills 'em.  If so, replay the response rather than
854            # redoing the allocation.
855            self.state_lock.acquire()
856            retval = self.allocation[aid].get('started', None)
857            self.state_lock.release()
858            if retval:
859                self.log.warning("Duplicate StartSegment for %s: " % aid + \
860                        "replaying response")
861                return retval
862
863        # A new request.  Do it.
864
865        if topref: topo = topdl.Topology(**topref)
866        else:
867            raise service_error(service_error.req, 
868                    "Request missing segmentdescription'")
869       
870        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
871        try:
872            tmpdir = tempfile.mkdtemp(prefix="access-")
873            softdir = "%s/software" % tmpdir
874            os.mkdir(softdir)
875        except EnvironmentError:
876            self.log.info("StartSegment for %s failed: internal error" % fid)
877            raise service_error(service_error.internal, "Cannot create tmp dir")
878
879        # Try block alllows us to clean up temporary files.
880        try:
881            self.retrieve_software(topo, certfile, softdir)
882            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
883                alloc_log =  self.initialize_experiment_info(attrs, aid, 
884                        certfile, tmpdir)
885            # A misconfigured cert in the ABAC map can be confusing...
886            if not os.access(xmlrpc_cert, os.R_OK):
887                self.log.error("Cannot open user's emulab SSL cert: %s" % \
888                        xmlrpc_cert)
889                raise service_error(service_error.internal,
890                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
891
892
893            if '/' in proj: proj, gid = proj.split('/')
894            else: gid = None
895
896
897            # Set up userconf and seer if needed
898            self.configure_userconf(services, tmpdir)
899            self.configure_seer_services(services, topo, softdir)
900            # Get and send synch store variables
901            self.export_store_info(certfile, proj, ename, connInfo)
902            self.import_store_info(certfile, connInfo)
903
904            expfile = "%s/experiment.tcl" % tmpdir
905
906            self.generate_portal_configs(topo, pubkey_base, 
907                    secretkey_base, tmpdir, proj, ename, connInfo, services)
908            self.generate_ns2(topo, expfile, 
909                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
910
911            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
912                    debug=self.create_debug, log=alloc_log, boss=self.boss,
913                    ops=self.ops, cert=xmlrpc_cert)
914            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
915        except service_error, e:
916            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
917            err = e
918        except:
919            t, v, st = sys.exc_info()
920            self.log.info("StartSegment for %s failed:unexpected error: %s" \
921                    % (fid, traceback.extract_tb(st)))
922            err = service_error(service_error.internal, "%s: %s" % \
923                    (v, traceback.extract_tb(st)))
924
925        # Walk up tmpdir, deleting as we go
926        if self.cleanup: self.remove_dirs(tmpdir)
927        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
928
929        if rv:
930            self.log.info("StartSegment for %s succeeded" % fid)
931            return self.finalize_experiment(starter, topo, aid, req['allocID'],
932                    proof)
933        elif err:
934            raise service_error(service_error.federant,
935                    "Swapin failed: %s" % err)
936        else:
937            raise service_error(service_error.federant, "Swapin failed")
938
939    def TerminateSegment(self, req, fid):
940        self.log.info("TerminateSegment called by %s" % fid)
941        try:
942            req = req['TerminateSegmentRequestBody']
943        except KeyError:
944            raise service_error(server_error.req, "Badly formed request")
945
946        auth_attr = req['allocID']['fedid']
947        aid = "%s" % auth_attr
948        attrs = req.get('fedAttr', [])
949
950        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
951                with_proof=True)
952        if not access_ok:
953            raise service_error(service_error.access, "Access denied")
954
955        self.state_lock.acquire()
956        if aid in self.allocation:
957            proj = self.allocation[aid].get('project', None)
958            user = self.allocation[aid].get('user', None)
959            xmlrpc_cert = self.allocation[aid].get('cert', None)
960            ename = self.allocation[aid].get('experiment', None)
961            nonce = self.allocation[aid].get('nonce', False)
962        else:
963            proj = None
964            user = None
965            ename = None
966            nonce = False
967            xmlrpc_cert = None
968        self.state_lock.release()
969
970        if not proj:
971            self.log.info("TerminateSegment failed for %s: cannot find project"\
972                    % fid)
973            raise service_error(service_error.internal, 
974                    "Can't find project for %s" % aid)
975        else:
976            if '/' in proj: proj, gid = proj.split('/')
977            else: gid = None
978
979        if not user:
980            self.log.info("TerminateSegment failed for %s: cannot find user"\
981                    % fid)
982            raise service_error(service_error.internal, 
983                    "Can't find creation user for %s" % aid)
984        if not ename:
985            self.log.info(
986                    "TerminateSegment failed for %s: cannot find experiment"\
987                    % fid)
988            raise service_error(service_error.internal, 
989                    "Can't find experiment name for %s" % aid)
990        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
991                debug=self.create_debug, boss=self.boss, ops=self.ops,
992                cert=xmlrpc_cert)
993        stopper(self, user, proj, ename, gid, nonce)
994        self.log.info("TerminateSegment succeeded for %s %s %s" % \
995                (fid, proj, ename))
996        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
997
998    def InfoSegment(self, req, fid):
999        self.log.info("InfoSegment called by %s" % fid)
1000        try:
1001            req = req['InfoSegmentRequestBody']
1002        except KeyError:
1003            raise service_error(server_error.req, "Badly formed request")
1004
1005        auth_attr = req['allocID']['fedid']
1006        aid = "%s" % auth_attr
1007
1008        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1009                with_proof=True)
1010        if not access_ok:
1011            raise service_error(service_error.access, "Access denied")
1012
1013        self.state_lock.acquire()
1014        if aid in self.allocation:
1015            topo = self.allocation[aid].get('topo', None)
1016            if topo: topo = topo.clone()
1017            proj = self.allocation[aid].get('project', None)
1018            user = self.allocation[aid].get('user', None)
1019            xmlrpc_cert = self.allocation[aid].get('cert', None)
1020            ename = self.allocation[aid].get('experiment', None)
1021        else:
1022            proj = None
1023            user = None
1024            ename = None
1025            topo = None
1026            xmlrpc_cert = None
1027        self.state_lock.release()
1028
1029        if not proj:
1030            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1031            raise service_error(service_error.internal, 
1032                    "Can't find project for %s" % aid)
1033        else:
1034            if '/' in proj: proj, gid = proj.split('/')
1035            else: gid = None
1036
1037        if not user:
1038            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1039            raise service_error(service_error.internal, 
1040                    "Can't find creation user for %s" % aid)
1041        if not ename:
1042            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1043            raise service_error(service_error.internal, 
1044                    "Can't find experiment name for %s" % aid)
1045        info = self.info_segment(keyfile=self.ssh_privkey_file,
1046                debug=self.create_debug, boss=self.boss, ops=self.ops,
1047                cert=xmlrpc_cert)
1048        info(self, user, proj, ename)
1049        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1050                (fid, user, proj, ename))
1051        self.decorate_topology(info, topo)
1052        self.state_lock.acquire()
1053        if aid in self.allocation:
1054            self.allocation[aid]['topo'] = topo
1055            self.write_state()
1056        self.state_lock.release()
1057        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1058                (fid, user, proj, ename))
1059
1060        rv = { 
1061                'allocID': req['allocID'], 
1062                'proof': proof.to_dict(),
1063                }
1064        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1065                (fid, user, proj, ename))
1066        if topo:
1067            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1068        return rv
1069
1070    def OperationSegment(self, req, fid):
1071        def get_pname(e):
1072            """
1073            Get the physical name of a node
1074            """
1075            if e.localname:
1076                return re.sub('\..*','', e.localname[0])
1077            else:
1078                return None
1079
1080        self.log.info("OperationSegment called by %s" % fid)
1081        try:
1082            req = req['OperationSegmentRequestBody']
1083        except KeyError:
1084            raise service_error(server_error.req, "Badly formed request")
1085
1086        auth_attr = req['allocID']['fedid']
1087        aid = "%s" % auth_attr
1088
1089        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1090                with_proof=True)
1091        if not access_ok:
1092            self.log.info("OperationSegment failed for %s: access denied" % fid)
1093            raise service_error(service_error.access, "Access denied")
1094
1095        op = req.get('operation', None)
1096        targets = req.get('target', None)
1097        params = req.get('parameter', None)
1098
1099        if op is None :
1100            self.log.info("OperationSegment failed for %s: no operation" % fid)
1101            raise service_error(service_error.req, "missing operation")
1102        elif targets is None:
1103            self.log.info("OperationSegment failed for %s: no targets" % fid)
1104            raise service_error(service_error.req, "no targets")
1105
1106        self.state_lock.acquire()
1107        if aid in self.allocation:
1108            topo = self.allocation[aid].get('topo', None)
1109            if topo: topo = topo.clone()
1110            xmlrpc_cert = self.allocation[aid].get('cert', None)
1111        else:
1112            topo = None
1113            xmlrpc_cert = None
1114        self.state_lock.release()
1115
1116        targets = copy.copy(targets)
1117        ptargets = { }
1118        for e in topo.elements:
1119            if isinstance(e, topdl.Computer):
1120                if e.name in targets:
1121                    targets.remove(e.name)
1122                    pn = get_pname(e)
1123                    if pn: ptargets[e.name] = pn
1124
1125        status = [ operation_status(t, operation_status.no_target) \
1126                for t in targets]
1127
1128        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1129                debug=self.create_debug, boss=self.boss, ops=self.ops,
1130                cert=xmlrpc_cert)
1131        ops(self, op, ptargets, params, topo)
1132        self.log.info("OperationSegment operated for %s" % fid)
1133       
1134        status.extend(ops.status)
1135        self.log.info("OperationSegment succeed for %s" % fid)
1136
1137        return { 
1138                'allocID': req['allocID'], 
1139                'status': [s.to_dict() for s in status],
1140                'proof': proof.to_dict(),
1141                }
Note: See TracBrowser for help on using the repository browser.