source: fedd/federation/desktop_access.py @ b3125fa1

Last change on this file since b3125fa1 was b3125fa1, checked in by Ted Faber <faber@…>, 11 years ago

Read external nets

  • Property mode set to 100644
File size: 21.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import random
10import subprocess
11
12from util import *
13from deter import fedid, generate_fedid
14from authorizer import authorizer, abac_authorizer
15from service_error import service_error
16from remote_service import xmlrpc_handler, soap_handler, service_caller
17
18from deter import topdl
19
20from access import access_base
21
# Keep log records from surfacing when no one has configured a fedd logger.
# logging.getLogger returns the "fedd.access" logger (creating it if no
# module above us already has) and we hang a discard-everything handler on
# it.  It's an extra belt for the suspenders.
class nullHandler(logging.Handler):
    """A logging handler that silently drops every record it receives."""
    def emit(self, record):
        return None

fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())
31
32
33# The plug-in itself.
34class access(access_base):
35    """
36    This is a demonstration plug-in for fedd.  It responds to all the
37    experiment_control requests and keeps internal state.  The allocations it
38    makes are simple integers associated with each valid request.  It makes use
39    of the general routines in access.access_base.
40
41    Detailed comments in the code and info at
42    """
43    def __init__(self, config=None, auth=None):
44        """
45        Initializer.  Pulls parameters out of the ConfigParser's access
46        section, and initializes simple internal state.  This version reads a
47        maximum integer to assign from the configuration file, while most other
48        configuration entries  are read by the base class. 
49
50        An access database in the cannonical format is also read as well as a
51        state database that is a hash of internal state.  Routines to
52        manipulate these are in the base class, but specializations appear
53        here.
54
55        The access database maps users to a simple string.
56        """
57
58        # Calling the base initializer, which reads canonical configuration
59        # information and initializes canonical members.
60        access_base.__init__(self, config, auth)
61        # Reading the maximum integer parameter from the configuration file
62
63        self.src_addr = config.get('access', 'interface_address')
64        self.router = config.get('access', 'gateway')
65        self.hostname = config.get('access', 'hostname')
66        # Storage for ephemeral ssh keys and host files
67        self.localdir = config.get('access', 'localdir')
68        self.ssh_identity = None
69
70        # hostname is the name of the ssh endpoint for the other side.  That
71        # side needs it to set up routing tables.  If hostname is not
72        # available, but an IP address is, use that.
73        if self.hostname is None:
74            if  self.src_addr is None:
75                raise service_error(service_error.server_config,
76                        'Hostname or interface_address must be set in config')
77            self.hostname = self.src_addr
78       
79        self.ssh_port = config.get('access', 'ssh_port', '22')
80
81        # authorization information
82        self.auth_type = config.get('access', 'auth_type') \
83                or 'abac'
84        self.auth_dir = config.get('access', 'auth_dir')
85        accessdb = config.get("access", "accessdb")
86        # initialize the authorization system.  We make a call to
87        # read the access database that maps from authorization information
88        # into local information.  The local information is parsed by the
89        # translator above.
90        if self.auth_type == 'abac':
91            #  Load the current authorization state
92            self.auth = abac_authorizer(load=self.auth_dir)
93            self.access = [ ]
94            if accessdb:
95                try:
96                    self.read_access(accessdb)
97                except EnvironmentError, e:
98                    self.log.error("Cannot read %s: %s" % \
99                            (config.get("access", "accessdb"), e))
100                    raise e
101        else:
102            raise service_error(service_error.internal, 
103                    "Unknown auth_type: %s" % self.auth_type)
104
105        # The superclass has read the state, but if this is the first run ever,
106        # we must initialise the running flag.  This plugin only supports one
107        # connection, so StartSegment will fail when self.state['running'] is
108        # true.
109        self.state_lock.acquire()
110        if 'running' not in self.state:
111            self.state['running'] = False
112        self.state_lock.release()
113
114        # These dictionaries register the plug-in's local routines for handline
115        # these four messages with the server code above.  There's a version
116        # for SOAP and XMLRPC, depending on which interfaces the plugin
117        # supports.  There's rarely a technical reason not to support one or
118        # the other - the plugin code almost never deals with the transport -
119        # but if a plug-in writer wanted to disable XMLRPC, they could leave
120        # the self.xmlrpc_services dictionary empty.
121        self.soap_services = {\
122            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
123            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
124            'StartSegment': soap_handler("StartSegment", self.StartSegment),
125            'TerminateSegment': soap_handler("TerminateSegment", 
126                self.TerminateSegment),
127            }
128        self.xmlrpc_services =  {\
129            'RequestAccess': xmlrpc_handler('RequestAccess',
130                self.RequestAccess),
131            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
132                self.ReleaseAccess),
133            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
134            'TerminateSegment': xmlrpc_handler('TerminateSegment',
135                self.TerminateSegment),
136            }
137        self.call_SetValue = service_caller('SetValue', log=self.log)
138        self.call_GetValue = service_caller('GetValue', log=self.log)
139
140    # ReleaseAccess come from the base class, this is a slightly modified
141    # RequestAccess from the base that includes a fedAttr to force this side to
142    # be active.
143    def RequestAccess(self, req, fid):
144        """
145        Handle an access request.  Success here maps the requester into the
146        local access control space and establishes state about that user keyed
147        to a fedid.  We also save a copy of the certificate underlying that
148        fedid so this allocation can access configuration information and
149        shared parameters on the experiment controller.
150        """
151
152        self.log.info("RequestAccess called by %s" % fid)
153        # The dance to get into the request body
154        if req.has_key('RequestAccessRequestBody'):
155            req = req['RequestAccessRequestBody']
156        else:
157            raise service_error(service_error.req, "No request!?")
158
159        # Base class lookup routine.  If this fails, it throws a service
160        # exception denying access that triggers a fault response back to the
161        # caller.
162        found,  owners, proof = self.lookup_access(req, fid)
163        self.log.info(
164                "[RequestAccess] Access granted local creds %s" % found)
165        # Make a fedid for this allocation
166        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
167        aid = unicode(allocID)
168
169        # Store the data about this allocation:
170        self.state_lock.acquire()
171        self.state[aid] = { }
172        self.state[aid]['user'] = found
173        self.state[aid]['owners'] = owners
174        self.state[aid]['auth'] = set()
175        # Authorize the creating fedid and the principal representing the
176        # allocation to manipulate it.
177        self.append_allocation_authorization(aid, 
178                ((fid, allocID), (allocID, allocID)))
179        self.write_state()
180        self.state_lock.release()
181
182        # Create a directory to stash the certificate in, ans stash it.
183        try:
184            f = open("%s/%s.pem" % (self.certdir, aid), "w")
185            print >>f, alloc_cert
186            f.close()
187        except EnvironmentError, e:
188            raise service_error(service_error.internal, 
189                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
190        self.log.debug('[RequestAccess] Returning allocation ID: %s' % allocID)
191        msg = { 
192                'allocID': { 'fedid': allocID }, 
193                'fedAttr': [{ 'attribute': 'nat_portals', 'value': 'True' }],
194                'proof': proof.to_dict()
195                }
196        return msg
197
198    def validate_topology(self, top):
199        '''
200        Validate the topology.  Desktops can only be single connections.
201        Though the topology will include a portal and a node, the access
202        controller will implement both on one node.
203
204        As more capabilities are added to the contoller the constraints here
205        will relax.
206        '''
207
208        comps = []
209        for e in top.elements:
210            if isinstance(e, topdl.Computer): comps.append(e)
211        if len(comps) > 2: 
212            raise service_error(service_error.req,
213                    "Desktop only supports 1-node subexperiments")
214
215        portals = 0
216        for c in comps:
217            if c.get_attribute('portal') is not None: 
218                portals += 1
219                continue
220            if len(c.interface) > 1:
221                raise service_error(service_error.req,
222                        "Desktop Node has more than one interface")
223            i  = c.interface[0]
224            if len(i.subs) > 1: 
225                raise service_error(service_error.req,
226                        "Desktop Node has more than one substate on interface")
227            sub = i.subs[0]
228            for i in sub.interfaces:
229                if i.element not in comps:
230                    raise service_error(service_error.req,
231                            "Desktop Node connected to non-portal")
232
233        if portals > 1:
234            raise service_error(service_error.req,
235                    "Desktop segment has more than one portal")
236        return True
237
238    def validate_connInfo(self, connInfo):
239        if len(connInfo) != 1: 
240            raise service_error(service_error.req,
241                    "Desktop segment requests multiple connections")
242        if connInfo[0]['type'] != 'ssh':
243            raise service_error(service_error.req,
244                    "Desktop segment requires ssh connecton")
245        return True
246
247    def export_store_info(self, certfile, connInfo):
248        '''
249        Tell the other portal node where to reach this desktop.  The other side
250        uses this information to set up routing, though the ssh_port is unused
251        as the Desktop always initiates ssh connections.
252        '''
253        values = { 'peer': self.hostname, 'ssh_port': self.ssh_port }
254        for c in connInfo:
255            for p in c.get('parameter', []):
256                if p.get('type','') == 'input': continue
257                pname = p.get('name', '')
258                key = p.get('key', '')
259                surl = p.get('store', '')
260                if pname not in values:
261                    self.log('Unknown export parameter: %s'  % pname)
262                    continue
263                val = values[pname]
264                req = { 'name': key, 'value': val }
265                self.log.debug('Setting %s (%s) to %s on %s' % \
266                        (pname, key,  val, surl))
267                self.call_SetValue(surl, req, certfile)
268
269    def set_route(self, dest, script, gw=None, src=None):
270        if sys.platform.startswith('freebsd'):
271            if src is not None and gw is not None:
272                raise service_error(service_error.internal, 
273                        'FreeBSD will not route based on src address')
274            elif src is not None:
275                raise service_error(service_error.internal, 
276                        'FreeBSD will not route based on src address')
277            elif gw is not None:
278                print >>script, 'route add %s %s' % (dest, gw)
279        elif sys.platform.startswith('linux'):
280            if src is not None and gw is not None:
281                print >>script, 'ip route add %s via %s src %s' % \
282                        (dest, gw, src)
283            elif src is not None:
284                print >>script, 'ip route add %s src %s' % \
285                        (dest, src)
286            elif gw is not None:
287                print >>script, 'ip route add %s via %s' % (dest, gw)
288        else:
289            raise service_error(service_error.internal, 
290                    'Unknown platform %s' % sys.platform)
291
292    def unset_route(self, dest, script):
293        rv = 0
294        if sys.platform.startswith('freebsd'):
295            print >>script, 'route delete %s' % dest
296        elif sys.platform.startswith('linux'):
297            print >>script, 'ip route delete %s' % dest
298
299    def find_a_peer(self, addr): 
300        '''
301        Find another node in the experiment that's on our subnet.  This is a
302        hack to handle the problem that we really cannot require the desktop to
303        dynamically route.  Will be improved by distributing static routes.
304        '''
305
306        peer = None
307        hosts = os.path.join(self.localdir, 'hosts')
308        p = addr.rfind('.')
309        if p == -1:
310            raise service_error(service_error.req, 'bad address in topology')
311        prefix = addr[0:p]
312        addr_re = re.compile('(%s.\\d+)' % prefix)
313        try:
314            f = open(hosts, 'r')
315            for line in f:
316                m = addr_re.search(line)
317                if m is not None and m.group(1) != addr:
318                    peer = m.group(1)
319                    break
320            else:
321                raise service_error(service_error.req, 
322                        'No other nodes in this subnet??')
323        except EnvironmentError, e:
324            raise service_error(service_error.internal, 
325                    'Cannot open %s: %s' % (e.filename, e.strerror))
326        return peer
327
328
329
330
331    def configure_desktop(self, top, connInfo):
332        '''
333        Build the connection.  Establish routing to the peer if using a
334        separate interface, wait until the other end confirms setup, establish
335        the ssh layer-two tunnel (tap), assign the in-experiment IP address to
336        the tunnel and establish routing to the experiment through the tap.
337        '''
338
339
340        # get the peer and ssh port from the portal and our IP from the other
341        peer = None
342        port = None
343        my_addr = None
344        my_name = None
345        for e in top.elements:
346            if not isinstance(e, topdl.Computer): continue
347            if e.get_attribute('portal') is None: 
348                my_name = e.name
349                # there should be one interface with one IPv4 address
350                if len(e.interface) <1 :
351                    raise service_error(service_error.internal,
352                            'No interface on experiment node!?!?')
353                my_addr = e.interface[0].get_attribute('ip4_address')
354            else:
355                for ci in connInfo:
356                    if ci.get('portal', '') != e.name: continue
357                    peer = ci.get('peer')
358                    port = '22'
359                    for a in ci.get('fedAttr', []):
360                        if a['attribute'] == 'ssh_port': port = a['value']
361
362        # XXX scan hosts for IP addresses and compose better routing entry
363       
364        if not all([peer, port, my_addr]):
365            raise service_error(service_error.req, 
366                    'Cannot find all config parameters %s %s %s' % (peer, port, my_addr))
367
368        exp_peer = self.find_a_peer(my_addr)
369
370        cscript = os.path.join(self.localdir, 'connect')
371        dscript = os.path.join(self.localdir, 'disconnect')
372        local_hosts = os.path.join(self.localdir, 'hosts')
373        zebra_conf = os.path.join(self.localdir, 'zebra.conf')
374        ospfd_conf = os.path.join(self.localdir, 'ospfd.conf')
375        try:
376            f = open(cscript, 'w')
377            print >>f, '#!/bin/sh'
378            # This picks the outgoing interface to the experiment using the
379            # routing system.
380            self.set_route(peer, f, self.router, self.src_addr)
381            # Wait until the other end reports that it is configured py placing
382            # a file this end can access into its local file system.  Try once
383            # a minute.
384            print >>f,'while ! /usr/bin/scp -o "StrictHostKeyChecking no" -i %s %s:/usr/local/federation/etc/prep_done /dev/null; do' % (self.ssh_identity, peer)
385            print >>f, 'sleep 60; done'
386            print >>f, ('ssh -w 0:0 -p %s -o "Tunnel ethernet" ' + \
387                    '-o "StrictHostKeyChecking no" -i %s %s perl -I/usr/local/federation/lib /usr/local/federation/bin/setup_bridge.pl --tapno=0 --addr=%s &') % \
388                    (port, self.ssh_identity, peer, my_addr)
389            # This should give the tap a a chance to come up
390            print >>f,'sleep 10'
391            # Add experiment nodes to hosts
392            print >>f, 'cp /etc/hosts /etc/hosts.DETER.fedd.hold'
393            print >>f, 'echo "#--- BEGIN FEDD ADDITIONS ---" >> /etc/hosts'
394            print >>f, 'cat %s >> /etc/hosts' % local_hosts
395            print >>f, 'echo "#--- END FEDD ADDITIONS ---" >> /etc/hosts'
396            # Assign tap address and route experiment connections through it.
397            print >>f, 'ifconfig tap0 %s netmask 255.255.255.0 up' % \
398                    my_addr
399            # self.set_route('10.0.0.0/8', f, exp_peer)
400            print >>f, '/usr/local/sbin/zebra -d -f %s' % zebra_conf
401            print >>f, '/usr/local/sbin/ospfd -d -f %s' % ospfd_conf
402            f.close()
403            os.chmod(cscript, 0755)
404            f = open(dscript, 'w')
405            print >>f, '#!/bin/sh'
406            print >>f, 'ifconfig tap0 destroy'
407            self.unset_route(peer, f)
408            #self.unset_route('10.0.0.0/8', f)
409            print >>f, 'mv /etc/hosts.DETER.fedd.hold /etc/hosts'
410            print >>f, 'kill `cat /var/run/quagga/ospfd.pid`'
411            print >>f, 'kill `cat /var/run/quagga/zebra.pid`'
412            f.close()
413            os.chmod(dscript, 0755)
414            f = open(zebra_conf, 'w')
415            print >>f, 'hostname %s' % my_name
416            print >>f, 'interface tap0'
417            try:
418                extern = open(os.path.join(self.localdir, 
419                    'external_networks'), 'r')
420                if extern is not None:
421                    for l in extern:
422                        print >>f, "%s" % l.strip()
423                    extern.close()
424            except EnvironmentError:
425                # No external_networks or problem reading it, ignore
426                pass
427            f.close()
428            os.chmod(zebra_conf, 0644)
429            f = open(ospfd_conf, 'w')
430            print >>f, 'hostname %s' % my_name
431            print >>f, 'router ospf'
432            print >>f, ' redistribute static'
433            print >>f, ' network %s/24 area 0.0.0.2' % my_addr
434        except EnvironmentError, e:
435            raise service_error(service_error.internal, 
436                    'Cannot create connect %s: %s' % (e.filename, e.strerror))
437        script_log = open('/tmp/connect.log', 'w')
438        subprocess.Popen(['sudo', '/bin/sh', cscript], stdout=script_log, stderr=script_log)
439        return True
440
441    def StartSegment(self, req, fid):
442        """
443        Start a segment.  In this simple skeleton, this means to parse the
444        request and assign an unassigned integer to it.  We store the integer
445        in the persistent state.
446        """
447        try:
448            req = req['StartSegmentRequestBody']
449            # Get the request topology.  If not present, a KeyError is thrown.
450            topref = req['segmentdescription']['topdldescription']
451            # The fedid of the allocation we're attaching resources to
452            auth_attr = req['allocID']['fedid']
453        except KeyError:
454            raise service_error(service_error.req, "Badly formed request")
455
456        # String version of the allocation ID for keying
457        aid = "%s" % auth_attr
458        # Authorization check
459        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
460                with_proof=True)
461        if not access_ok:
462            raise service_error(service_error.access, "Access denied", 
463                    proof=proof)
464        else:
465            # See if this is a replay of an earlier succeeded StartSegment -
466            # sometimes SSL kills 'em.  If so, replay the response rather than
467            # redoing the allocation.
468            self.state_lock.acquire()
469            # Test and set :-)
470            running = self.state['running']
471            self.state['running'] = True
472            retval = self.state[aid].get('started', None)
473            self.state_lock.release()
474            if retval:
475                self.log.warning(
476                        "[StartSegment] Duplicate StartSegment for %s: " \
477                                % aid + \
478                        "replaying response")
479                return retval
480            if running:
481                self.log.debug('[StartSegment] already running')
482                raise service_error(service_error.federant,
483                        'Desktop is already in an experiment')
484
485        certfile = "%s/%s.pem" % (self.certdir, aid)
486
487        # Convert the topology into topdl data structures.  Again, the
488        # skeletion doesn't do anything with it, but this is how one parses a
489        # topology request.
490        if topref: topo = topdl.Topology(**topref)
491        else:
492            raise service_error(service_error.req, 
493                    "Request missing segmentdescription'")
494
495        err = None
496        try:
497            self.validate_topology(topo)
498
499            # The attributes of the request.  The ones we care about are the ssh
500            # keys to operate the tunnel.
501            attrs = req.get('fedAttr', [])
502            for a in attrs:
503                # Save the hosts and ssh_privkeys to our local dir
504                if a['attribute'] in ('hosts', 'ssh_secretkey'):
505                    self.log.debug('Getting %s from %s' % \
506                            (a['attribute'], a['value']))
507                    get_url(a['value'], certfile, self.localdir, log=self.log)
508                    base = os.path.basename(a['value'])
509                    if a['attribute'] == 'ssh_secretkey':
510                        self.ssh_identity = os.path.join(self.localdir, base)
511                    os.chmod(os.path.join(self.localdir, base), 0600)
512                else:
513                    self.log.debug('Ignoring attribute %s' % a['attribute'])
514
515            # Gather connection information and exchange parameters.
516            connInfo = req.get('connection', [])
517            self.validate_connInfo(connInfo)
518            self.export_store_info(certfile, connInfo)
519            self.import_store_info(certfile, connInfo)
520
521            #build it
522            self.configure_desktop(topo, connInfo)
523        except service_error, e:
524            err = e
525
526        # Save the information
527        if err is None:
528            # It's possible that the StartSegment call gets retried (!).  if
529            # the 'started' key is in the allocation, we'll return it rather
530            # than redo the setup.  The integer allocation was saved when we
531            # made it.
532            self.state_lock.acquire()
533            self.state[aid]['started'] = { 
534                    'allocID': req['allocID'],
535                    'allocationLog': "Allocatation complete",
536                    'segmentdescription': { 'topdldescription': topo.to_dict() },
537                    'proof': proof.to_dict(),
538                    }
539            retval = copy.deepcopy(self.state[aid]['started'])
540            self.write_state()
541            self.state_lock.release()
542        else:
543            # Something bad happened - clear the "running" flag so we can try
544            # again
545            self.state_lock.acquire()
546            self.state['running'] = False
547            self.state_lock.release()
548            raise err
549
550        return retval
551
552    def TerminateSegment(self, req, fid):
553        """
554        Remove the resources associated with th eallocation and stop the music.
555        In this example, this simply means removing the integer we allocated.
556        """
557        # Gather the same access information as for Start Segment
558        try:
559            req = req['TerminateSegmentRequestBody']
560        except KeyError:
561            raise service_error(service_error.req, "Badly formed request")
562
563        auth_attr = req['allocID']['fedid']
564        aid = "%s" % auth_attr
565
566        self.log.debug("Terminate request for %s" %aid)
567        # Check authorization
568        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
569                with_proof=True)
570        if not access_ok:
571            raise service_error(service_error.access, "Access denied", 
572                    proof=proof)
573        cscript = os.path.join(self.localdir, 'connect')
574        dscript = os.path.join(self.localdir, 'disconnect')
575        # Do the work of disconnecting
576        if os.path.exists(dscript):
577            self.log.debug('calling %s' % dscript)
578            rv = subprocess.call(['sudo', '/bin/sh', dscript])
579            if rv != 0:
580                self.log.warning('%s had an error: %d' % (dscript, rv))
581        else:
582            self.log.warn('No disconnection script!?')
583
584        try:
585            for bfn in os.listdir(self.localdir):
586                fn = os.path.join(self.localdir, bfn)
587                self.log.debug('Removing %s' % fn)
588                if os.path.exists(fn):
589                    os.remove(fn)
590        except EnvironmentError, e:
591            self.log.warn('Failed to remove %s: %s' % (e.filename, e.strerror))
592
593        self.ssh_identity = None
594
595        self.state_lock.acquire()
596        self.state['running'] = False
597        self.state_lock.release()
598   
599        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.