source: fedd/federation/desktop_access.py @ f1f9aec

Last change on this file since f1f9aec was f1f9aec, checked in by Ted Faber <faber@…>, 10 years ago

Make quagga paths configurable

  • Property mode set to 100644
File size: 21.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import random
10import subprocess
11
12from util import *
13from deter import fedid, generate_fedid
14from authorizer import authorizer, abac_authorizer
15from service_error import service_error
16from remote_service import xmlrpc_handler, soap_handler, service_caller
17
18from deter import topdl
19
20from access import access_base
21
# Keep log messages quiet when no one has configured a fedd logger.  Attaching
# a do-nothing handler to the "fedd.access" logger guarantees that it has at
# least one handler even if no module above us has set one up.  It's an extra
# belt for the suspenders.
class nullHandler(logging.Handler):
    """Logging handler that discards every record handed to it."""

    def emit(self, record):
        """Drop the record; nothing is written anywhere."""
        return None


fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())
31
32
33# The plug-in itself.
34class access(access_base):
35    """
    Desktop access plug-in for fedd.  It attaches a single desktop machine to
    a federated experiment: it answers the RequestAccess, ReleaseAccess,
    StartSegment and TerminateSegment requests, keeps internal state about
    each allocation, and connects the desktop to the experiment's portal
    through an ssh tunnel.  Only one experiment can be attached at a time.  It
    makes use of the general routines in access.access_base.

    Detailed comments are in the code.
42    """
43    def __init__(self, config=None, auth=None):
44        """
45        Initializer.  Pulls parameters out of the ConfigParser's access
46        section, and initializes simple internal state.  This version reads a
47        maximum integer to assign from the configuration file, while most other
48        configuration entries  are read by the base class. 
49
50        An access database in the cannonical format is also read as well as a
51        state database that is a hash of internal state.  Routines to
52        manipulate these are in the base class, but specializations appear
53        here.
54
55        The access database maps users to a simple string.
56        """
57
58        # Calling the base initializer, which reads canonical configuration
59        # information and initializes canonical members.
60        access_base.__init__(self, config, auth)
61        # Reading the maximum integer parameter from the configuration file
62
63        self.src_addr = config.get('access', 'interface_address')
64        self.router = config.get('access', 'gateway')
65        self.hostname = config.get('access', 'hostname')
66        # Storage for ephemeral ssh keys and host files
67        self.localdir = config.get('access', 'localdir')
68        # File containing the routing entries for external networks
69        self.external_networks = config.get('access', 'external_networks')
70        # values for locations of zebra and ospfd.
71        self.zebra = config.get('access', 'zebra')
72        if self.zebra is None:
73            self.zebra = '/usr/local/sbin/zebra'
74        self.ospfd = config.get('access', 'ospfd')
75        if self.ospfd is None:
76            self.ospfd = '/usr/local/sbin/ospfd'
77
78        self.ssh_identity = None
79
80        # hostname is the name of the ssh endpoint for the other side.  That
81        # side needs it to set up routing tables.  If hostname is not
82        # available, but an IP address is, use that.
83        if self.hostname is None:
84            if  self.src_addr is None:
85                raise service_error(service_error.server_config,
86                        'Hostname or interface_address must be set in config')
87            self.hostname = self.src_addr
88       
89        self.ssh_port = config.get('access', 'ssh_port', '22')
90
91        # authorization information
92        self.auth_type = config.get('access', 'auth_type') \
93                or 'abac'
94        self.auth_dir = config.get('access', 'auth_dir')
95        accessdb = config.get("access", "accessdb")
96        # initialize the authorization system.  We make a call to
97        # read the access database that maps from authorization information
98        # into local information.  The local information is parsed by the
99        # translator above.
100        if self.auth_type == 'abac':
101            #  Load the current authorization state
102            self.auth = abac_authorizer(load=self.auth_dir)
103            self.access = [ ]
104            if accessdb:
105                try:
106                    self.read_access(accessdb)
107                except EnvironmentError, e:
108                    self.log.error("Cannot read %s: %s" % \
109                            (config.get("access", "accessdb"), e))
110                    raise e
111        else:
112            raise service_error(service_error.internal, 
113                    "Unknown auth_type: %s" % self.auth_type)
114
115        # The superclass has read the state, but if this is the first run ever,
116        # we must initialise the running flag.  This plugin only supports one
117        # connection, so StartSegment will fail when self.state['running'] is
118        # true.
119        self.state_lock.acquire()
120        if 'running' not in self.state:
121            self.state['running'] = False
122        self.state_lock.release()
123
124        # These dictionaries register the plug-in's local routines for handline
125        # these four messages with the server code above.  There's a version
126        # for SOAP and XMLRPC, depending on which interfaces the plugin
127        # supports.  There's rarely a technical reason not to support one or
128        # the other - the plugin code almost never deals with the transport -
129        # but if a plug-in writer wanted to disable XMLRPC, they could leave
130        # the self.xmlrpc_services dictionary empty.
131        self.soap_services = {\
132            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
133            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
134            'StartSegment': soap_handler("StartSegment", self.StartSegment),
135            'TerminateSegment': soap_handler("TerminateSegment", 
136                self.TerminateSegment),
137            }
138        self.xmlrpc_services =  {\
139            'RequestAccess': xmlrpc_handler('RequestAccess',
140                self.RequestAccess),
141            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
142                self.ReleaseAccess),
143            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
144            'TerminateSegment': xmlrpc_handler('TerminateSegment',
145                self.TerminateSegment),
146            }
147        self.call_SetValue = service_caller('SetValue', log=self.log)
148        self.call_GetValue = service_caller('GetValue', log=self.log)
149
    # ReleaseAccess comes from the base class.  This is a slightly modified
    # version of the base class's RequestAccess that adds a fedAttr to force
    # this side to be active.
