source: fedd/federation/desktop_access.py @ ae714e4

Last change on this file since ae714e4 was ae714e4, checked in by Ted Faber <faber@…>, 11 years ago

External networks a real parameter

1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import random
10import subprocess
11
12from util import *
13from deter import fedid, generate_fedid
14from authorizer import authorizer, abac_authorizer
15from service_error import service_error
16from remote_service import xmlrpc_handler, soap_handler, service_caller
17
18from deter import topdl
19
20from access import access_base
21
22# Make log messages disappear if no one configures a fedd logger.  This is
23# something of an incantation, but basically it attaches a do-nothing handler
24# to the fedd.access logger in case no module above us has configured one.
25# It's extra belt-and-suspenders protection.
26class nullHandler(logging.Handler):
27    def emit(self, record): pass
28
29fl = logging.getLogger("fedd.access")
30fl.addHandler(nullHandler())
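
# A note on the incantation above: Python 2.7 and later ship an equivalent
# handler in the standard library, so assuming a 2.7+ interpreter the same
# effect is one line:
#
#   logging.getLogger("fedd.access").addHandler(logging.NullHandler())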
31
32
33# The plug-in itself.
34class access(access_base):
35    """
36    This is the access plug-in that attaches a desktop machine to a federated
37    experiment.  It responds to the experiment_control requests and keeps
38    internal state, supporting a single connection at a time built as an ssh
39    layer-two tunnel.  It makes use of the general routines in access.access_base.
40
41    Detailed comments in the code and info at
42    """
43    def __init__(self, config=None, auth=None):
44        """
45        Initializer.  Pulls parameters out of the ConfigParser's access
46        section, and initializes simple internal state.  This version reads the
47        local interface, gateway, and ssh parameters from the configuration
48        file, while most other configuration entries are read by the base class.
49
50        An access database in the canonical format is also read, as is a
51        state database that is a hash of internal state.  Routines to
52        manipulate these are in the base class, but specializations appear
53        here.
54
55        The access database maps users to a simple string.
56        """
57
58        # Calling the base initializer, which reads canonical configuration
59        # information and initializes canonical members.
60        access_base.__init__(self, config, auth)
61        # Read the connection and routing parameters from the configuration file
62
63        self.src_addr = config.get('access', 'interface_address')
64        self.router = config.get('access', 'gateway')
65        self.hostname = config.get('access', 'hostname')
66        # Storage for ephemeral ssh keys and host files
67        self.localdir = config.get('access', 'localdir')
68        # File containing the routing entries for external networks
69        self.external_networks = config.get('access', 'external_networks')
70        self.ssh_identity = None
71
72        # hostname is the name of the ssh endpoint for the other side.  That
73        # side needs it to set up routing tables.  If hostname is not
74        # available, but an IP address is, use that.
75        if self.hostname is None:
76            if self.src_addr is None:
77                raise service_error(service_error.server_config,
78                        'Hostname or interface_address must be set in config')
79            self.hostname = self.src_addr
80       
81        self.ssh_port = config.get('access', 'ssh_port', '22')
82
83        # authorization information
84        self.auth_type = config.get('access', 'auth_type') \
85                or 'abac'
86        self.auth_dir = config.get('access', 'auth_dir')
87        accessdb = config.get("access", "accessdb")
88        # initialize the authorization system.  We make a call to
89        # read the access database that maps from authorization information
90        # into local information.  The local information is parsed by the
91        # translator above.
92        if self.auth_type == 'abac':
93            #  Load the current authorization state
94            self.auth = abac_authorizer(load=self.auth_dir)
95            self.access = [ ]
96            if accessdb:
97                try:
98                    self.read_access(accessdb)
99                except EnvironmentError, e:
100                    self.log.error("Cannot read %s: %s" % \
101                            (config.get("access", "accessdb"), e))
102                    raise e
103        else:
104            raise service_error(service_error.internal, 
105                    "Unknown auth_type: %s" % self.auth_type)
106
107        # The superclass has read the state, but if this is the first run ever,
108        # we must initialise the running flag.  This plugin only supports one
109        # connection, so StartSegment will fail when self.state['running'] is
110        # true.
111        self.state_lock.acquire()
112        if 'running' not in self.state:
113            self.state['running'] = False
114        self.state_lock.release()
115
116        # These dictionaries register the plug-in's local routines for handling
117        # these four messages with the server code above.  There's a version
118        # for SOAP and XMLRPC, depending on which interfaces the plugin
119        # supports.  There's rarely a technical reason not to support one or
120        # the other - the plugin code almost never deals with the transport -
121        # but if a plug-in writer wanted to disable XMLRPC, they could leave
122        # the self.xmlrpc_services dictionary empty.
123        self.soap_services = {\
124            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
125            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
126            'StartSegment': soap_handler("StartSegment", self.StartSegment),
127            'TerminateSegment': soap_handler("TerminateSegment", 
128                self.TerminateSegment),
129            }
130        self.xmlrpc_services =  {\
131            'RequestAccess': xmlrpc_handler('RequestAccess',
132                self.RequestAccess),
133            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
134                self.ReleaseAccess),
135            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
136            'TerminateSegment': xmlrpc_handler('TerminateSegment',
137                self.TerminateSegment),
138            }
139        self.call_SetValue = service_caller('SetValue', log=self.log)
140        self.call_GetValue = service_caller('GetValue', log=self.log)
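
    # A sketch of the [access] configuration stanza read by this initializer
    # (the base class reads additional entries).  The key names match the
    # config.get() calls above; the values are illustrative assumptions, not
    # shipped defaults:
    #
    #   [access]
    #   interface_address: 192.0.2.10
    #   gateway: 192.0.2.1
    #   hostname: desktop.example.org
    #   localdir: /var/run/fedd/desktop
    #   external_networks: /usr/local/etc/fedd/external_networks
    #   ssh_port: 22
    #   auth_type: abac
    #   auth_dir: /usr/local/etc/fedd/abac
    #   accessdb: /usr/local/etc/fedd/access.db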
141
142    # ReleaseAccess comes from the base class; this is a slightly modified
143    # RequestAccess from the base that includes a fedAttr to force this side to
144    # be active.
145    def RequestAccess(self, req, fid):
146        """
147        Handle an access request.  Success here maps the requester into the
148        local access control space and establishes state about that user keyed
149        to a fedid.  We also save a copy of the certificate underlying that
150        fedid so this allocation can access configuration information and
151        shared parameters on the experiment controller.
152        """
153
154        self.log.info("RequestAccess called by %s" % fid)
155        # The dance to get into the request body
156        if req.has_key('RequestAccessRequestBody'):
157            req = req['RequestAccessRequestBody']
158        else:
159            raise service_error(service_error.req, "No request!?")
160
161        # Base class lookup routine.  If this fails, it throws a service
162        # exception denying access that triggers a fault response back to the
163        # caller.
164        found,  owners, proof = self.lookup_access(req, fid)
165        self.log.info(
166                "[RequestAccess] Access granted local creds %s" % found)
167        # Make a fedid for this allocation
168        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
169        aid = unicode(allocID)
170
171        # Store the data about this allocation:
172        self.state_lock.acquire()
173        self.state[aid] = { }
174        self.state[aid]['user'] = found
175        self.state[aid]['owners'] = owners
176        self.state[aid]['auth'] = set()
177        # Authorize the creating fedid and the principal representing the
178        # allocation to manipulate it.
179        self.append_allocation_authorization(aid, 
180                ((fid, allocID), (allocID, allocID)))
181        self.write_state()
182        self.state_lock.release()
183
184        # Stash the certificate in the certificate directory.
185        try:
186            f = open("%s/%s.pem" % (self.certdir, aid), "w")
187            print >>f, alloc_cert
188            f.close()
189        except EnvironmentError, e:
190            raise service_error(service_error.internal, 
191                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
192        self.log.debug('[RequestAccess] Returning allocation ID: %s' % allocID)
193        msg = { 
194                'allocID': { 'fedid': allocID }, 
195                'fedAttr': [{ 'attribute': 'nat_portals', 'value': 'True' }],
196                'proof': proof.to_dict()
197                }
198        return msg
199
200    def validate_topology(self, top):
201        '''
202        Validate the topology.  A desktop segment supports only a single
203        connection: though the topology will include a portal and a node, the
204        access controller will implement both on one machine.
205
206        As more capabilities are added to the controller the constraints here
207        will relax.
208        '''
209
210        comps = []
211        for e in top.elements:
212            if isinstance(e, topdl.Computer): comps.append(e)
213        if len(comps) > 2: 
214            raise service_error(service_error.req,
215                    "Desktop only supports 1-node subexperiments")
216
217        portals = 0
218        for c in comps:
219            if c.get_attribute('portal') is not None: 
220                portals += 1
221                continue
222            if len(c.interface) > 1:
223                raise service_error(service_error.req,
224                        "Desktop Node has more than one interface")
225            i  = c.interface[0]
226            if len(i.subs) > 1: 
227                raise service_error(service_error.req,
228                        "Desktop Node has more than one substrate on interface")
229            sub = i.subs[0]
230            for i in sub.interfaces:
231                if i.element not in comps:
232                    raise service_error(service_error.req,
233                            "Desktop Node connected to non-portal")
234
235        if portals > 1:
236            raise service_error(service_error.req,
237                    "Desktop segment has more than one portal")
238        return True
239
240    def validate_connInfo(self, connInfo):
241        if len(connInfo) != 1: 
242            raise service_error(service_error.req,
243                    "Desktop segment requests multiple connections")
244        if connInfo[0]['type'] != 'ssh':
245            raise service_error(service_error.req,
246                    "Desktop segment requires ssh connection")
247        return True
248
249    def export_store_info(self, certfile, connInfo):
250        '''
251        Tell the other portal node where to reach this desktop.  The other side
252        uses this information to set up routing, though the ssh_port is unused
253        as the Desktop always initiates ssh connections.
254        '''
255        values = { 'peer': self.hostname, 'ssh_port': self.ssh_port }
256        for c in connInfo:
257            for p in c.get('parameter', []):
258                if p.get('type','') == 'input': continue
259                pname = p.get('name', '')
260                key = p.get('key', '')
261                surl = p.get('store', '')
262                if pname not in values:
263                    self.log.error('Unknown export parameter: %s' % pname)
264                    continue
265                val = values[pname]
266                req = { 'name': key, 'value': val }
267                self.log.debug('Setting %s (%s) to %s on %s' % \
268                        (pname, key,  val, surl))
269                self.call_SetValue(surl, req, certfile)
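
    # A sketch of the exchange above, using made-up values: an output parameter
    #   { 'name': 'peer', 'key': 'peer_key_1', 'store': 'https://store.example.org/',
    #     'type': 'output' }
    # in connInfo would produce a SetValue call to https://store.example.org/
    # carrying { 'name': 'peer_key_1', 'value': self.hostname }, and a
    # corresponding 'ssh_port' parameter would export self.ssh_port the same way.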
270
271    def set_route(self, dest, script, gw=None, src=None):
272        if sys.platform.startswith('freebsd'):
273            if src is not None and gw is not None:
274                raise service_error(service_error.internal, 
275                        'FreeBSD will not route based on src address')
276            elif src is not None:
277                raise service_error(service_error.internal, 
278                        'FreeBSD will not route based on src address')
279            elif gw is not None:
280                print >>script, 'route add %s %s' % (dest, gw)
281        elif sys.platform.startswith('linux'):
282            if src is not None and gw is not None:
283                print >>script, 'ip route add %s via %s src %s' % \
284                        (dest, gw, src)
285            elif src is not None:
286                print >>script, 'ip route add %s src %s' % \
287                        (dest, src)
288            elif gw is not None:
289                print >>script, 'ip route add %s via %s' % (dest, gw)
290        else:
291            raise service_error(service_error.internal, 
292                    'Unknown platform %s' % sys.platform)
293
294    def unset_route(self, dest, script):
295        rv = 0
296        if sys.platform.startswith('freebsd'):
297            print >>script, 'route delete %s' % dest
298        elif sys.platform.startswith('linux'):
299            print >>script, 'ip route delete %s' % dest
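
    # For illustration (addresses made up): set_route('10.1.1.2', f, gw='192.0.2.1')
    # writes "route add 10.1.1.2 192.0.2.1" on FreeBSD and
    # "ip route add 10.1.1.2 via 192.0.2.1" on Linux.  Passing src='192.0.2.10'
    # as well is honored only on Linux ("... via 192.0.2.1 src 192.0.2.10");
    # FreeBSD raises a service_error instead.  unset_route('10.1.1.2', f)
    # later emits "route delete 10.1.1.2" or "ip route delete 10.1.1.2".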
300
301    def find_a_peer(self, addr): 
302        '''
303        Find another node in the experiment that's on our subnet.  This is a
304        hack to handle the problem that we really cannot require the desktop to
305        dynamically route.  Will be improved by distributing static routes.
306        '''
307
308        peer = None
309        hosts = os.path.join(self.localdir, 'hosts')
310        p = addr.rfind('.')
311        if p == -1:
312            raise service_error(service_error.req, 'bad address in topology')
313        prefix = addr[0:p]
314        addr_re = re.compile('(%s\\.\\d+)' % re.escape(prefix))
315        try:
316            f = open(hosts, 'r')
317            for line in f:
318                m = addr_re.search(line)
319                if m is not None and m.group(1) != addr:
320                    peer = m.group(1)
321                    break
322            else:
323                raise service_error(service_error.req, 
324                        'No other nodes in this subnet??')
325        except EnvironmentError, e:
326            raise service_error(service_error.internal, 
327                    'Cannot open %s: %s' % (e.filename, e.strerror))
328        return peer
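
    # For example (made-up addresses): if the hosts file in self.localdir
    # contains
    #   10.0.1.1 node-a
    #   10.0.1.2 desktop
    # then find_a_peer('10.0.1.2') returns '10.0.1.1', the first address in the
    # same /24 that is not our own.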
329
330
331
332
333    def configure_desktop(self, top, connInfo):
334        '''
335        Build the connection.  Establish routing to the peer if using a
336        separate interface, wait until the other end confirms setup, establish
337        the ssh layer-two tunnel (tap), assign the in-experiment IP address to
338        the tunnel and establish routing to the experiment through the tap.
339        '''
340
341
342        # get the peer and ssh port from the portal and our IP from the other
343        peer = None
344        port = None
345        my_addr = None
346        my_name = None
347        for e in top.elements:
348            if not isinstance(e, topdl.Computer): continue
349            if e.get_attribute('portal') is None: 
350                my_name = e.name
351                # there should be one interface with one IPv4 address
352                if len(e.interface) < 1:
353                    raise service_error(service_error.internal,
354                            'No interface on experiment node!?!?')
355                my_addr = e.interface[0].get_attribute('ip4_address')
356            else:
357                for ci in connInfo:
358                    if ci.get('portal', '') != e.name: continue
359                    peer = ci.get('peer')
360                    port = '22'
361                    for a in ci.get('fedAttr', []):
362                        if a['attribute'] == 'ssh_port': port = a['value']
363
364        # XXX scan hosts for IP addresses and compose better routing entry
365       
366        if not all([peer, port, my_addr]):
367            raise service_error(service_error.req, 
368                    'Cannot find all config parameters %s %s %s' % (peer, port, my_addr))
369
370        exp_peer = self.find_a_peer(my_addr)
371
372        cscript = os.path.join(self.localdir, 'connect')
373        dscript = os.path.join(self.localdir, 'disconnect')
374        local_hosts = os.path.join(self.localdir, 'hosts')
375        zebra_conf = os.path.join(self.localdir, 'zebra.conf')
376        ospfd_conf = os.path.join(self.localdir, 'ospfd.conf')
377        try:
378            f = open(cscript, 'w')
379            print >>f, '#!/bin/sh'
380            # This picks the outgoing interface to the experiment using the
381            # routing system.
382            self.set_route(peer, f, self.router, self.src_addr)
383            # Wait until the other end reports that it is configured by placing
384            # a file this end can access into its local file system.  Try once
385            # a minute.
386            print >>f,'while ! /usr/bin/scp -o "StrictHostKeyChecking no" -i %s %s:/usr/local/federation/etc/prep_done /dev/null; do' % (self.ssh_identity, peer)
387            print >>f, 'sleep 60; done'
388            print >>f, ('ssh -w 0:0 -p %s -o "Tunnel ethernet" ' + \
389                    '-o "StrictHostKeyChecking no" -i %s %s perl -I/usr/local/federation/lib /usr/local/federation/bin/setup_bridge.pl --tapno=0 --addr=%s &') % \
390                    (port, self.ssh_identity, peer, my_addr)
391            # This should give the tap a chance to come up
392            print >>f,'sleep 10'
393            # Add experiment nodes to hosts
394            print >>f, 'cp /etc/hosts /etc/hosts.DETER.fedd.hold'
395            print >>f, 'echo "#--- BEGIN FEDD ADDITIONS ---" >> /etc/hosts'
396            print >>f, 'cat %s >> /etc/hosts' % local_hosts
397            print >>f, 'echo "#--- END FEDD ADDITIONS ---" >> /etc/hosts'
398            # Assign tap address and route experiment connections through it.
399            print >>f, 'ifconfig tap0 %s netmask 255.255.255.0 up' % \
400                    my_addr
401            # self.set_route('10.0.0.0/8', f, exp_peer)
402            print >>f, '/usr/local/sbin/zebra -d -f %s' % zebra_conf
403            print >>f, '/usr/local/sbin/ospfd -d -f %s' % ospfd_conf
404            f.close()
405            os.chmod(cscript, 0755)
406            f = open(dscript, 'w')
407            print >>f, '#!/bin/sh'
408            print >>f, 'ifconfig tap0 destroy'
409            self.unset_route(peer, f)
410            #self.unset_route('10.0.0.0/8', f)
411            print >>f, 'mv /etc/hosts.DETER.fedd.hold /etc/hosts'
412            print >>f, 'kill `cat /var/run/quagga/ospfd.pid`'
413            print >>f, 'kill `cat /var/run/quagga/zebra.pid`'
414            f.close()
415            os.chmod(dscript, 0755)
416            f = open(zebra_conf, 'w')
417            print >>f, 'hostname %s' % my_name
418            print >>f, 'interface tap0'
419            try:
420                extern = open(self.external_networks, 'r')
421                if extern is not None:
422                    for l in extern:
423                        print >>f, "%s" % l.strip()
424                    extern.close()
425            except EnvironmentError:
426                # No external_networks or problem reading it, ignore
427                pass
428            f.close()
429            os.chmod(zebra_conf, 0644)
430            f = open(ospfd_conf, 'w')
431            print >>f, 'hostname %s' % my_name
432            print >>f, 'router ospf'
433            print >>f, ' redistribute static'
434            print >>f, ' network %s/24 area 0.0.0.2' % my_addr
435        except EnvironmentError, e:
436            raise service_error(service_error.internal, 
437                    'Cannot create connect %s: %s' % (e.filename, e.strerror))
438        script_log = open('/tmp/connect.log', 'w')
439        subprocess.Popen(['sudo', '/bin/sh', cscript], stdout=script_log, stderr=script_log)
440        return True
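
    # Roughly, for a portal peer of 192.0.2.20 on ssh port 22 and an
    # in-experiment address of 10.0.1.2 (all made-up values, Linux route form
    # shown), the generated connect script looks like:
    #
    #   #!/bin/sh
    #   ip route add 192.0.2.20 via <gateway> src <interface_address>
    #   while ! /usr/bin/scp -o "StrictHostKeyChecking no" -i <ssh key> \
    #       192.0.2.20:/usr/local/federation/etc/prep_done /dev/null; do
    #   sleep 60; done
    #   ssh -w 0:0 -p 22 -o "Tunnel ethernet" -o "StrictHostKeyChecking no" \
    #       -i <ssh key> 192.0.2.20 perl -I/usr/local/federation/lib \
    #       /usr/local/federation/bin/setup_bridge.pl --tapno=0 --addr=10.0.1.2 &
    #   sleep 10
    #   (append the experiment hosts file to /etc/hosts)
    #   ifconfig tap0 10.0.1.2 netmask 255.255.255.0 up
    #   /usr/local/sbin/zebra -d -f <localdir>/zebra.conf
    #   /usr/local/sbin/ospfd -d -f <localdir>/ospfd.conf
    #
    # zebra.conf and ospfd.conf carry the node's hostname, the tap0 interface,
    # any static-route lines copied from the external_networks file, and an
    # OSPF stanza advertising the 10.0.1.2/24 network into area 0.0.0.2.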
441
442    def StartSegment(self, req, fid):
443        """
444        Start a segment.  Parse the request, fetch the hosts file and ssh key
445        named in its attributes, exchange connection parameters, and build the
446        ssh tunnel to the portal.  The response is stored in persistent state.
447        """
448        try:
449            req = req['StartSegmentRequestBody']
450            # Get the request topology.  If not present, a KeyError is thrown.
451            topref = req['segmentdescription']['topdldescription']
452            # The fedid of the allocation we're attaching resources to
453            auth_attr = req['allocID']['fedid']
454        except KeyError:
455            raise service_error(service_error.req, "Badly formed request")
456
457        # String version of the allocation ID for keying
458        aid = "%s" % auth_attr
459        # Authorization check
460        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
461                with_proof=True)
462        if not access_ok:
463            raise service_error(service_error.access, "Access denied", 
464                    proof=proof)
465        else:
466            # See if this is a replay of an earlier succeeded StartSegment -
467            # sometimes SSL kills 'em.  If so, replay the response rather than
468            # redoing the allocation.
469            self.state_lock.acquire()
470            # Test and set :-)
471            running = self.state['running']
472            self.state['running'] = True
473            retval = self.state[aid].get('started', None)
474            self.state_lock.release()
475            if retval:
476                self.log.warning(
477                        "[StartSegment] Duplicate StartSegment for %s: " \
478                                % aid + \
479                        "replaying response")
480                return retval
481            if running:
482                self.log.debug('[StartSegment] already running')
483                raise service_error(service_error.federant,
484                        'Desktop is already in an experiment')
485
486        certfile = "%s/%s.pem" % (self.certdir, aid)
487
488        # Convert the topology into topdl data structures.  The topology is
489        # validated below and used to pick the in-experiment address and the
490        # portal peer for the tunnel.
491        if topref: topo = topdl.Topology(**topref)
492        else:
493            raise service_error(service_error.req, 
494                    "Request missing segmentdescription")
495
496        err = None
497        try:
498            self.validate_topology(topo)
499
500            # The attributes of the request.  The ones we care about are the
501            # hosts file and the ssh key used to operate the tunnel.
502            attrs = req.get('fedAttr', [])
503            for a in attrs:
504                # Save the hosts and ssh_privkeys to our local dir
505                if a['attribute'] in ('hosts', 'ssh_secretkey'):
506                    self.log.debug('Getting %s from %s' % \
507                            (a['attribute'], a['value']))
508                    get_url(a['value'], certfile, self.localdir, log=self.log)
509                    base = os.path.basename(a['value'])
510                    if a['attribute'] == 'ssh_secretkey':
511                        self.ssh_identity = os.path.join(self.localdir, base)
512                    os.chmod(os.path.join(self.localdir, base), 0600)
513                else:
514                    self.log.debug('Ignoring attribute %s' % a['attribute'])
515
516            # Gather connection information and exchange parameters.
517            connInfo = req.get('connection', [])
518            self.validate_connInfo(connInfo)
519            self.export_store_info(certfile, connInfo)
520            self.import_store_info(certfile, connInfo)
521
522            #build it
523            self.configure_desktop(topo, connInfo)
524        except service_error, e:
525            err = e
526
527        # Save the information
528        if err is None:
529            # It's possible that the StartSegment call gets retried (!).  If
530            # the 'started' key is in the allocation, we'll return it rather
531            # than redo the setup.  The response below was saved when we first
532            # completed it.
533            self.state_lock.acquire()
534            self.state[aid]['started'] = { 
535                    'allocID': req['allocID'],
536                    'allocationLog': "Allocation complete",
537                    'segmentdescription': { 'topdldescription': topo.to_dict() },
538                    'proof': proof.to_dict(),
539                    }
540            retval = copy.deepcopy(self.state[aid]['started'])
541            self.write_state()
542            self.state_lock.release()
543        else:
544            # Something bad happened - clear the "running" flag so we can try
545            # again
546            self.state_lock.acquire()
547            self.state['running'] = False
548            self.state_lock.release()
549            raise err
550
551        return retval
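
    # The request fields StartSegment consumes, sketched with placeholder
    # values (the topdldescription contents are elided):
    #
    #   { 'StartSegmentRequestBody': {
    #       'allocID': { 'fedid': <allocation fedid> },
    #       'segmentdescription': { 'topdldescription': { ... } },
    #       'fedAttr': [
    #           { 'attribute': 'hosts', 'value': <URL of the hosts file> },
    #           { 'attribute': 'ssh_secretkey', 'value': <URL of the ssh key> },
    #       ],
    #       'connection': [ <a single ssh connection description> ],
    #   } }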
552
553    def TerminateSegment(self, req, fid):
554        """
555        Remove the resources associated with the allocation and stop the music.
556        Here that means tearing down the tunnel and removing the ephemeral files.
557        """
558        # Gather the same access information as for StartSegment
559        try:
560            req = req['TerminateSegmentRequestBody']
561        except KeyError:
562            raise service_error(service_error.req, "Badly formed request")
563
564        auth_attr = req['allocID']['fedid']
565        aid = "%s" % auth_attr
566
567        self.log.debug("Terminate request for %s" % aid)
568        # Check authorization
569        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
570                with_proof=True)
571        if not access_ok:
572            raise service_error(service_error.access, "Access denied", 
573                    proof=proof)
574        cscript = os.path.join(self.localdir, 'connect')
575        dscript = os.path.join(self.localdir, 'disconnect')
576        # Do the work of disconnecting
577        if os.path.exists(dscript):
578            self.log.debug('calling %s' % dscript)
579            rv = subprocess.call(['sudo', '/bin/sh', dscript])
580            if rv != 0:
581                self.log.warning('%s had an error: %d' % (dscript, rv))
582        else:
583            self.log.warn('No disconnection script!?')
584
585        try:
586            for bfn in os.listdir(self.localdir):
587                fn = os.path.join(self.localdir, bfn)
588                self.log.debug('Removing %s' % fn)
589                if os.path.exists(fn):
590                    os.remove(fn)
591        except EnvironmentError, e:
592            self.log.warn('Failed to remove %s: %s' % (e.filename, e.strerror))
593
594        self.ssh_identity = None
595
596        self.state_lock.acquire()
597        self.state['running'] = False
598        self.state_lock.release()
599   
600        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }