source: fedd/federation/desktop_access.py @ 1819839

Last change on this file since 1819839 was 1819839, checked in by Ted Faber <faber@…>, 12 years ago

Works with null swapin. Check connectivity next

  • Property mode set to 100644
File size: 16.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import random
10import subprocess
11
12from util import *
13from deter import fedid, generate_fedid
14from authorizer import authorizer, abac_authorizer
15from service_error import service_error
16from remote_service import xmlrpc_handler, soap_handler, service_caller
17
18from deter import topdl
19
20from access import access_base
21
# Guarantee that the "fedd.access" logger always owns at least one handler.
# If no enclosing module has configured fedd logging, messages logged through
# it are then quietly discarded instead of producing complaints.  This is an
# extra belt for the suspenders.
class nullHandler(logging.Handler):
    """Logging handler that discards every record it is given."""
    def emit(self, record):
        return None

fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())
31
32
33# The plug-in itself.
34class access(access_base):
35    """
    Access plug-in that attaches a desktop to a federated experiment.  It
    answers the RequestAccess, ReleaseAccess, StartSegment and
    TerminateSegment requests and keeps internal state, making use of the
    general routines in access.access_base.  Only one connection is supported
    at a time: StartSegment writes connect and disconnect scripts that build
    an ssh layer-two tunnel to the peer portal.

    Detailed comments are in the code.
42    """
43    def __init__(self, config=None, auth=None):
44        """
45        Initializer.  Pulls parameters out of the ConfigParser's access
46        section, and initializes simple internal state.  This version reads a
47        maximum integer to assign from the configuration file, while most other
48        configuration entries  are read by the base class. 
49
50        An access database in the cannonical format is also read as well as a
51        state database that is a hash of internal state.  Routines to
52        manipulate these are in the base class, but specializations appear
53        here.
54
55        The access database maps users to a simple string.
56        """
57
58        # Calling the base initializer, which reads canonical configuration
59        # information and initializes canonical members.
60        access_base.__init__(self, config, auth)
61        # Reading the maximum integer parameter from the configuration file
62
63        self.src_addr = config.get('access', 'interface_address')
64        self.router = config.get('access', 'gateway')
65        self.hostname = config.get('access', 'hostname')
66        # Storage for ephemeral ssh keys and host files
67        self.localdir = config.get('access', 'localdir')
68        self.ssh_identity = None
69
70        # hostname is the name of the ssh endpoint for the other side.  That
71        # side needs it to set up routing tables.  If hostname is not
72        # available, but an IP address is, use that.
73        if self.hostname is None:
74            if  self.src_addr is None:
75                raise service_error(service_error.server_config,
76                        'Hostname or interface_address must be set in config')
77            self.hostname = self.src_addr
78       
79        self.ssh_port = config.get('access', 'ssh_port', '22')
80
81        # authorization information
82        self.auth_type = config.get('access', 'auth_type') \
83                or 'abac'
84        self.auth_dir = config.get('access', 'auth_dir')
85        accessdb = config.get("access", "accessdb")
86        # initialize the authorization system.  We make a call to
87        # read the access database that maps from authorization information
88        # into local information.  The local information is parsed by the
89        # translator above.
90        if self.auth_type == 'abac':
91            #  Load the current authorization state
92            self.auth = abac_authorizer(load=self.auth_dir)
93            self.access = [ ]
94            if accessdb:
95                try:
96                    self.read_access(accessdb)
97                except EnvironmentError, e:
98                    self.log.error("Cannot read %s: %s" % \
99                            (config.get("access", "accessdb"), e))
100                    raise e
101        else:
102            raise service_error(service_error.internal, 
103                    "Unknown auth_type: %s" % self.auth_type)
104
105        # The superclass has read the state, but if this is the first run ever,
106        # we must initialise the running flag.  This plugin only supports one
107        # connection, so StartSegment will fail when self.state['running'] is
108        # true.
109        self.state_lock.acquire()
110        if 'running' not in self.state:
111            self.state['running'] = False
112        self.state_lock.release()
113
114        # These dictionaries register the plug-in's local routines for handline
115        # these four messages with the server code above.  There's a version
116        # for SOAP and XMLRPC, depending on which interfaces the plugin
117        # supports.  There's rarely a technical reason not to support one or
118        # the other - the plugin code almost never deals with the transport -
119        # but if a plug-in writer wanted to disable XMLRPC, they could leave
120        # the self.xmlrpc_services dictionary empty.
121        self.soap_services = {\
122            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
123            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
124            'StartSegment': soap_handler("StartSegment", self.StartSegment),
125            'TerminateSegment': soap_handler("TerminateSegment", 
126                self.TerminateSegment),
127            }
128        self.xmlrpc_services =  {\
129            'RequestAccess': xmlrpc_handler('RequestAccess',
130                self.RequestAccess),
131            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
132                self.ReleaseAccess),
133            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
134            'TerminateSegment': xmlrpc_handler('TerminateSegment',
135                self.TerminateSegment),
136            }
137        self.call_SetValue = service_caller('SetValue', log=self.log)
138        self.call_GetValue = service_caller('GetValue', log=self.log)
139
140    # RequestAccess and ReleaseAccess come from the base class
141
142    def validate_topology(self, top):
143        '''
144        Validate the topology.  Desktops can only be single connections.
145        Though the topology will include a portal and a node, the access
146        controller will implement both on one node.
147
148        As more capabilities are added to the contoller the constraints here
149        will relax.
150        '''
151
152        comps = []
153        for e in top.elements:
154            if isinstance(e, topdl.Computer): comps.append(e)
155        if len(comps) > 2: 
156            raise service_error(service_error.req,
157                    "Desktop only supports 1-node subexperiments")
158
159        portals = 0
160        for c in comps:
161            if c.get_attribute('portal') is not None: 
162                portals += 1
163                continue
164            if len(c.interface) > 1:
165                raise service_error(service_error.req,
166                        "Desktop Node has more than one interface")
167            i  = c.interface[0]
168            if len(i.subs) > 1: 
169                raise service_error(service_error.req,
170                        "Desktop Node has more than one substate on interface")
171            sub = i.subs[0]
172            for i in sub.interfaces:
173                if i.element not in comps:
174                    raise service_error(service_error.req,
175                            "Desktop Node connected to non-portal")
176
177        if portals > 1:
178            raise service_error(service_error.req,
179                    "Desktop segment has more than one portal")
180        return True
181
182    def validate_connInfo(self, connInfo):
183        if len(connInfo) != 1: 
184            raise service_error(service_error.req,
185                    "Desktop segment requests multiple connections")
186        if connInfo[0]['type'] != 'ssh':
187            raise service_error(service_error.req,
188                    "Desktop segment requires ssh connecton")
189        return True
190
191    def export_store_info(self, certfile, connInfo):
192        '''
193        Tell the other portal node where to reach this desktop.  The other side
194        uses this information to set up routing, though the ssh_port is unused
195        as the Desktop always initiates ssh connections.
196        '''
197        values = { 'peer': self.hostname, 'ssh_port': self.ssh_port }
198        for c in connInfo:
199            for p in c.get('parameter', []):
200                if p.get('type','') == 'input': continue
201                pname = p.get('name', '')
202                key = p.get('key', '')
203                surl = p.get('store', '')
204                if pname not in values:
205                    self.log('Unknown export parameter: %s'  % pname)
206                    continue
207                val = values[pname]
208                req = { 'name': key, 'value': val }
209                self.log.debug('Setting %s (%s) to %s on %s' % \
210                        (pname, key,  val, surl))
211                self.call_SetValue(surl, req, certfile)
212
213    def set_route(self, dest, script, gw=None, src=None):
214        if sys.platform.startswith('freebsd'):
215            if src is not None and gw is not None:
216                raise service_error(service_error.internal, 
217                        'FreeBSD will not route based on src address')
218            elif src is not None:
219                raise service_error(service_error.internal, 
220                        'FreeBSD will not route based on src address')
221            elif gw is not None:
222                print >>script, 'sudo route add %s %s' % (dest, gw)
223        elif sys.platform.startswith('linux'):
224            if src is not None and gw is not None:
225                print >>script, 'sudo ip route add %s via %s src %s' % \
226                        (dest, gw, src)
227            elif src is not None:
228                print >>script, 'sudo ip route add %s src %s' % \
229                        (dest, src)
230            elif gw is not None:
231                print >>script, 'sudo ip route add %s via %s' % (dest, gw)
232        else:
233            raise service_error(service_error.internal, 
234                    'Unknown platform %s' % sys.platform)
235
236    def unset_route(self, dest, script):
237        rv = 0
238        if sys.platform.startswith('freebsd'):
239            print >>script, 'sudo route delete %s' % dest
240        elif sys.platform.startswith('linux'):
241            print >>script, 'sudo ip route delete %s' % dest
242
243
244
245    def configure_desktop(self, top, connInfo):
246        '''
247        Build the connection.  Establish routing to the peer if using a
248        separate interface, wait until the other end confirms setup, establish
249        the ssh layer-two tunnel (tap), assign the in-experiment IP address to
250        the tunnel and establish routing to the experiment through the tap.
251        '''
252
253
254        # get the peer and ssh port from the portal and our IP from the other
255        peer = None
256        port = None
257        my_addr = None
258        for e in top.elements:
259            if not isinstance(e, topdl.Computer): continue
260            if e.get_attribute('portal') is None: 
261                # there should be one interface with one IPv4 address
262                if len(e.interface) <1 :
263                    raise service_error(service_error.internal,
264                            'No interface on experiment node!?!?')
265                my_addr = e.interface[0].get_attribute('ip4_address')
266            else:
267                for ci in connInfo:
268                    if ci.get('portal', '') != e.name: continue
269                    peer = ci.get('peer')
270                    port = '22'
271                    for a in ci.get('fedAttr', []):
272                        if a['attribute'] == 'ssh_port': port = a['value']
273
274        # XXX scan hosts for IP addresses and compose better routing entry
275       
276        if not all([peer, port, my_addr]):
277            raise service_error(service_error.req, 
278                    'Cannot find all config parameters %s %s %s' % (peer, port, my_addr))
279
280        cscript = os.path.join(self.localdir, 'connect')
281        dscript = os.path.join(self.localdir, 'disconnect')
282        try:
283            f = open(cscript, 'w')
284            print >>f, '#!/bin/sh'
285            # This picks the outgoing interface to the experiment using the
286            # routing system.
287            self.set_route(peer, f, self.router, self.src_addr)
288            # Wait until the other end reports that it is configured py placing
289            # a file this end can access into its local file system.  Try once
290            # a minute.
291            print >>f,'while !/usr/bin/scp -i "StrictHostKeyChecking no" -i %s %s:/usr/local/federation/etc/prep_done /dev/null; do' % (self.ssh_identity, peer)
292            print >>f, 'sleep 60; done'
293            print >>f, ('sudo ssh -w 0:0 -p %s -o "Tunnel ethernet" ' + \
294                    '-o "StrictHostKeyChecking no" -i %s -N %s &') % \
295                    (port, self.ssh_identity, peer)
296            # This should give the tap a a chance to come up
297            print >>f,'sleep 10'
298            # Assign its address and route connections through it.
299            print >>f, 'sudo ifconfig tap0 %s netmask 255.255.255.0 up' % \
300                    my_addr
301            self.set_route('10.0.0.0/8', f, peer)
302            f.close()
303            os.chmod(cscript, 0755)
304            f = open(dscript, 'w')
305            print >>f, '#!/bin/sh'
306            print >>f, 'sudo ifconfig tap0 destroy'
307            self.unset_route(peer, f)
308            self.unset_route('10.0.0.0/8', f)
309            f.close()
310            os.chmod(dscript, 0755)
311        except EnvironmentError, e:
312            raise service_error(service_error.internal, 
313                    'Cannot create connect %s: %s' % (e.filename, e.strerror))
314        #subprocess.call(['/bin/sh', cscript])
315        return True
316
317    def StartSegment(self, req, fid):
318        """
319        Start a segment.  In this simple skeleton, this means to parse the
320        request and assign an unassigned integer to it.  We store the integer
321        in the persistent state.
322        """
323        try:
324            req = req['StartSegmentRequestBody']
325            # Get the request topology.  If not present, a KeyError is thrown.
326            topref = req['segmentdescription']['topdldescription']
327            # The fedid of the allocation we're attaching resources to
328            auth_attr = req['allocID']['fedid']
329        except KeyError:
330            raise service_error(service_error.req, "Badly formed request")
331
332        # String version of the allocation ID for keying
333        aid = "%s" % auth_attr
334        # Authorization check
335        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
336                with_proof=True)
337        if not access_ok:
338            raise service_error(service_error.access, "Access denied", 
339                    proof=proof)
340        else:
341            # See if this is a replay of an earlier succeeded StartSegment -
342            # sometimes SSL kills 'em.  If so, replay the response rather than
343            # redoing the allocation.
344            self.state_lock.acquire()
345            # Test and set :-)
346            running = self.state['running']
347            self.state['running'] = True
348            retval = self.state[aid].get('started', None)
349            self.state_lock.release()
350            if retval:
351                self.log.warning(
352                        "[StartSegment] Duplicate StartSegment for %s: " \
353                                % aid + \
354                        "replaying response")
355                return retval
356            if running:
357                self.log.debug('[StartSegment] already running')
358                raise service_error(service_error.federant,
359                        'Desktop is already in an experiment')
360
361        certfile = "%s/%s.pem" % (self.certdir, aid)
362
363        # Convert the topology into topdl data structures.  Again, the
364        # skeletion doesn't do anything with it, but this is how one parses a
365        # topology request.
366        if topref: topo = topdl.Topology(**topref)
367        else:
368            raise service_error(service_error.req, 
369                    "Request missing segmentdescription'")
370
371        err = None
372        try:
373            self.validate_topology(topo)
374
375            # The attributes of the request.  The ones we care about are the ssh
376            # keys to operate the tunnel.
377            attrs = req.get('fedAttr', [])
378            for a in attrs:
379                # Save the hosts and ssh_privkeys to our local dir
380                if a['attribute'] in ('hosts', 'ssh_secretkey'):
381                    self.log.debug('Getting %s from %s' % \
382                            (a['attribute'], a['value']))
383                    get_url(a['value'], certfile, self.localdir, log=self.log)
384                    base = os.path.basename(a['value'])
385                    if a['attribute'] == 'ssh_secretkey':
386                        self.ssh_identity = os.path.join(self.localdir, base)
387                    os.chmod(os.path.join(self.localdir, base), 0600)
388                else:
389                    self.log.debug('Ignoring attribute %s' % a['attribute'])
390
391            # Gather connection information and exchange parameters.
392            connInfo = req.get('connection', [])
393            self.validate_connInfo(connInfo)
394            self.export_store_info(certfile, connInfo)
395            self.import_store_info(certfile, connInfo)
396
397            #build it
398            self.configure_desktop(topo, connInfo)
399        except service_error, e:
400            err = e
401
402        # Save the information
403        if err is None:
404            # It's possible that the StartSegment call gets retried (!).  if
405            # the 'started' key is in the allocation, we'll return it rather
406            # than redo the setup.  The integer allocation was saved when we
407            # made it.
408            self.state_lock.acquire()
409            self.state[aid]['started'] = { 
410                    'allocID': req['allocID'],
411                    'allocationLog': "Allocatation complete",
412                    'segmentdescription': { 'topdldescription': topo.to_dict() },
413                    'proof': proof.to_dict(),
414                    }
415            retval = copy.deepcopy(self.state[aid]['started'])
416            self.write_state()
417            self.state_lock.release()
418        else:
419            # Something bad happened - clear the "running" flag so we can try
420            # again
421            self.state_lock.acquire()
422            self.state['running'] = False
423            self.state_lock.release()
424            raise err
425
426        return retval
427
428    def TerminateSegment(self, req, fid):
429        """
430        Remove the resources associated with th eallocation and stop the music.
431        In this example, this simply means removing the integer we allocated.
432        """
433        # Gather the same access information as for Start Segment
434        try:
435            req = req['TerminateSegmentRequestBody']
436        except KeyError:
437            raise service_error(service_error.req, "Badly formed request")
438
439        auth_attr = req['allocID']['fedid']
440        aid = "%s" % auth_attr
441
442        self.log.debug("Terminate request for %s" %aid)
443        # Check authorization
444        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
445                with_proof=True)
446        if not access_ok:
447            raise service_error(service_error.access, "Access denied", 
448                    proof=proof)
449        cscript = os.path.join(self.localdir, 'connect')
450        dscript = os.path.join(self.localdir, 'disconnect')
451        # XXX restore this
452        #if os.path.exists(dscript):
453            #self.log.debug('calling %s' % dscript)
454            #rv = subprocess.call(['/bin/sh', dscript])
455            #if rv != 0:
456                #self.log.warning('%s had an error: %d' % (dscript, rv))
457        #else:
458            #self.log.warn('No disconnection script!?')
459
460        try:
461            for bfn in os.listdir(self.localdir):
462                fn = os.path.join(self.localdir, bfn)
463                self.log.debug('Removing %s' % fn)
464                if os.path.exists(fn):
465                    os.remove(fn)
466        except EnvironmentError, e:
467            self.log.warn('Failed to remove %s: %s' % (e.filename, e.strerror))
468
469        self.ssh_identity = None
470
471        self.state_lock.acquire()
472        self.state['running'] = False
473        self.state_lock.release()
474   
475        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.