source: fedd/federation/emulab_access.py @ e62fb86

Last change on this file since e62fb86 was e62fb86, checked in by Ted Faber <faber@…>, 12 years ago

Avoid failing when started is unset

  • Property mode set to 100644
File size: 39.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13import socket
14
15from threading import *
16from M2Crypto.SSL import SSLError
17
18from access import access_base
19
20from util import *
21from deter import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25from proof import proof as access_proof
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31from deter import topdl
32import list_log
33import emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    max_name_len = 19
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.max_name_len = access.max_name_len
61
62        self.allow_proxy = config.getboolean("access", "allow_proxy")
63
64        self.domain = config.get("access", "domain")
65        self.userconfdir = config.get("access","userconfdir")
66        self.userconfcmd = config.get("access","userconfcmd")
67        self.userconfurl = config.get("access","userconfurl")
68        self.federation_software = config.get("access", "federation_software")
69        self.portal_software = config.get("access", "portal_software")
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73        self.seer_master_start = config.get("access", "seer_master_start")
74        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
75        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
76        self.ssh_port = config.get("access","ssh_port") or "22"
77        self.boss = config.get("access", "boss")
78        self.ops = config.get("access", "ops")
79        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
80        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
81
82        self.dragon_endpoint = config.get("access", "dragon")
83        self.dragon_vlans = config.get("access", "dragon_vlans")
84        self.deter_internal = config.get("access", "deter_internal")
85
86        self.tunnel_config = config.getboolean("access", "tunnel_config")
87        self.portal_command = config.get("access", "portal_command")
88        self.portal_image = config.get("access", "portal_image")
89        self.portal_type = config.get("access", "portal_type") or "pc"
90        self.portal_startcommand = config.get("access", "portal_startcommand")
91        self.node_startcommand = config.get("access", "node_startcommand")
92        self.nat_portal = config.get("access", "nat_portal")
93
94        self.uri = 'https://%s:%d' % (socket.getfqdn(), 
95                self.get_port(config.get("globals", "services", "23235")))
96
97        self.federation_software = self.software_list(self.federation_software)
98        self.portal_software = self.software_list(self.portal_software)
99        self.local_seer_software = self.software_list(self.local_seer_software)
100
101        self.access_type = self.access_type.lower()
102        self.start_segment = emulab_segment.start_segment
103        self.stop_segment = emulab_segment.stop_segment
104        self.info_segment = emulab_segment.info_segment
105        self.operation_segment = emulab_segment.operation_segment
106
107        self.restricted = [ ]
108        tb = config.get('access', 'testbed')
109        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
110        else: self.testbed = [ ]
111
112        # authorization information
113        self.auth_type = config.get('access', 'auth_type') \
114                or 'abac'
115        self.auth_dir = config.get('access', 'auth_dir')
116        accessdb = config.get("access", "accessdb")
117        # initialize the authorization system
118        if self.auth_type == 'abac':
119            self.auth = abac_authorizer(load=self.auth_dir)
120            self.access = [ ]
121            if accessdb:
122                self.read_access(accessdb, self.access_tuple)
123        else:
124            raise service_error(service_error.internal, 
125                    "Unknown auth_type: %s" % self.auth_type)
126
127        # read_state in the base_class
128        self.state_lock.acquire()
129        if 'allocation' not in self.state: self.state['allocation']= { }
130        self.allocation = self.state['allocation']
131        self.state_lock.release()
132        self.exports = {
133                'SMB': self.export_SMB,
134                'seer': self.export_seer,
135                'tmcd': self.export_tmcd,
136                'userconfig': self.export_userconfig,
137                'project_export': self.export_project_export,
138                'local_seer_control': self.export_local_seer,
139                'seer_master': self.export_seer_master,
140                'hide_hosts': self.export_hide_hosts,
141                }
142
143        if not self.local_seer_image or not self.local_seer_software or \
144                not self.local_seer_start:
145            if 'local_seer_control' in self.exports:
146                del self.exports['local_seer_control']
147
148        if not self.local_seer_image or not self.local_seer_software or \
149                not self.seer_master_start:
150            if 'seer_master' in self.exports:
151                del self.exports['seer_master']
152
153
154        self.soap_services = {\
155            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
156            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
157            'StartSegment': soap_handler("StartSegment", self.StartSegment),
158            'TerminateSegment': soap_handler("TerminateSegment",
159                self.TerminateSegment),
160            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
161            'OperationSegment': soap_handler("OperationSegment",
162                self.OperationSegment),
163            }
164        self.xmlrpc_services =  {\
165            'RequestAccess': xmlrpc_handler('RequestAccess',
166                self.RequestAccess),
167            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
168                self.ReleaseAccess),
169            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
170            'TerminateSegment': xmlrpc_handler('TerminateSegment',
171                self.TerminateSegment),
172            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
173            'OperationSegment': xmlrpc_handler("OperationSegment",
174                self.OperationSegment),
175            }
176
177        self.call_SetValue = service_caller('SetValue', log=self.log)
178        self.call_GetValue = service_caller('GetValue', log=self.log)
179
180    @staticmethod
181    def get_port(ps):
182        '''
183        Take a fedd service string and return the first port.  Used in
184        creating the testbed uri identifier.
185        '''
186        p = ps.split(',')
187        smallport = p[0].split(':')
188        try:
189            rv = int(smallport[0])
190        except ValueError:
191            rv = 23235
192        return rv
193
194    @staticmethod
195    def access_tuple(str):
196        """
197        Convert a string of the form (id, id, id) into an access_project.  This
198        is called by read_access to convert to local attributes.  It returns a
199        tuple of the form (project, user, certificate_file).
200        """
201
202        str = str.strip()
203        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
204            # The slice takes the parens off the string.
205            proj, user, cert = str[1:-1].split(',')
206            return (proj.strip(), user.strip(), cert.strip())
207        else:
208            raise self.parse_error(
209                    'Bad mapping (unbalanced parens or more than 2 commas)')
210
211    # RequestAccess support routines
212
213    def save_project_state(self, aid, pname, uname, certf, owners):
214        """
215        Save the project, user, and owners associated with this allocation.
216        This creates the allocation entry.
217        """
218        self.state_lock.acquire()
219        self.allocation[aid] = { }
220        self.allocation[aid]['project'] = pname
221        self.allocation[aid]['user'] = uname
222        self.allocation[aid]['cert'] = certf
223        self.allocation[aid]['owners'] = owners
224        self.write_state()
225        self.state_lock.release()
226        return (pname, uname)
227
228    # End of RequestAccess support routines
229
230    def RequestAccess(self, req, fid):
231        """
232        Handle the access request.  Proxy if not for us.
233
234        Parse out the fields and make the allocations or rejections if for us,
235        otherwise, assuming we're willing to proxy, proxy the request out.
236        """
237
238        def gateway_hardware(h):
239            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
240            else: return h
241
242        def get_export_project(svcs):
243            """
244            if the service requests includes one to export a project, return
245            that project.
246            """
247            rv = None
248            for s in svcs:
249                if s.get('name', '') == 'project_export' and \
250                        s.get('visibility', '') == 'export':
251                    if not rv: 
252                        for a in s.get('fedAttr', []):
253                            if a.get('attribute', '') == 'project' \
254                                    and 'value' in a:
255                                rv = a['value']
256                    else:
257                        raise service_error(service_error, access, 
258                                'Requesting multiple project exports is ' + \
259                                        'not supported');
260            return rv
261
262        self.log.info("RequestAccess called by %s" % fid)
263        # The dance to get into the request body
264        if req.has_key('RequestAccessRequestBody'):
265            req = req['RequestAccessRequestBody']
266        else:
267            raise service_error(service_error.req, "No request!?")
268
269        # if this includes a project export request, construct a filter such
270        # that only the ABAC attributes mapped to that project are checked for
271        # access.
272        if 'service' in req:
273            ep = get_export_project(req['service'])
274            if ep: pf = lambda(a): a.value[0] == ep
275            else: pf = None
276        else:
277            ep = None
278            pf = None
279
280        if self.auth.import_credentials(
281                data_list=req.get('abac_credential', [])):
282            self.auth.save()
283        else:
284            self.log.debug('failed to import incoming credentials')
285
286        if self.auth_type == 'abac':
287            found, owners, proof = self.lookup_access(req, fid, filter=pf)
288        else:
289            raise service_error(service_error.internal, 
290                    'Unknown auth_type: %s' % self.auth_type)
291        ap = None
292
293        # keep track of what's been added
294        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
295        aid = unicode(allocID)
296
297        pname, uname = self.save_project_state(aid, found[0], found[1], 
298                found[2], owners)
299
300        services, svc_state = self.export_services(req.get('service',[]),
301                pname, uname)
302        self.state_lock.acquire()
303        # Store services state in global state
304        for k, v in svc_state.items():
305            self.allocation[aid][k] = v
306        self.append_allocation_authorization(aid, 
307                set([(o, allocID) for o in owners]), state_attr='allocation')
308        self.write_state()
309        self.state_lock.release()
310        try:
311            f = open("%s/%s.pem" % (self.certdir, aid), "w")
312            print >>f, alloc_cert
313            f.close()
314        except EnvironmentError, e:
315            self.log.info("RequestAccess failed for by %s: internal error" \
316                    % fid)
317            raise service_error(service_error.internal, 
318                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
319        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
320        resp = self.build_access_response({ 'fedid': allocID } ,
321                pname, services, proof)
322        return resp
323
324    def ReleaseAccess(self, req, fid):
325        self.log.info("ReleaseAccess called by %s" % fid)
326        # The dance to get into the request body
327        if req.has_key('ReleaseAccessRequestBody'):
328            req = req['ReleaseAccessRequestBody']
329        else:
330            raise service_error(service_error.req, "No request!?")
331
332        try:
333            if req['allocID'].has_key('localname'):
334                auth_attr = aid = req['allocID']['localname']
335            elif req['allocID'].has_key('fedid'):
336                aid = unicode(req['allocID']['fedid'])
337                auth_attr = req['allocID']['fedid']
338            else:
339                raise service_error(service_error.req,
340                        "Only localnames and fedids are understood")
341        except KeyError:
342            raise service_error(service_error.req, "Badly formed request")
343
344        self.log.debug("[access] deallocation requested for %s by %s" % \
345                (aid, fid))
346        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
347                with_proof=True)
348        if not access_ok:
349            self.log.debug("[access] deallocation denied for %s", aid)
350            raise service_error(service_error.access, "Access Denied")
351
352        self.state_lock.acquire()
353        if aid in self.allocation:
354            self.log.debug("Found allocation for %s" %aid)
355            self.clear_allocation_authorization(aid, state_attr='allocation')
356            del self.allocation[aid]
357            self.write_state()
358            self.state_lock.release()
359            # Remove the access cert
360            cf = "%s/%s.pem" % (self.certdir, aid)
361            self.log.debug("Removing %s" % cf)
362            os.remove(cf)
363            self.log.info("ReleaseAccess succeeded for %s" % fid)
364            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    # These are subroutines for StartSegment
370    def generate_ns2(self, topo, expfn, softdir, connInfo):
371        """
372        Convert topo into an ns2 file, decorated with appropriate commands for
373        the particular testbed setup.  Convert all requests for software, etc
374        to point at the staged copies on this testbed and add the federation
375        startcommands.
376        """
377        class dragon_commands:
378            """
379            Functor to spit out approrpiate dragon commands for nodes listed in
380            the connectivity description.  The constructor makes a dict mapping
381            dragon nodes to their parameters and the __call__ checks each
382            element in turn for membership.
383            """
384            def __init__(self, map):
385                self.node_info = map
386
387            def __call__(self, e):
388                s = ""
389                if isinstance(e, topdl.Computer):
390                    if self.node_info.has_key(e.name):
391                        info = self.node_info[e.name]
392                        for ifname, vlan, type in info:
393                            for i in e.interface:
394                                if i.name == ifname:
395                                    addr = i.get_attribute('ip4_address')
396                                    subs = i.substrate[0]
397                                    break
398                            else:
399                                raise service_error(service_error.internal,
400                                        "No interface %s on element %s" % \
401                                                (ifname, e.name))
402                            # XXX: do netmask right
403                            if type =='link':
404                                s = ("tb-allow-external ${%s} " + \
405                                        "dragonportal ip %s vlan %s " + \
406                                        "netmask 255.255.255.0\n") % \
407                                        (topdl.to_tcl_name(e.name), addr, vlan)
408                            elif type =='lan':
409                                s = ("tb-allow-external ${%s} " + \
410                                        "dragonportal " + \
411                                        "ip %s vlan %s usurp %s\n") % \
412                                        (topdl.to_tcl_name(e.name), addr, 
413                                                vlan, subs)
414                            else:
415                                raise service_error(service_error_internal,
416                                        "Unknown DRAGON type %s" % type)
417                return s
418
419        class not_dragon:
420            """
421            Return true if a node is in the given map of dragon nodes.
422            """
423            def __init__(self, map):
424                self.nodes = set(map.keys())
425
426            def __call__(self, e):
427                return e.name not in self.nodes
428
429        def have_portals(top):
430            '''
431            Return true if the topology has a portal node
432            '''
433            # The else is on the for
434            for e in top.elements:
435                if isinstance(e, topdl.Computer) and e.get_attribute('portal'):
436                    return True
437            else:
438                return False
439
440
441        # Main line of generate_ns2
442        t = topo.clone()
443
444        # Create the map of nodes that need direct connections (dragon
445        # connections) from the connInfo
446        dragon_map = { }
447        for i in [ i for i in connInfo if i['type'] == 'transit']:
448            for a in i.get('fedAttr', []):
449                if a['attribute'] == 'vlan_id':
450                    vlan = a['value']
451                    break
452            else:
453                raise service_error(service_error.internal, "No vlan tag")
454            members = i.get('member', [])
455            if len(members) > 1: type = 'lan'
456            else: type = 'link'
457
458            try:
459                for m in members:
460                    if m['element'] in dragon_map:
461                        dragon_map[m['element']].append(( m['interface'], 
462                            vlan, type))
463                    else:
464                        dragon_map[m['element']] = [( m['interface'], 
465                            vlan, type),]
466            except KeyError:
467                raise service_error(service_error.req,
468                        "Missing connectivity info")
469
470        def output_fixed_filter(e):
471            if not isinstance(e, topdl.Computer): return ""
472            fn = e.get_attribute('fixed')
473            if fn is None:
474                return ""
475            else:
476                return 'tb-fix-node ${%s} %s' % (topdl.to_tcl_name(e.name), fn)
477
478        # Weed out the things we aren't going to instantiate: Segments, portal
479        # substrates, and portal interfaces.  (The copy in the for loop allows
480        # us to delete from e.elements in side the for loop).  While we're
481        # touching all the elements, we also adjust paths from the original
482        # testbed to local testbed paths and put the federation commands into
483        # the start commands
484        local = len(dragon_map) == 0 and not have_portals(t)
485        if local: routing = 'Static'
486        else: routing = 'Manual'
487
488        if local:
489            self.log.debug("Local experiment.")
490        for e in [e for e in t.elements]:
491            if isinstance(e, topdl.Segment):
492                t.elements.remove(e)
493            if isinstance(e, topdl.Computer):
494                self.add_kit(e, self.federation_software)
495                if e.get_attribute('portal') and self.portal_startcommand:
496                    # Add local portal support software
497                    self.add_kit(e, self.portal_software)
498                    # Portals never have a user-specified start command
499                    e.set_attribute('startup', self.portal_startcommand)
500                elif not local and self.node_startcommand:
501                    if e.get_attribute('startup'):
502                        e.set_attribute('startup', "%s \\$USER '%s'" % \
503                                (self.node_startcommand, 
504                                    e.get_attribute('startup')))
505                    else:
506                        e.set_attribute('startup', self.node_startcommand)
507
508                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
509                # Remove portal interfaces that do not connect to DRAGON
510                e.interface = [i for i in e.interface \
511                        if not i.get_attribute('portal') or i.name in dinf ]
512            # Fix software paths
513            for s in getattr(e, 'software', []):
514                s.location = re.sub("^.*/", softdir, s.location)
515
516        t.substrates = [ s.clone() for s in t.substrates ]
517        t.incorporate_elements()
518
519        # Customize the ns2 output for local portal commands and images
520        filters = [output_fixed_filter]
521
522        if self.dragon_endpoint:
523            add_filter = not_dragon(dragon_map)
524            filters.append(dragon_commands(dragon_map))
525        else:
526            add_filter = None
527
528        if self.portal_command:
529            filters.append(topdl.generate_portal_command_filter(
530                self.portal_command, add_filter=add_filter))
531
532        if self.portal_image:
533            filters.append(topdl.generate_portal_image_filter(
534                self.portal_image))
535
536        if self.portal_type:
537            filters.append(topdl.generate_portal_hardware_filter(
538                self.portal_type))
539
540        # Convert to ns and write it out
541        expfile = topdl.topology_to_ns2(t, filters, routing=routing)
542        try:
543            f = open(expfn, "w")
544            print >>f, expfile
545            f.close()
546        except EnvironmentError:
547            raise service_error(service_error.internal,
548                    "Cannot write experiment file %s: %s" % (expfn,e))
549
550    def export_store_info(self, cf, proj, ename, connInfo):
551        """
552        For the export requests in the connection info, install the peer names
553        at the experiment controller via SetValue calls.
554        """
555
556        for c in connInfo:
557            for p in [ p for p in c.get('parameter', []) \
558                    if p.get('type', '') == 'output']:
559
560                if p.get('name', '') == 'peer':
561                    k = p.get('key', None)
562                    surl = p.get('store', None)
563                    if surl :
564                        if self.nat_portal:
565                            value = self.nat_portal
566                        elif k and k.index('/') != -1:
567                            value = "%s.%s.%s%s" % \
568                                (k[k.index('/')+1:], ename, proj, self.domain)
569                        else: 
570                            self.log.error("Bad export request: %s" % p)
571                            continue
572                        self.log.debug("Setting %s to %s on %s" % \
573                                (k, value, surl))
574                        req = { 'name': k, 'value': value }
575                        self.call_SetValue(surl, req, cf)
576                    else:
577                        self.log.error("Bad export request: %s" % p)
578                elif p.get('name', '') == 'ssh_port':
579                    k = p.get('key', None)
580                    surl = p.get('store', None)
581                    if surl and k:
582                        req = { 'name': k, 'value': self.ssh_port }
583                        self.log.debug("Setting %s to %s on %s" % \
584                                (k, self.ssh_port, surl))
585                        self.call_SetValue(surl, req, cf)
586                    else:
587                        self.log.error("Bad export request: %s" % p)
588                else:
589                    self.log.error("Unknown export parameter: %s" % \
590                            p.get('name'))
591                    continue
592
593    def add_seer_node(self, topo, name, startup):
594        """
595        Add a seer node to the given topology, with the startup command passed
596        in.  Used by configure seer_services.
597        """
598        c_node = topdl.Computer(
599                name=name, 
600                os= topdl.OperatingSystem(
601                    attribute=[
602                    { 'attribute': 'osid', 
603                        'value': self.local_seer_image },
604                    ]),
605                attribute=[
606                    { 'attribute': 'startup', 'value': startup },
607                    ]
608                )
609        self.add_kit(c_node, self.local_seer_software)
610        topo.elements.append(c_node)
611
612    def configure_seer_services(self, services, topo, softdir):
613        """
614        Make changes to the topology required for the seer requests being made.
615        Specifically, add any control or master nodes required and set up the
616        start commands on the nodes to interconnect them.
617        """
618        local_seer = False      # True if we need to add a control node
619        collect_seer = False    # True if there is a seer-master node
620        seer_master= False      # True if we need to add the seer-master
621        for s in services:
622            s_name = s.get('name', '')
623            s_vis = s.get('visibility','')
624
625            if s_name  == 'local_seer_control' and s_vis == 'export':
626                local_seer = True
627            elif s_name == 'seer_master':
628                if s_vis == 'import':
629                    collect_seer = True
630                elif s_vis == 'export':
631                    seer_master = True
632       
633        # We've got the whole picture now, so add nodes if needed and configure
634        # them to interconnect properly.
635        if local_seer or seer_master:
636            # Copy local seer control node software to the tempdir
637            for l, f in self.local_seer_software:
638                base = os.path.basename(f)
639                copy_file(f, "%s/%s" % (softdir, base))
640        # If we're collecting seers somewhere the controllers need to talk to
641        # the master.  In testbeds that export the service, that will be a
642        # local node that we'll add below.  Elsewhere it will be the control
643        # portal that will port forward to the exporting master.
644        if local_seer:
645            if collect_seer:
646                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
647            else:
648                startup = self.local_seer_start
649            self.add_seer_node(topo, 'control', startup)
650        # If this is the seer master, add that node, too.
651        if seer_master:
652            self.add_seer_node(topo, 'seer-master', 
653                    "%s -R -n -R seer-master -R -A -R sink" % \
654                            self.seer_master_start)
655
656    def retrieve_software(self, topo, certfile, softdir):
657        """
658        Collect the software that nodes in the topology need loaded and stage
659        it locally.  This implies retrieving it from the experiment_controller
660        and placing it into softdir.  Certfile is used to prove that this node
661        has access to that data (it's the allocation/segment fedid).  Finally
662        local portal and federation software is also copied to the same staging
663        directory for simplicity - all software needed for experiment creation
664        is in softdir.
665        """
666        sw = set()
667        for e in topo.elements:
668            for s in getattr(e, 'software', []):
669                sw.add(s.location)
670        for s in sw:
671            self.log.debug("Retrieving %s" % s)
672            try:
673                get_url(s, certfile, softdir, log=self.log)
674            except:
675                t, v, st = sys.exc_info()
676                raise service_error(service_error.internal,
677                        "Error retrieving %s: %s" % (s, v))
678
679        # Copy local federation and portal node software to the tempdir
680        for s in (self.federation_software, self.portal_software):
681            for l, f in s:
682                base = os.path.basename(f)
683                copy_file(f, "%s/%s" % (softdir, base))
684
685
686    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
687        """
688        Gather common configuration files, retrieve or create an experiment
689        name and project name, and return the ssh_key filenames.  Create an
690        allocation log bound to the state log variable as well.
691        """
692        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
693                'seer_ca_pem', 'seer_node_pem')
694        ename = None
695        pubkey_base = None
696        secretkey_base = None
697        proj = None
698        user = None
699        alloc_log = None
700        nonce_experiment = False
701        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
702
703        self.state_lock.acquire()
704        if aid in self.allocation:
705            proj = self.allocation[aid].get('project', None)
706        self.state_lock.release()
707
708        if not proj:
709            raise service_error(service_error.internal, 
710                    "Can't find project for %s" %aid)
711
712        for a in attrs:
713            if a['attribute'] in configs:
714                try:
715                    self.log.debug("Retrieving %s from %s" % \
716                            (a['attribute'], a['value']))
717                    get_url(a['value'], certfile, tmpdir, log=self.log)
718                except:
719                    t, v, st = sys.exc_info()
720                    raise service_error(service_error.internal,
721                            "Error retrieving %s: %s" % (a.get('value', ""), v))
722            if a['attribute'] == 'ssh_pubkey':
723                pubkey_base = a['value'].rpartition('/')[2]
724            if a['attribute'] == 'ssh_secretkey':
725                secretkey_base = a['value'].rpartition('/')[2]
726            if a['attribute'] == 'experiment_name':
727                ename = a['value']
728
729        # Names longer than the emulab max are discarded
730        if ename and len(ename) <= self.max_name_len:
731            # Clean up the experiment name so that emulab will accept it.
732            ename = re.sub(vchars_re, '-', ename)
733
734        else:
735            ename = ""
736            for i in range(0,5):
737                ename += random.choice(string.ascii_letters)
738            nonce_experiment = True
739            self.log.warn("No experiment name or suggestion too long: " + \
740                    "picked one randomly: %s" % ename)
741
742        if not pubkey_base:
743            raise service_error(service_error.req, 
744                    "No public key attribute")
745
746        if not secretkey_base:
747            raise service_error(service_error.req, 
748                    "No secret key attribute")
749
750        self.state_lock.acquire()
751        if aid in self.allocation:
752            user = self.allocation[aid].get('user', None)
753            cert = self.allocation[aid].get('cert', None)
754            self.allocation[aid]['experiment'] = ename
755            self.allocation[aid]['nonce'] = nonce_experiment
756            self.allocation[aid]['log'] = [ ]
757            # Create a logger that logs to the experiment's state object as
758            # well as to the main log file.
759            alloc_log = logging.getLogger('fedd.access.%s' % ename)
760            h = logging.StreamHandler(
761                    list_log.list_log(self.allocation[aid]['log']))
762            # XXX: there should be a global one of these rather than
763            # repeating the code.
764            h.setFormatter(logging.Formatter(
765                "%(asctime)s %(name)s %(message)s",
766                        '%d %b %y %H:%M:%S'))
767            alloc_log.addHandler(h)
768            self.write_state()
769        self.state_lock.release()
770
771        if not user:
772            raise service_error(service_error.internal, 
773                    "Can't find creation user for %s" %aid)
774
775        return (ename, proj, user, cert, pubkey_base, secretkey_base, alloc_log)
776
777    def decorate_topology(self, info, t):
778        """
779        Copy the physical mapping and status onto the topology.  Used by
780        StartSegment and InfoSegment
781        """
782        def add_new(ann, attr):
783            for a in ann:
784                if a not in attr: attr.append(a)
785
786        def merge_os(os, e):
787            if len(e.os) == 0:
788                # No OS at all:
789                if os.get_attribute('emulab_access:image'):
790                    os.set_attribute('emulab_access:initial_image', 
791                            os.get_attribute('emulab_access:image'))
792                e.os = [ os ]
793            elif len(e.os) == 1:
794                # There's one OS, copy the initial image and replace
795                eos = e.os[0]
796                initial = eos.get_attribute('emulab_access:initial_image')
797                if initial:
798                    os.set_attribute('emulab_access:initial_image', initial)
799                e.os = [ os] 
800            else:
801                # Multiple OSes, replace or append
802                for eos in e.os:
803                    if os.name == eos.name:
804                        eos.version = os.version
805                        eos.version = os.distribution
806                        eos.version = os.distributionversion
807                        for a in os.attribute:
808                            if eos.get_attribute(a.attribute):
809                                eos.remove_attribute(a.attribute)
810                            eos.set_attribute(a.attribute, a.value)
811                        break
812                else:
813                    e.os.append(os)
814
815
816        if t is None: return
817        i = 0 # For fake debugging instantiation
818        # Copy the assigned names into the return topology
819        for e in t.elements:
820            if isinstance(e, topdl.Computer):
821                if not self.create_debug:
822                    if e.name in info.node:
823                        add_new(("%s%s" % 
824                            (info.node[e.name].pname, self.domain),),
825                            e.localname)
826                        add_new(("%s%s" % 
827                            (info.node[e.name].lname, self.domain),),
828                            e.localname)
829                        e.status = info.node[e.name].status
830                        os = info.node[e.name].getOS()
831                        if os: merge_os(os, e)
832                else:
833                    # Simple debugging assignment
834                    add_new(("node%d%s" % (i, self.domain),), e.localname)
835                    e.status = 'active'
836                    add_new(('testop1', 'testop2'), e.operation)
837                    i += 1
838
839        for s in t.substrates:
840            if s.name in info.subs:
841                sub = info.subs[s.name]
842                if sub.cap is not None:
843                    s.capacity = topdl.Capacity(sub.cap, 'max')
844                if sub.delay is not None:
845                    s.delay = topdl.Latency(sub.delay, 'max')
846        # XXX interfaces
847
848
849    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
850        """
851        Store key bits of experiment state in the global repository, including
852        the response that may need to be replayed, and return the response.
853        """
854        def get_localnames(t):
855            names = [ ]
856            for e in t.elements:
857                if isinstance(e, topdl.Computer):
858                    n = e.get_attribute('testbed')
859                    if n is not None and n not in names:
860                        names.append(n)
861            return names
862        i = 0
863        t = topo.clone()
864        self.decorate_topology(starter, t)
865        # Grab the log (this is some anal locking, but better safe than
866        # sorry)
867        self.state_lock.acquire()
868        # Put information about this testbed into the topdl
869        tb = topdl.Testbed(self.uri, "deter",
870                localname=get_localnames(t), 
871                attribute=[ 
872                    { 
873                        'attribute': 'project', 
874                        'value': self.allocation[aid]['project']
875                    },
876                    { 
877                        'attribute': 'experiment', 
878                        'value': self.allocation[aid]['experiment']
879                    }])
880        t.elements.append(tb)
881        logv = "".join(self.allocation[aid]['log'])
882        # It's possible that the StartSegment call gets retried (!).
883        # if the 'started' key is in the allocation, we'll return it rather
884        # than redo the setup.
885        self.allocation[aid]['started'] = { 
886                'allocID': alloc_id,
887                'allocationLog': logv,
888                'segmentdescription': { 
889                    'topdldescription': t.to_dict()
890                    },
891                'proof': proof.to_dict(),
892                }
893        self.allocation[aid]['topo'] = t
894        retval = copy.copy(self.allocation[aid]['started'])
895        self.write_state()
896        self.state_lock.release()
897        return retval
898   
899    # End of StartSegment support routines
900
901    def StartSegment(self, req, fid):
902        err = None  # Any service_error generated after tmpdir is created
903        rv = None   # Return value from segment creation
904
905        self.log.info("StartSegment called by %s" % fid)
906        try:
907            req = req['StartSegmentRequestBody']
908            auth_attr = req['allocID']['fedid']
909            topref = req['segmentdescription']['topdldescription']
910        except KeyError:
911            raise service_error(server_error.req, "Badly formed request")
912
913        connInfo = req.get('connection', [])
914        services = req.get('service', [])
915        aid = "%s" % auth_attr
916        attrs = req.get('fedAttr', [])
917
918        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
919                with_proof=True)
920        if not access_ok:
921            self.log.info("StartSegment for %s failed: access denied" % fid)
922            raise service_error(service_error.access, "Access denied")
923        else:
924            # See if this is a replay of an earlier succeeded StartSegment -
925            # sometimes SSL kills 'em.  If so, replay the response rather than
926            # redoing the allocation.
927            self.state_lock.acquire()
928            retval = self.allocation[aid].get('started', None)
929            self.state_lock.release()
930            if retval:
931                self.log.warning("Duplicate StartSegment for %s: " % aid + \
932                        "replaying response")
933                return retval
934
935        # A new request.  Do it.
936
937        if topref: topo = topdl.Topology(**topref)
938        else:
939            raise service_error(service_error.req, 
940                    "Request missing segmentdescription'")
941       
942        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
943        try:
944            tmpdir = tempfile.mkdtemp(prefix="access-")
945            softdir = "%s/software" % tmpdir
946            os.mkdir(softdir)
947        except EnvironmentError:
948            self.log.info("StartSegment for %s failed: internal error" % fid)
949            raise service_error(service_error.internal, "Cannot create tmp dir")
950
951        # Try block alllows us to clean up temporary files.
952        try:
953            self.retrieve_software(topo, certfile, softdir)
954            ename, proj, user, xmlrpc_cert, pubkey_base, secretkey_base, \
955                alloc_log =  self.initialize_experiment_info(attrs, aid, 
956                        certfile, tmpdir)
957            # A misconfigured cert in the ABAC map can be confusing...
958            if not os.access(xmlrpc_cert, os.R_OK):
959                self.log.error("Cannot open user's emulab SSL cert: %s" % \
960                        xmlrpc_cert)
961                raise service_error(service_error.internal,
962                        "Cannot open user's emulab SSL cert: %s" % xmlrpc_cert)
963
964
965            if '/' in proj: proj, gid = proj.split('/')
966            else: gid = None
967
968
969            # Set up userconf and seer if needed
970            self.configure_userconf(services, tmpdir)
971            self.configure_seer_services(services, topo, softdir)
972            # Get and send synch store variables
973            self.export_store_info(certfile, proj, ename, connInfo)
974            self.import_store_info(certfile, connInfo)
975
976            expfile = "%s/experiment.tcl" % tmpdir
977
978            self.generate_portal_configs(topo, pubkey_base, 
979                    secretkey_base, tmpdir, proj, ename, connInfo, services)
980            self.generate_ns2(topo, expfile, 
981                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
982
983            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
984                    debug=self.create_debug, log=alloc_log, boss=self.boss,
985                    ops=self.ops, cert=xmlrpc_cert)
986            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
987        except service_error, e:
988            self.log.info("StartSegment for %s failed: %s"  % (fid, e))
989            err = e
990        except:
991            t, v, st = sys.exc_info()
992            self.log.info("StartSegment for %s failed:unexpected error: %s" \
993                    % (fid, traceback.extract_tb(st)))
994            err = service_error(service_error.internal, "%s: %s" % \
995                    (v, traceback.extract_tb(st)))
996
997        # Walk up tmpdir, deleting as we go
998        if self.cleanup: self.remove_dirs(tmpdir)
999        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1000
1001        if rv:
1002            self.log.info("StartSegment for %s succeeded" % fid)
1003            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1004                    proof)
1005        elif err:
1006            raise service_error(service_error.federant,
1007                    "Swapin failed: %s" % err)
1008        else:
1009            raise service_error(service_error.federant, "Swapin failed")
1010
1011    def TerminateSegment(self, req, fid):
1012        self.log.info("TerminateSegment called by %s" % fid)
1013        try:
1014            req = req['TerminateSegmentRequestBody']
1015        except KeyError:
1016            raise service_error(server_error.req, "Badly formed request")
1017
1018        auth_attr = req['allocID']['fedid']
1019        aid = "%s" % auth_attr
1020        attrs = req.get('fedAttr', [])
1021
1022        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1023                with_proof=True)
1024        if not access_ok:
1025            raise service_error(service_error.access, "Access denied")
1026
1027        self.state_lock.acquire()
1028        if aid in self.allocation:
1029            proj = self.allocation[aid].get('project', None)
1030            user = self.allocation[aid].get('user', None)
1031            xmlrpc_cert = self.allocation[aid].get('cert', None)
1032            ename = self.allocation[aid].get('experiment', None)
1033            nonce = self.allocation[aid].get('nonce', False)
1034        else:
1035            proj = None
1036            user = None
1037            ename = None
1038            nonce = False
1039            xmlrpc_cert = None
1040        self.state_lock.release()
1041
1042        if not proj:
1043            self.log.info("TerminateSegment failed for %s: cannot find project"\
1044                    % fid)
1045            raise service_error(service_error.internal, 
1046                    "Can't find project for %s" % aid)
1047        else:
1048            if '/' in proj: proj, gid = proj.split('/')
1049            else: gid = None
1050
1051        if not user:
1052            self.log.info("TerminateSegment failed for %s: cannot find user"\
1053                    % fid)
1054            raise service_error(service_error.internal, 
1055                    "Can't find creation user for %s" % aid)
1056        if not ename:
1057            self.log.info(
1058                    "TerminateSegment failed for %s: cannot find experiment"\
1059                    % fid)
1060            raise service_error(service_error.internal, 
1061                    "Can't find experiment name for %s" % aid)
1062       
1063        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1064                debug=self.create_debug, boss=self.boss, ops=self.ops,
1065                cert=xmlrpc_cert)
1066        stopper(self, user, proj, ename, gid, nonce)
1067        self.log.info("TerminateSegment succeeded for %s %s %s" % \
1068                (fid, proj, ename))
1069        self.state_lock.acquire()
1070        # Remove the started flag/info - the segment is no longer started
1071        if aid in self.allocation:
1072            if 'started' in self.allocation[aid]:
1073                del self.allocation[aid]['started']
1074            self.write_state()
1075        self.state_lock.release()
1076        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1077
1078    def InfoSegment(self, req, fid):
1079        self.log.info("InfoSegment called by %s" % fid)
1080        try:
1081            req = req['InfoSegmentRequestBody']
1082        except KeyError:
1083            raise service_error(server_error.req, "Badly formed request")
1084
1085        auth_attr = req['allocID']['fedid']
1086        aid = "%s" % auth_attr
1087
1088        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1089                with_proof=True)
1090        if not access_ok:
1091            raise service_error(service_error.access, "Access denied")
1092
1093        self.state_lock.acquire()
1094        if aid in self.allocation:
1095            topo = self.allocation[aid].get('topo', None)
1096            if topo: topo = topo.clone()
1097            proj = self.allocation[aid].get('project', None)
1098            user = self.allocation[aid].get('user', None)
1099            xmlrpc_cert = self.allocation[aid].get('cert', None)
1100            ename = self.allocation[aid].get('experiment', None)
1101        else:
1102            proj = None
1103            user = None
1104            ename = None
1105            topo = None
1106            xmlrpc_cert = None
1107        self.state_lock.release()
1108
1109        if not proj:
1110            self.log.info("InfoSegment failed for %s: cannot find project"% fid)
1111            raise service_error(service_error.internal, 
1112                    "Can't find project for %s" % aid)
1113        else:
1114            if '/' in proj: proj, gid = proj.split('/')
1115            else: gid = None
1116
1117        if not user:
1118            self.log.info("InfoSegment failed for %s: cannot find user"% fid)
1119            raise service_error(service_error.internal, 
1120                    "Can't find creation user for %s" % aid)
1121        if not ename:
1122            self.log.info("InfoSegment failed for %s: cannot find exp"% fid)
1123            raise service_error(service_error.internal, 
1124                    "Can't find experiment name for %s" % aid)
1125        info = self.info_segment(keyfile=self.ssh_privkey_file,
1126                debug=self.create_debug, boss=self.boss, ops=self.ops,
1127                cert=xmlrpc_cert)
1128        info(self, user, proj, ename)
1129        self.log.info("InfoSegment gathered info for %s %s %s %s" % \
1130                (fid, user, proj, ename))
1131        self.decorate_topology(info, topo)
1132        self.state_lock.acquire()
1133        if aid in self.allocation:
1134            self.allocation[aid]['topo'] = topo
1135            self.write_state()
1136        self.state_lock.release()
1137        self.log.info("InfoSegment updated info for %s %s %s %s" % \
1138                (fid, user, proj, ename))
1139
1140        rv = { 
1141                'allocID': req['allocID'], 
1142                'proof': proof.to_dict(),
1143                }
1144        self.log.info("InfoSegment succeeded info for %s %s %s %s" % \
1145                (fid, user, proj, ename))
1146        if topo:
1147            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1148        return rv
1149
1150    def OperationSegment(self, req, fid):
1151        def get_pname(e):
1152            """
1153            Get the physical name of a node
1154            """
1155            if e.localname:
1156                return re.sub('\..*','', e.localname[0])
1157            else:
1158                return None
1159
1160        self.log.info("OperationSegment called by %s" % fid)
1161        try:
1162            req = req['OperationSegmentRequestBody']
1163        except KeyError:
1164            raise service_error(server_error.req, "Badly formed request")
1165
1166        auth_attr = req['allocID']['fedid']
1167        aid = "%s" % auth_attr
1168
1169        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1170                with_proof=True)
1171        if not access_ok:
1172            self.log.info("OperationSegment failed for %s: access denied" % fid)
1173            raise service_error(service_error.access, "Access denied")
1174
1175        op = req.get('operation', None)
1176        targets = req.get('target', None)
1177        params = req.get('parameter', None)
1178
1179        if op is None :
1180            self.log.info("OperationSegment failed for %s: no operation" % fid)
1181            raise service_error(service_error.req, "missing operation")
1182        elif targets is None:
1183            self.log.info("OperationSegment failed for %s: no targets" % fid)
1184            raise service_error(service_error.req, "no targets")
1185
1186        self.state_lock.acquire()
1187        if aid in self.allocation:
1188            topo = self.allocation[aid].get('topo', None)
1189            if topo: topo = topo.clone()
1190            xmlrpc_cert = self.allocation[aid].get('cert', None)
1191        else:
1192            topo = None
1193            xmlrpc_cert = None
1194        self.state_lock.release()
1195
1196        targets = copy.copy(targets)
1197        ptargets = { }
1198        for e in topo.elements:
1199            if isinstance(e, topdl.Computer):
1200                if e.name in targets:
1201                    targets.remove(e.name)
1202                    pn = get_pname(e)
1203                    if pn: ptargets[e.name] = pn
1204
1205        status = [ operation_status(t, operation_status.no_target) \
1206                for t in targets]
1207
1208        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1209                debug=self.create_debug, boss=self.boss, ops=self.ops,
1210                cert=xmlrpc_cert)
1211        ops(self, op, ptargets, params, topo)
1212        self.log.info("OperationSegment operated for %s" % fid)
1213       
1214        status.extend(ops.status)
1215        self.log.info("OperationSegment succeed for %s" % fid)
1216
1217        return { 
1218                'allocID': req['allocID'], 
1219                'status': [s.to_dict() for s in status],
1220                'proof': proof.to_dict(),
1221                }
Note: See TracBrowser for help on using the repository browser.