source: fedd/federation/desktop_access.py @ 972993c

Last change on this file since 972993c was 972993c, checked in by Ted Faber <faber@…>, 10 years ago

Deal with unspecified external_networks file

  • Property mode set to 100644
File size: 21.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import random
10import subprocess
11
12from util import *
13from deter import fedid, generate_fedid
14from authorizer import authorizer, abac_authorizer
15from service_error import service_error
16from remote_service import xmlrpc_handler, soap_handler, service_caller
17
18from deter import topdl
19
20from access import access_base
21
# If nobody has configured a fedd logger, log messages should vanish rather
# than trigger complaints from the logging module.  Registering a do-nothing
# handler on the "fedd.access" logger guarantees that logger always has at
# least one handler, whether or not a module above us set one up.
class nullHandler(logging.Handler):
    """Logging handler that discards every record handed to it."""

    def emit(self, record):
        # Intentionally drop the record.
        return None


fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())
31
32
33# The plug-in itself.
class access(access_base):
    """
    Access controller plug-in that attaches a single desktop to a federated
    experiment.

    The plug-in registers handlers for RequestAccess, ReleaseAccess,
    StartSegment and TerminateSegment (ReleaseAccess is not overridden in
    this class).  Starting a segment writes connect/disconnect shell scripts
    and zebra/ospfd configuration files into the configured local directory
    and launches the connect script under sudo; that script sets up an ssh
    ethernet tunnel (tap0) to the peer portal.  Only one segment may be
    active at a time, which is tracked by the 'running' entry of self.state.
    """
    def __init__(self, config=None, auth=None):
        """
        Initializer.  Calls the base class initializer and then pulls the
        desktop-specific parameters out of the 'access' section of config:
        interface_address, gateway, hostname, localdir, external_networks,
        ssh_port, auth_type, auth_dir and accessdb.

        Raises service_error if neither hostname nor interface_address is
        configured or if auth_type is not 'abac'.  An EnvironmentError from
        reading the access database is logged and re-raised.
        """

        # Calling the base initializer, which reads canonical configuration
        # information and initializes canonical members.
        access_base.__init__(self, config, auth)

        self.src_addr = config.get('access', 'interface_address')
        self.router = config.get('access', 'gateway')
        self.hostname = config.get('access', 'hostname')
        # Storage for ephemeral ssh keys and host files
        self.localdir = config.get('access', 'localdir')
        # File containing the routing entries for external networks
        self.external_networks = config.get('access', 'external_networks')
        # Path of the ssh private key; set by StartSegment, cleared by
        # TerminateSegment.
        self.ssh_identity = None

        # hostname is the name of the ssh endpoint for the other side.  That
        # side needs it to set up routing tables.  If hostname is not
        # available, but an IP address is, use that.
        if self.hostname is None:
            if self.src_addr is None:
                raise service_error(service_error.server_config,
                        'Hostname or interface_address must be set in config')
            self.hostname = self.src_addr

        self.ssh_port = config.get('access', 'ssh_port', '22')

        # authorization information
        self.auth_type = config.get('access', 'auth_type') \
                or 'abac'
        self.auth_dir = config.get('access', 'auth_dir')
        accessdb = config.get("access", "accessdb")
        # Initialize the authorization system and read the access database
        # that maps from authorization information into local information.
        if self.auth_type == 'abac':
            #  Load the current authorization state
            self.auth = abac_authorizer(load=self.auth_dir)
            self.access = [ ]
            if accessdb:
                try:
                    self.read_access(accessdb)
                except EnvironmentError as e:
                    self.log.error("Cannot read %s: %s" % (accessdb, e))
                    # Bare raise keeps the original traceback.
                    raise
        else:
            raise service_error(service_error.internal,
                    "Unknown auth_type: %s" % self.auth_type)

        # The superclass has read the state, but if this is the first run
        # ever, we must initialise the running flag.  This plugin only
        # supports one connection, so StartSegment will fail when
        # self.state['running'] is true.
        self.state_lock.acquire()
        try:
            if 'running' not in self.state:
                self.state['running'] = False
        finally:
            self.state_lock.release()

        # These dictionaries register the plug-in's local routines for
        # handling these four messages with the server code above.  There's
        # a version for SOAP and XMLRPC, depending on which interfaces the
        # plugin supports.
        self.soap_services = {
            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
            'StartSegment': soap_handler("StartSegment", self.StartSegment),
            'TerminateSegment': soap_handler("TerminateSegment",
                self.TerminateSegment),
            }
        self.xmlrpc_services = {
            'RequestAccess': xmlrpc_handler('RequestAccess',
                self.RequestAccess),
            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
                self.ReleaseAccess),
            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
            'TerminateSegment': xmlrpc_handler('TerminateSegment',
                self.TerminateSegment),
            }
        self.call_SetValue = service_caller('SetValue', log=self.log)
        self.call_GetValue = service_caller('GetValue', log=self.log)

    def _set_running(self, value):
        """
        Set the 'running' flag in self.state while holding the state lock.
        The state is not written out here.
        """
        self.state_lock.acquire()
        try:
            self.state['running'] = value
        finally:
            self.state_lock.release()

    # ReleaseAccess comes from the base class; this is a slightly modified
    # RequestAccess that includes a fedAttr to force this side to be active.
    def RequestAccess(self, req, fid):
        """
        Handle an access request.  On success a new allocation fedid is
        generated, state keyed by that fedid is recorded (user, owners and
        authorization set) and written out, and the certificate generated
        with the fedid is saved as <certdir>/<allocation id>.pem.

        Returns a dict with the 'allocID', a 'fedAttr' list that sets
        nat_portals to 'True', and the access 'proof'.  Raises service_error
        if the request body is missing or the certificate cannot be written;
        lookup_access may raise as well.
        """

        self.log.info("RequestAccess called by %s" % fid)
        # The dance to get into the request body
        if 'RequestAccessRequestBody' not in req:
            raise service_error(service_error.req, "No request!?")
        req = req['RequestAccessRequestBody']

        # Base class lookup routine.
        found, owners, proof = self.lookup_access(req, fid)
        self.log.info(
                "[RequestAccess] Access granted local creds %s" % found)
        # Make a fedid for this allocation
        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
        aid = unicode(allocID)

        # Store the data about this allocation.  The finally clause makes
        # sure a failure while recording it cannot leave the lock held.
        self.state_lock.acquire()
        try:
            self.state[aid] = {
                    'user': found,
                    'owners': owners,
                    'auth': set(),
                    }
            # Authorize the creating fedid and the principal representing
            # the allocation to manipulate it.
            self.append_allocation_authorization(aid,
                    ((fid, allocID), (allocID, allocID)))
            self.write_state()
        finally:
            self.state_lock.release()

        # Stash the certificate.
        try:
            with open("%s/%s.pem" % (self.certdir, aid), "w") as f:
                f.write("%s\n" % (alloc_cert,))
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
        self.log.debug('[RequestAccess] Returning allocation ID: %s' % allocID)
        return {
                'allocID': { 'fedid': allocID },
                'fedAttr': [{ 'attribute': 'nat_portals', 'value': 'True' }],
                'proof': proof.to_dict()
                }

    def validate_topology(self, top):
        '''
        Validate the topology.  Desktops can only be single connections.
        Though the topology will include a portal and a node, the access
        controller will implement both on one node, so at most two Computer
        elements and at most one portal are accepted.  A non-portal node
        must have exactly one interface on exactly one substrate, and every
        interface on that substrate must belong to one of the computers in
        this topology.

        Returns True; raises service_error (req) on any violation.
        '''

        comps = [e for e in top.elements if isinstance(e, topdl.Computer)]
        if len(comps) > 2:
            raise service_error(service_error.req,
                    "Desktop only supports 1-node subexperiments")

        portals = 0
        for c in comps:
            if c.get_attribute('portal') is not None:
                portals += 1
                continue
            if len(c.interface) > 1:
                raise service_error(service_error.req,
                        "Desktop Node has more than one interface")
            # Report an unconnected node as a bad request rather than
            # failing with an IndexError below.
            if len(c.interface) < 1:
                raise service_error(service_error.req,
                        "Desktop Node has no interface")
            iface = c.interface[0]
            if len(iface.subs) > 1:
                raise service_error(service_error.req,
                        "Desktop Node has more than one substate on interface")
            if len(iface.subs) < 1:
                raise service_error(service_error.req,
                        "Desktop Node interface has no substrate")
            for other in iface.subs[0].interfaces:
                if other.element not in comps:
                    raise service_error(service_error.req,
                            "Desktop Node connected to non-portal")

        if portals > 1:
            raise service_error(service_error.req,
                    "Desktop segment has more than one portal")
        return True

    def validate_connInfo(self, connInfo):
        '''
        Check that exactly one connection is requested and that it is of
        type 'ssh'.  Returns True; raises service_error (req) otherwise.
        '''
        if len(connInfo) != 1:
            raise service_error(service_error.req,
                    "Desktop segment requests multiple connections")
        # .get() so that a connection without a type is rejected as a bad
        # request instead of raising KeyError.
        if connInfo[0].get('type') != 'ssh':
            raise service_error(service_error.req,
                    "Desktop segment requires ssh connecton")
        return True

    def export_store_info(self, certfile, connInfo):
        '''
        Tell the other portal node where to reach this desktop.  The other side
        uses this information to set up routing, though the ssh_port is unused
        as the Desktop always initiates ssh connections.

        For every non-input parameter in connInfo named 'peer' or 'ssh_port'
        the corresponding value is sent with SetValue to the parameter's
        store under the parameter's key; other names are logged and skipped.
        '''
        values = { 'peer': self.hostname, 'ssh_port': self.ssh_port }
        for c in connInfo:
            for p in c.get('parameter', []):
                if p.get('type', '') == 'input': continue
                pname = p.get('name', '')
                if pname not in values:
                    # self.log is a logger object, not a callable.
                    self.log.warning('Unknown export parameter: %s' % pname)
                    continue
                key = p.get('key', '')
                surl = p.get('store', '')
                val = values[pname]
                self.log.debug('Setting %s (%s) to %s on %s' % \
                        (pname, key, val, surl))
                self.call_SetValue(surl, { 'name': key, 'value': val },
                        certfile)

    def set_route(self, dest, script, gw=None, src=None):
        '''
        Write the platform's command for adding a route to dest into script
        (any object with a write method).  gw is the gateway and src the
        source address to route from; if both are None nothing is written.

        Raises service_error (internal) for a source-based route on FreeBSD
        and for platforms other than FreeBSD and Linux.
        '''
        if sys.platform.startswith('freebsd'):
            if src is not None:
                raise service_error(service_error.internal,
                        'FreeBSD will not route based on src address')
            if gw is not None:
                script.write('route add %s %s\n' % (dest, gw))
        elif sys.platform.startswith('linux'):
            if src is not None and gw is not None:
                script.write('ip route add %s via %s src %s\n' % \
                        (dest, gw, src))
            elif src is not None:
                script.write('ip route add %s src %s\n' % (dest, src))
            elif gw is not None:
                script.write('ip route add %s via %s\n' % (dest, gw))
        else:
            raise service_error(service_error.internal,
                    'Unknown platform %s' % sys.platform)

    def unset_route(self, dest, script):
        '''
        Write the platform's command for deleting the route to dest into
        script.  Nothing is written on platforms other than FreeBSD and
        Linux.
        '''
        if sys.platform.startswith('freebsd'):
            script.write('route delete %s\n' % dest)
        elif sys.platform.startswith('linux'):
            script.write('ip route delete %s\n' % dest)

    def find_a_peer(self, addr):
        '''
        Find another node in the experiment that's on our subnet.  This is a
        hack to handle the problem that we really cannot require the desktop to
        dynamically route.  Will be improved by distributing static routes.

        The hosts file in localdir is scanned for the first address that
        shares everything before the last dot of addr but is not addr
        itself.  That address is returned.  Raises service_error if addr
        contains no dot, no such address is found or the hosts file cannot
        be read.
        '''

        hosts = os.path.join(self.localdir, 'hosts')
        p = addr.rfind('.')
        if p == -1:
            raise service_error(service_error.req, 'bad address in topology')
        # Escape the prefix so its dots only match dots, and refuse to start
        # a match in the middle of a longer number (10.0.0 inside 110.0.0.7).
        addr_re = re.compile(r'(?<![\d.])(%s\.\d+)' % re.escape(addr[0:p]))
        try:
            with open(hosts, 'r') as f:
                for line in f:
                    m = addr_re.search(line)
                    if m is not None and m.group(1) != addr:
                        return m.group(1)
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    'Cannot open %s: %s' % (e.filename, e.strerror))
        raise service_error(service_error.req,
                'No other nodes in this subnet??')

    def _connection_parameters(self, top, connInfo):
        '''
        Pull the tunnel parameters out of the topology and connection info.
        Returns (peer, port, my_addr, my_name): peer and ssh port come from
        the connInfo entry whose 'portal' names a portal computer (port
        defaults to '22' unless an ssh_port fedAttr is present); the address
        and name come from the non-portal computer.  Values not found are
        None.
        '''
        peer = None
        port = None
        my_addr = None
        my_name = None
        for e in top.elements:
            if not isinstance(e, topdl.Computer): continue
            if e.get_attribute('portal') is None:
                my_name = e.name
                # there should be one interface with one IPv4 address
                if len(e.interface) < 1:
                    raise service_error(service_error.internal,
                            'No interface on experiment node!?!?')
                my_addr = e.interface[0].get_attribute('ip4_address')
            else:
                for ci in connInfo:
                    if ci.get('portal', '') != e.name: continue
                    peer = ci.get('peer')
                    port = '22'
                    for a in ci.get('fedAttr', []):
                        if a['attribute'] == 'ssh_port': port = a['value']
        return peer, port, my_addr, my_name

    def _write_connect_script(self, path, peer, port, my_addr, local_hosts,
            zebra_conf, ospfd_conf):
        '''
        Write the executable shell script that routes to the peer, waits for
        it, brings up the ssh tap tunnel and starts the routing daemons.
        '''
        with open(path, 'w') as f:
            f.write('#!/bin/sh\n')
            # This picks the outgoing interface to the experiment using the
            # routing system.
            self.set_route(peer, f, self.router, self.src_addr)
            # Wait until the other end reports that it is configured by
            # placing a file this end can access into its local file
            # system.  Try once a minute.
            f.write('while ! /usr/bin/scp -o "StrictHostKeyChecking no" -i %s %s:/usr/local/federation/etc/prep_done /dev/null; do\n' % (self.ssh_identity, peer))
            f.write('sleep 60; done\n')
            f.write(('ssh -w 0:0 -p %s -o "Tunnel ethernet" ' + \
                    '-o "StrictHostKeyChecking no" -i %s %s perl -I/usr/local/federation/lib /usr/local/federation/bin/setup_bridge.pl --tapno=0 --addr=%s &\n') % \
                    (port, self.ssh_identity, peer, my_addr))
            # This should give the tap a chance to come up
            f.write('sleep 10\n')
            # Add experiment nodes to hosts
            f.write('cp /etc/hosts /etc/hosts.DETER.fedd.hold\n')
            f.write('echo "#--- BEGIN FEDD ADDITIONS ---" >> /etc/hosts\n')
            f.write('cat %s >> /etc/hosts\n' % local_hosts)
            f.write('echo "#--- END FEDD ADDITIONS ---" >> /etc/hosts\n')
            # Assign tap address and start the routing daemons.
            f.write('ifconfig tap0 %s netmask 255.255.255.0 up\n' % my_addr)
            f.write('/usr/local/sbin/zebra -d -f %s\n' % zebra_conf)
            f.write('/usr/local/sbin/ospfd -d -f %s\n' % ospfd_conf)
        os.chmod(path, 0o755)

    def _write_disconnect_script(self, path, peer):
        '''
        Write the executable shell script that undoes the connect script.
        '''
        with open(path, 'w') as f:
            f.write('#!/bin/sh\n')
            f.write('ifconfig tap0 destroy\n')
            self.unset_route(peer, f)
            f.write('mv /etc/hosts.DETER.fedd.hold /etc/hosts\n')
            f.write('kill `cat /var/run/quagga/ospfd.pid`\n')
            f.write('kill `cat /var/run/quagga/zebra.pid`\n')
        os.chmod(path, 0o755)

    def _external_network_lines(self):
        '''
        Return the stripped lines of the external_networks file.  This is
        best effort: an unset or unreadable file yields an empty list.
        '''
        if self.external_networks is None:
            return []
        try:
            with open(self.external_networks, 'r') as extern:
                return [l.strip() for l in extern]
        except EnvironmentError as e:
            self.log.warning('Ignoring external_networks file %s: %s' % \
                    (self.external_networks, e))
            return []

    def _write_zebra_conf(self, path, my_name):
        '''
        Write the zebra configuration: hostname, the tap0 interface and any
        lines from the external_networks file.
        '''
        extern_lines = self._external_network_lines()
        with open(path, 'w') as f:
            f.write('hostname %s\n' % my_name)
            f.write('interface tap0\n')
            for l in extern_lines:
                f.write('%s\n' % l)
        os.chmod(path, 0o644)

    def _write_ospfd_conf(self, path, my_name, my_addr):
        '''
        Write the ospfd configuration.  The with statement closes the file,
        so the contents are on disk before the connect script is launched.
        '''
        with open(path, 'w') as f:
            f.write('hostname %s\n' % my_name)
            f.write('router ospf\n')
            f.write(' redistribute static\n')
            f.write(' network %s/24 area 0.0.0.2\n' % my_addr)

    def configure_desktop(self, top, connInfo):
        '''
        Build the connection.  Establish routing to the peer if using a
        separate interface, wait until the other end confirms setup, establish
        the ssh layer-two tunnel (tap), assign the in-experiment IP address to
        the tunnel and start the routing daemons.

        The work is done by a connect script written to localdir and
        launched in the background under sudo with its output sent to
        /tmp/connect.log; a matching disconnect script and the zebra and
        ospfd configuration files are written alongside it.  Returns True.
        Raises service_error if parameters are missing, no other node shares
        our subnet, a file cannot be written or the script cannot be
        launched.
        '''

        # get the peer and ssh port from the portal and our IP from the other
        peer, port, my_addr, my_name = \
                self._connection_parameters(top, connInfo)

        # XXX scan hosts for IP addresses and compose better routing entry

        if not all([peer, port, my_addr]):
            raise service_error(service_error.req,
                    'Cannot find all config parameters %s %s %s' % (peer, port, my_addr))

        # The peer found is currently unused, but the call still rejects a
        # hosts file with no other node on our subnet.
        self.find_a_peer(my_addr)

        cscript = os.path.join(self.localdir, 'connect')
        dscript = os.path.join(self.localdir, 'disconnect')
        local_hosts = os.path.join(self.localdir, 'hosts')
        zebra_conf = os.path.join(self.localdir, 'zebra.conf')
        ospfd_conf = os.path.join(self.localdir, 'ospfd.conf')
        try:
            self._write_connect_script(cscript, peer, port, my_addr,
                    local_hosts, zebra_conf, ospfd_conf)
            self._write_disconnect_script(dscript, peer)
            self._write_zebra_conf(zebra_conf, my_name)
            self._write_ospfd_conf(ospfd_conf, my_name, my_addr)
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    'Cannot create connect %s: %s' % (e.filename, e.strerror))

        try:
            script_log = open('/tmp/connect.log', 'w')
            try:
                subprocess.Popen(['sudo', '/bin/sh', cscript],
                        stdout=script_log, stderr=script_log)
            finally:
                # The child has its own copy of the descriptor; ours is no
                # longer needed.
                script_log.close()
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    'Cannot run %s: %s' % (cscript, e))
        return True

    def StartSegment(self, req, fid):
        """
        Start a segment: validate the request, fetch the hosts file and ssh
        key named in the request's fedAttrs into localdir, exchange
        connection parameters and configure the desktop.

        A request for an allocation that already has a recorded response is
        answered by replaying that response.  Otherwise the 'running' flag
        is claimed for the duration; it is released again if anything in the
        start-up fails.  Returns the response dict (allocID, allocationLog,
        segmentdescription and proof).  Raises service_error for malformed
        requests, denied access, unknown allocations and a desktop that is
        already in an experiment.
        """
        try:
            req = req['StartSegmentRequestBody']
            # Get the request topology.  If not present, a KeyError is thrown.
            topref = req['segmentdescription']['topdldescription']
            # The fedid of the allocation we're attaching resources to
            auth_attr = req['allocID']['fedid']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        # String version of the allocation ID for keying
        aid = "%s" % auth_attr
        # Authorization check
        access_ok, proof = self.auth.check_attribute(fid, auth_attr,
                with_proof=True)
        if not access_ok:
            raise service_error(service_error.access, "Access denied",
                    proof=proof)

        # See if this is a replay of an earlier succeeded StartSegment -
        # sometimes SSL kills 'em.  If so, replay the response rather than
        # redoing the allocation.  Otherwise test and set the running flag,
        # but only claim it for a request that can actually proceed.
        self.state_lock.acquire()
        try:
            if aid not in self.state:
                raise service_error(service_error.req,
                        "No such allocation: %s" % aid)
            replay = self.state[aid].get('started', None)
            running = False
            if not replay:
                running = self.state['running']
                if not running and topref:
                    self.state['running'] = True
        finally:
            self.state_lock.release()

        if replay:
            self.log.warning(
                    "[StartSegment] Duplicate StartSegment for %s: " \
                            % aid + \
                    "replaying response")
            return replay
        if running:
            self.log.debug('[StartSegment] already running')
            raise service_error(service_error.federant,
                    'Desktop is already in an experiment')
        if not topref:
            raise service_error(service_error.req,
                    "Request missing segmentdescription'")

        certfile = "%s/%s.pem" % (self.certdir, aid)

        configured = False
        try:
            # Convert the topology into topdl data structures.
            topo = topdl.Topology(**topref)
            self.validate_topology(topo)

            # The attributes of the request.  The ones we care about are the
            # hosts file and the ssh keys to operate the tunnel.
            for a in req.get('fedAttr', []):
                # Save the hosts and ssh_privkeys to our local dir
                if a['attribute'] in ('hosts', 'ssh_secretkey'):
                    self.log.debug('Getting %s from %s' % \
                            (a['attribute'], a['value']))
                    get_url(a['value'], certfile, self.localdir, log=self.log)
                    base = os.path.basename(a['value'])
                    if a['attribute'] == 'ssh_secretkey':
                        self.ssh_identity = os.path.join(self.localdir, base)
                    os.chmod(os.path.join(self.localdir, base), 0o600)
                else:
                    self.log.debug('Ignoring attribute %s' % a['attribute'])

            # Gather connection information and exchange parameters.
            connInfo = req.get('connection', [])
            self.validate_connInfo(connInfo)
            self.export_store_info(certfile, connInfo)
            self.import_store_info(certfile, connInfo)

            #build it
            self.configure_desktop(topo, connInfo)
            configured = True
        finally:
            # Something bad happened - clear the "running" flag so we can
            # try again.  Using finally covers every kind of failure and
            # lets the original exception propagate untouched.
            if not configured:
                self._set_running(False)

        # Save the response so that a retried StartSegment can be answered
        # without redoing the setup.
        self.state_lock.acquire()
        try:
            self.state[aid]['started'] = {
                    'allocID': req['allocID'],
                    'allocationLog': "Allocatation complete",
                    'segmentdescription': { 'topdldescription': topo.to_dict() },
                    'proof': proof.to_dict(),
                    }
            retval = copy.deepcopy(self.state[aid]['started'])
            self.write_state()
        finally:
            self.state_lock.release()

        return retval

    def TerminateSegment(self, req, fid):
        """
        Tear down the connection: run the disconnect script under sudo if
        it exists, remove the files in localdir, forget the ssh identity and
        clear the 'running' flag.  Failures of the script or of the file
        removal are logged, not raised.

        Returns a dict with the allocID and the access proof.  Raises
        service_error for malformed requests and denied access.
        """
        # Gather the same access information as for Start Segment
        try:
            req = req['TerminateSegmentRequestBody']
            auth_attr = req['allocID']['fedid']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        aid = "%s" % auth_attr

        self.log.debug("Terminate request for %s" %aid)
        # Check authorization
        access_ok, proof = self.auth.check_attribute(fid, auth_attr,
                with_proof=True)
        if not access_ok:
            raise service_error(service_error.access, "Access denied",
                    proof=proof)
        dscript = os.path.join(self.localdir, 'disconnect')
        # Do the work of disconnecting
        if os.path.exists(dscript):
            self.log.debug('calling %s' % dscript)
            rv = subprocess.call(['sudo', '/bin/sh', dscript])
            if rv != 0:
                self.log.warning('%s had an error: %d' % (dscript, rv))
        else:
            self.log.warning('No disconnection script!?')

        # Clean out the local directory.  Each removal is attempted on its
        # own so one failure does not leave the remaining files (ssh key
        # included) behind.
        try:
            leftovers = os.listdir(self.localdir)
        except EnvironmentError as e:
            self.log.warning('Failed to remove %s: %s' % \
                    (e.filename, e.strerror))
            leftovers = []
        for bfn in leftovers:
            fn = os.path.join(self.localdir, bfn)
            self.log.debug('Removing %s' % fn)
            try:
                if os.path.exists(fn):
                    os.remove(fn)
            except EnvironmentError as e:
                self.log.warning('Failed to remove %s: %s' % \
                        (e.filename, e.strerror))

        self.ssh_identity = None

        self._set_running(False)

        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.