153    def RequestAccess(self, req, fid):
154        """
155        Handle an access request.  Success here maps the requester into the
156        local access control space and establishes state about that user keyed
157        to a fedid.  We also save a copy of the certificate underlying that
158        fedid so this allocation can access configuration information and
159        shared parameters on the experiment controller.
160        """
161
162        self.log.info("RequestAccess called by %s" % fid)
163        # The dance to get into the request body
164        if req.has_key('RequestAccessRequestBody'):
165            req = req['RequestAccessRequestBody']
166        else:
167            raise service_error(service_error.req, "No request!?")
168
169        # Base class lookup routine.  If this fails, it throws a service
170        # exception denying access that triggers a fault response back to the
171        # caller.
172        found,  owners, proof = self.lookup_access(req, fid)
173        self.log.info(
174                "[RequestAccess] Access granted local creds %s" % found)
175        # Make a fedid for this allocation
176        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
177        aid = unicode(allocID)
178
179        # Store the data about this allocation:
180        self.state_lock.acquire()
181        self.state[aid] = { }
182        self.state[aid]['user'] = found
183        self.state[aid]['owners'] = owners
184        self.state[aid]['auth'] = set()
185        # Authorize the creating fedid and the principal representing the
186        # allocation to manipulate it.
187        self.append_allocation_authorization(aid, 
188                ((fid, allocID), (allocID, allocID)))
189        self.write_state()
190        self.state_lock.release()
191
192        # Create a directory to stash the certificate in, ans stash it.
193        try:
194            f = open("%s/%s.pem" % (self.certdir, aid), "w")
195            print >>f, alloc_cert
196            f.close()
197        except EnvironmentError, e:
198            raise service_error(service_error.internal, 
199                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
200        self.log.debug('[RequestAccess] Returning allocation ID: %s' % allocID)
201        msg = { 
202                'allocID': { 'fedid': allocID }, 
203                'fedAttr': [{ 'attribute': 'nat_portals', 'value': 'True' }],
204                'proof': proof.to_dict()
205                }
206        return msg
207
208    def validate_topology(self, top):
209        '''
210        Validate the topology.  Desktops can only be single connections.
211        Though the topology will include a portal and a node, the access
212        controller will implement both on one node.
213
214        As more capabilities are added to the contoller the constraints here
215        will relax.
216        '''
217
218        comps = []
219        for e in top.elements:
220            if isinstance(e, topdl.Computer): comps.append(e)
221        if len(comps) > 2: 
222            raise service_error(service_error.req,
223                    "Desktop only supports 1-node subexperiments")
224
225        portals = 0
226        for c in comps:
227            if c.get_attribute('portal') is not None: 
228                portals += 1
229                continue
230            if len(c.interface) > 1:
231                raise service_error(service_error.req,
232                        "Desktop Node has more than one interface")
233            i  = c.interface[0]
234            if len(i.subs) > 1: 
235                raise service_error(service_error.req,
236                        "Desktop Node has more than one substate on interface")
237            sub = i.subs[0]
238            for i in sub.interfaces:
239                if i.element not in comps:
240                    raise service_error(service_error.req,
241                            "Desktop Node connected to non-portal")
242
243        if portals > 1:
244            raise service_error(service_error.req,
245                    "Desktop segment has more than one portal")
246        return True
247
248    def validate_connInfo(self, connInfo):
249        if len(connInfo) != 1: 
250            raise service_error(service_error.req,
251                    "Desktop segment requests multiple connections")
252        if connInfo[0]['type'] != 'ssh':
253            raise service_error(service_error.req,
254                    "Desktop segment requires ssh connecton")
255        return True
256
257    def export_store_info(self, certfile, connInfo):
258        '''
259        Tell the other portal node where to reach this desktop.  The other side
260        uses this information to set up routing, though the ssh_port is unused
261        as the Desktop always initiates ssh connections.
262        '''
263        values = { 'peer': self.hostname, 'ssh_port': self.ssh_port }
264        for c in connInfo:
265            for p in c.get('parameter', []):
266                if p.get('type','') == 'input': continue
267                pname = p.get('name', '')
268                key = p.get('key', '')
269                surl = p.get('store', '')
270                if pname not in values:
271                    self.log('Unknown export parameter: %s'  % pname)
272                    continue
273                val = values[pname]
274                req = { 'name': key, 'value': val }
275                self.log.debug('Setting %s (%s) to %s on %s' % \
276                        (pname, key,  val, surl))
277                self.call_SetValue(surl, req, certfile)
278
279    def set_route(self, dest, script, gw=None, src=None):
280        if sys.platform.startswith('freebsd'):
281            if src is not None and gw is not None:
282                raise service_error(service_error.internal, 
283                        'FreeBSD will not route based on src address')
284            elif src is not None:
285                raise service_error(service_error.internal, 
286                        'FreeBSD will not route based on src address')
287            elif gw is not None:
288                print >>script, 'route add %s %s' % (dest, gw)
289        elif sys.platform.startswith('linux'):
290            if src is not None and gw is not None:
291                print >>script, 'ip route add %s via %s src %s' % \
292                        (dest, gw, src)
293            elif src is not None:
294                print >>script, 'ip route add %s src %s' % \
295                        (dest, src)
296            elif gw is not None:
297                print >>script, 'ip route add %s via %s' % (dest, gw)
298        else:
299            raise service_error(service_error.internal, 
300                    'Unknown platform %s' % sys.platform)
301
302    def unset_route(self, dest, script):
303        rv = 0
304        if sys.platform.startswith('freebsd'):
305            print >>script, 'route delete %s' % dest
306        elif sys.platform.startswith('linux'):
307            print >>script, 'ip route delete %s' % dest
308
309    def find_a_peer(self, addr): 
310        '''
311        Find another node in the experiment that's on our subnet.  This is a
312        hack to handle the problem that we really cannot require the desktop to
313        dynamically route.  Will be improved by distributing static routes.
314        '''
315
316        peer = None
317        hosts = os.path.join(self.localdir, 'hosts')
318        p = addr.rfind('.')
319        if p == -1:
320            raise service_error(service_error.req, 'bad address in topology')
321        prefix = addr[0:p]
322        addr_re = re.compile('(%s.\\d+)' % prefix)
323        try:
324            f = open(hosts, 'r')
325            for line in f:
326                m = addr_re.search(line)
327                if m is not None and m.group(1) != addr:
328                    peer = m.group(1)
329                    break
330            else:
331                raise service_error(service_error.req, 
332                        'No other nodes in this subnet??')
333        except EnvironmentError, e:
334            raise service_error(service_error.internal, 
335                    'Cannot open %s: %s' % (e.filename, e.strerror))
336        return peer
337
338
339
340
341    def configure_desktop(self, top, connInfo):
342        '''
343        Build the connection.  Establish routing to the peer if using a
344        separate interface, wait until the other end confirms setup, establish
345        the ssh layer-two tunnel (tap), assign the in-experiment IP address to
346        the tunnel and establish routing to the experiment through the tap.
347        '''
348
349
350        # get the peer and ssh port from the portal and our IP from the other
351        peer = None
352        port = None
353        my_addr = None
354        my_name = None
355        for e in top.elements:
356            if not isinstance(e, topdl.Computer): continue
357            if e.get_attribute('portal') is None: 
358                my_name = e.name
359                # there should be one interface with one IPv4 address
360                if len(e.interface) <1 :
361                    raise service_error(service_error.internal,
362                            'No interface on experiment node!?!?')
363                my_addr = e.interface[0].get_attribute('ip4_address')
364            else:
365                for ci in connInfo:
366                    if ci.get('portal', '') != e.name: continue
367                    peer = ci.get('peer')
368                    port = '22'
369                    for a in ci.get('fedAttr', []):
370                        if a['attribute'] == 'ssh_port': port = a['value']
371
372        # XXX scan hosts for IP addresses and compose better routing entry
373       
374        if not all([peer, port, my_addr]):
375            raise service_error(service_error.req, 
376                    'Cannot find all config parameters %s %s %s' % (peer, port, my_addr))
377
378        exp_peer = self.find_a_peer(my_addr)
379
380        cscript = os.path.join(self.localdir, 'connect')
381        dscript = os.path.join(self.localdir, 'disconnect')
382        local_hosts = os.path.join(self.localdir, 'hosts')
383        zebra_conf = os.path.join(self.localdir, 'zebra.conf')
384        ospfd_conf = os.path.join(self.localdir, 'ospfd.conf')
385        try:
386            f = open(cscript, 'w')
387            print >>f, '#!/bin/sh'
388            # This picks the outgoing interface to the experiment using the
389            # routing system.
390            self.set_route(peer, f, self.router, self.src_addr)
391            # Wait until the other end reports that it is configured py placing
392            # a file this end can access into its local file system.  Try once
393            # a minute.
394            print >>f,'while ! /usr/bin/scp -o "StrictHostKeyChecking no" -i %s %s:/usr/local/federation/etc/prep_done /dev/null; do' % (self.ssh_identity, peer)
395            print >>f, 'sleep 60; done'
396            print >>f, ('ssh -w 0:0 -p %s -o "Tunnel ethernet" ' + \
397                    '-o "StrictHostKeyChecking no" -i %s %s perl -I/usr/local/federation/lib /usr/local/federation/bin/setup_bridge.pl --tapno=0 --addr=%s &') % \
398                    (port, self.ssh_identity, peer, my_addr)
399            # This should give the tap a a chance to come up
400            print >>f,'sleep 10'
401            # Add experiment nodes to hosts
402            print >>f, 'cp /etc/hosts /etc/hosts.DETER.fedd.hold'
403            print >>f, 'echo "#--- BEGIN FEDD ADDITIONS ---" >> /etc/hosts'
404            print >>f, 'cat %s >> /etc/hosts' % local_hosts
405            print >>f, 'echo "#--- END FEDD ADDITIONS ---" >> /etc/hosts'
406            # Assign tap address and route experiment connections through it.
407            print >>f, 'ifconfig tap0 %s netmask 255.255.255.0 up' % \
408                    my_addr
409            # self.set_route('10.0.0.0/8', f, exp_peer)
410            print >>f, '%s -d -f %s' % (self.zebra, zebra_conf)
411            print >>f, '%s -d -f %s' % (self.ospfd, ospfd_conf)
412            f.close()
413            os.chmod(cscript, 0755)
414            f = open(dscript, 'w')
415            print >>f, '#!/bin/sh'
416            print >>f, 'ifconfig tap0 destroy'
417            self.unset_route(peer, f)
418            #self.unset_route('10.0.0.0/8', f)
419            print >>f, 'mv /etc/hosts.DETER.fedd.hold /etc/hosts'
420            print >>f, 'kill `cat /var/run/quagga/ospfd.pid`'
421            print >>f, 'kill `cat /var/run/quagga/zebra.pid`'
422            f.close()
423            os.chmod(dscript, 0755)
424            f = open(zebra_conf, 'w')
425            print >>f, 'hostname %s' % my_name
426            print >>f, 'interface tap0'
427            if  self.external_networks is not None:
428                try:
429                    extern = open(self.external_networks, 'r')
430                    if extern is not None:
431                        for l in extern:
432                            print >>f, "%s" % l.strip()
433                        extern.close()
434                except EnvironmentError:
435                    # No external_networks or problem reading it, ignore
436                    pass
437            f.close()
438            os.chmod(zebra_conf, 0644)
439            f = open(ospfd_conf, 'w')
440            print >>f, 'hostname %s' % my_name
441            print >>f, 'router ospf'
442            print >>f, ' redistribute static'
443            print >>f, ' network %s/24 area 0.0.0.2' % my_addr
444        except EnvironmentError, e:
445            raise service_error(service_error.internal, 
446                    'Cannot create connect %s: %s' % (e.filename, e.strerror))
447        script_log = open('/tmp/connect.log', 'w')
448        subprocess.Popen(['sudo', '/bin/sh', cscript], stdout=script_log, stderr=script_log)
449        return True
450
451    def StartSegment(self, req, fid):
452        """
453        Start a segment.  In this simple skeleton, this means to parse the
454        request and assign an unassigned integer to it.  We store the integer
455        in the persistent state.
456        """
457        try:
458            req = req['StartSegmentRequestBody']
459            # Get the request topology.  If not present, a KeyError is thrown.
460            topref = req['segmentdescription']['topdldescription']
461            # The fedid of the allocation we're attaching resources to
462            auth_attr = req['allocID']['fedid']
463        except KeyError:
464            raise service_error(service_error.req, "Badly formed request")
465
466        # String version of the allocation ID for keying
467        aid = "%s" % auth_attr
468        # Authorization check
469        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
470                with_proof=True)
471        if not access_ok:
472            raise service_error(service_error.access, "Access denied", 
473                    proof=proof)
474        else:
475            # See if this is a replay of an earlier succeeded StartSegment -
476            # sometimes SSL kills 'em.  If so, replay the response rather than
477            # redoing the allocation.
478            self.state_lock.acquire()
479            # Test and set :-)
480            running = self.state['running']
481            self.state['running'] = True
482            retval = self.state[aid].get('started', None)
483            self.state_lock.release()
484            if retval:
485                self.log.warning(
486                        "[StartSegment] Duplicate StartSegment for %s: " \
487                                % aid + \
488                        "replaying response")
489                return retval
490            if running:
491                self.log.debug('[StartSegment] already running')
492                raise service_error(service_error.federant,
493                        'Desktop is already in an experiment')
494
495        certfile = "%s/%s.pem" % (self.certdir, aid)
496
497        # Convert the topology into topdl data structures.  Again, the
498        # skeletion doesn't do anything with it, but this is how one parses a
499        # topology request.
500        if topref: topo = topdl.Topology(**topref)
501        else:
502            raise service_error(service_error.req, 
503                    "Request missing segmentdescription'")
504
505        err = None
506        try:
507            self.validate_topology(topo)
508
509            # The attributes of the request.  The ones we care about are the ssh
510            # keys to operate the tunnel.
511            attrs = req.get('fedAttr', [])
512            for a in attrs:
513                # Save the hosts and ssh_privkeys to our local dir
514                if a['attribute'] in ('hosts', 'ssh_secretkey'):
515                    self.log.debug('Getting %s from %s' % \
516                            (a['attribute'], a['value']))
517                    get_url(a['value'], certfile, self.localdir, log=self.log)
518                    base = os.path.basename(a['value'])
519                    if a['attribute'] == 'ssh_secretkey':
520                        self.ssh_identity = os.path.join(self.localdir, base)
521                    os.chmod(os.path.join(self.localdir, base), 0600)
522                else:
523                    self.log.debug('Ignoring attribute %s' % a['attribute'])
524
525            # Gather connection information and exchange parameters.
526            connInfo = req.get('connection', [])
527            self.validate_connInfo(connInfo)
528            self.export_store_info(certfile, connInfo)
529            self.import_store_info(certfile, connInfo)
530
531            #build it
532            self.configure_desktop(topo, connInfo)
533        except service_error, e:
534            err = e
535
536        # Save the information
537        if err is None:
538            # It's possible that the StartSegment call gets retried (!).  if
539            # the 'started' key is in the allocation, we'll return it rather
540            # than redo the setup.  The integer allocation was saved when we
541            # made it.
542            self.state_lock.acquire()
543            self.state[aid]['started'] = { 
544                    'allocID': req['allocID'],
545                    'allocationLog': "Allocatation complete",
546                    'segmentdescription': { 'topdldescription': topo.to_dict() },
547                    'proof': proof.to_dict(),
548                    }
549            retval = copy.deepcopy(self.state[aid]['started'])
550            self.write_state()
551            self.state_lock.release()
552        else:
553            # Something bad happened - clear the "running" flag so we can try
554            # again
555            self.state_lock.acquire()
556            self.state['running'] = False
557            self.state_lock.release()
558            raise err
559
560        return retval
561
562    def TerminateSegment(self, req, fid):
563        """
564        Remove the resources associated with th eallocation and stop the music.
565        In this example, this simply means removing the integer we allocated.
566        """
567        # Gather the same access information as for Start Segment
568        try:
569            req = req['TerminateSegmentRequestBody']
570        except KeyError:
571            raise service_error(service_error.req, "Badly formed request")
572
573        auth_attr = req['allocID']['fedid']
574        aid = "%s" % auth_attr
575
576        self.log.debug("Terminate request for %s" %aid)
577        # Check authorization
578        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
579                with_proof=True)
580        if not access_ok:
581            raise service_error(service_error.access, "Access denied", 
582                    proof=proof)
583        cscript = os.path.join(self.localdir, 'connect')
584        dscript = os.path.join(self.localdir, 'disconnect')
585        # Do the work of disconnecting
586        if os.path.exists(dscript):
587            self.log.debug('calling %s' % dscript)
588            rv = subprocess.call(['sudo', '/bin/sh', dscript])
589            if rv != 0:
590                self.log.warning('%s had an error: %d' % (dscript, rv))
591        else:
592            self.log.warn('No disconnection script!?')
593
594        try:
595            for bfn in os.listdir(self.localdir):
596                fn = os.path.join(self.localdir, bfn)
597                self.log.debug('Removing %s' % fn)
598                if os.path.exists(fn):
599                    os.remove(fn)
600        except EnvironmentError, e:
601            self.log.warn('Failed to remove %s: %s' % (e.filename, e.strerror))
602
603        self.ssh_identity = None
604
605        self.state_lock.acquire()
606        self.state['running'] = False
607        self.state_lock.release()
608   
609        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.