#!/usr/local/bin/python

import os
import sys
import re
import string
import copy
import pickle
import logging
import random
import subprocess

from util import *
from deter import fedid, generate_fedid
from authorizer import authorizer, abac_authorizer
from service_error import service_error
from remote_service import xmlrpc_handler, soap_handler, service_caller

from deter import topdl

from access import access_base


# Make log messages disappear if no one configures a fedd logger.  This is
# something of an incantation, but basically it creates a logger object
# registered to fedd.access if no other module above us has.  It's an extra
# belt for the suspenders.
class nullHandler(logging.Handler):
    """Logging handler that discards every record."""

    def emit(self, record):
        pass


fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())


# The plug-in itself.
class access(access_base):
    """
    Desktop access plug-in for fedd.

    It answers RequestAccess, ReleaseAccess (inherited from access_base),
    StartSegment and TerminateSegment.  Starting a segment writes a connect
    script, a disconnect script and zebra/ospfd configuration files into the
    configured local directory and runs the connect script under sudo;
    terminating a segment runs the disconnect script and empties that
    directory.  Only one segment may be running at a time, tracked by
    self.state['running'].
    """

    def __init__(self, config=None, auth=None):
        """
        Initializer.  Pulls parameters out of the configuration's access
        section and initializes internal state.

        config is queried with config.get('access', <option>); the code
        treats a None result as "option not set".  auth is handed to the
        base class initializer.

        Raises service_error if neither hostname nor interface_address is
        configured, or if auth_type is not 'abac'.  An EnvironmentError
        from reading the access database is logged and re-raised.
        """
        # Calling the base initializer, which reads canonical configuration
        # information and initializes canonical members.
        access_base.__init__(self, config, auth)

        # Local address, gateway and advertised name of this desktop
        self.src_addr = config.get('access', 'interface_address')
        self.router = config.get('access', 'gateway')
        self.hostname = config.get('access', 'hostname')
        # Storage for ephemeral ssh keys and host files
        self.localdir = config.get('access', 'localdir')
        # File containing the routing entries for external networks
        self.external_networks = config.get('access', 'external_networks')
        # Comma-separated list of interfaces to export (may be empty)
        self.export_interfaces = self._split_list(
                config.get('access', 'export_interfaces'))
        # Comma separated list of connected nets to export - these won't work
        # in external_networks
        self.export_networks = self._split_list(
                config.get('access', 'export_networks'))

        # values for locations of zebra and ospfd.
        self.zebra = config.get('access', 'zebra')
        if self.zebra is None:
            self.zebra = '/usr/local/sbin/zebra'
        self.ospfd = config.get('access', 'ospfd')
        if self.ospfd is None:
            self.ospfd = '/usr/local/sbin/ospfd'

        # If this is a linux box that will be NATing, the iptables value
        # must be the path of the iptables command and the nat_interface must
        # be the nat interface.
        self.iptables = config.get('access', 'iptables')
        self.nat_interface = config.get('access', 'nat_interface')

        # Path of the ssh private key fetched by StartSegment
        self.ssh_identity = None

        # hostname is the name of the ssh endpoint for the other side.  That
        # side needs it to set up routing tables.  If hostname is not
        # available, but an IP address is, use that.
        if self.hostname is None:
            if self.src_addr is None:
                raise service_error(service_error.server_config,
                        'Hostname or interface_address must be set in config')
            self.hostname = self.src_addr

        self.ssh_port = config.get('access', 'ssh_port', '22')

        # authorization information
        self.auth_type = config.get('access', 'auth_type') or 'abac'
        self.auth_dir = config.get('access', 'auth_dir')
        accessdb = config.get("access", "accessdb")
        # Initialize the authorization system and read the access database
        # that maps from authorization information into local information.
        if self.auth_type == 'abac':
            # Load the current authorization state
            self.auth = abac_authorizer(load=self.auth_dir)
            self.access = []
            if accessdb:
                try:
                    self.read_access(accessdb)
                except EnvironmentError as e:
                    self.log.error("Cannot read %s: %s" % (accessdb, e))
                    # Bare raise keeps the original traceback
                    raise
        else:
            raise service_error(service_error.internal,
                    "Unknown auth_type: %s" % self.auth_type)

        # The superclass has read the state, but if this is the first run
        # ever, we must initialise the running flag.  This plugin only
        # supports one connection, so StartSegment will fail when
        # self.state['running'] is true.
        self.state_lock.acquire()
        try:
            if 'running' not in self.state:
                self.state['running'] = False
        finally:
            self.state_lock.release()

        # These dictionaries register the plug-in's local routines for
        # handling these four messages with the server code above.  There's
        # a version for SOAP and XMLRPC, depending on which interfaces the
        # plugin supports.  There's rarely a technical reason not to support
        # one or the other - the plugin code almost never deals with the
        # transport - but if a plug-in writer wanted to disable XMLRPC, they
        # could leave the self.xmlrpc_services dictionary empty.
        self.soap_services = {
            'RequestAccess': soap_handler("RequestAccess",
                self.RequestAccess),
            'ReleaseAccess': soap_handler("ReleaseAccess",
                self.ReleaseAccess),
            'StartSegment': soap_handler("StartSegment",
                self.StartSegment),
            'TerminateSegment': soap_handler("TerminateSegment",
                self.TerminateSegment),
        }
        self.xmlrpc_services = {
            'RequestAccess': xmlrpc_handler('RequestAccess',
                self.RequestAccess),
            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
                self.ReleaseAccess),
            'StartSegment': xmlrpc_handler("StartSegment",
                self.StartSegment),
            'TerminateSegment': xmlrpc_handler('TerminateSegment',
                self.TerminateSegment),
        }
        self.call_SetValue = service_caller('SetValue', log=self.log)
        self.call_GetValue = service_caller('GetValue', log=self.log)

    @staticmethod
    def _split_list(value):
        """
        Split a comma-separated option value into stripped, non-empty items.
        None (option absent) and an empty string both give [].
        """
        if value is None:
            return []
        # Dropping empty items keeps an empty option from producing a bare
        # "interface " or "network" line in the generated configuration.
        return [x.strip() for x in value.split(',') if x.strip()]

    def _set_running(self, value):
        """Set self.state['running'] to value while holding state_lock."""
        self.state_lock.acquire()
        try:
            self.state['running'] = value
        finally:
            self.state_lock.release()

    # ReleaseAccess comes from the base class; this is a slightly modified
    # RequestAccess from the base that includes a fedAttr to force this side
    # to be active.
    def RequestAccess(self, req, fid):
        """
        Handle an access request.

        Success here maps the requester into the local access control space
        and establishes state about that user keyed to a newly generated
        fedid.  A copy of the certificate underlying that fedid is written
        to <certdir>/<allocation id>.pem; StartSegment later passes that
        file to the calls that fetch attributes and exchange parameters.

        Returns a dict with the allocation fedid, a 'nat_portals' fedAttr
        and the access proof.  Raises service_error if the request body is
        missing or the certificate cannot be written; the base class lookup
        is expected to raise when access is denied.
        """
        self.log.info("RequestAccess called by %s" % fid)
        # The dance to get into the request body
        if 'RequestAccessRequestBody' in req:
            req = req['RequestAccessRequestBody']
        else:
            raise service_error(service_error.req, "No request!?")

        # Base class lookup routine.  If this fails, it throws a service
        # exception denying access that triggers a fault response back to the
        # caller.
        found, owners, proof = self.lookup_access(req, fid)
        self.log.info(
                "[RequestAccess] Access granted local creds %s" % found)
        # Make a fedid for this allocation
        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
        aid = unicode(allocID)

        # Store the data about this allocation:
        self.state_lock.acquire()
        try:
            self.state[aid] = {
                'user': found,
                'owners': owners,
                'auth': set(),
            }
            # Authorize the creating fedid and the principal representing
            # the allocation to manipulate it.
            self.append_allocation_authorization(aid,
                    ((fid, allocID), (allocID, allocID)))
            self.write_state()
        finally:
            self.state_lock.release()

        # Stash the allocation certificate in the certificate directory.
        try:
            with open("%s/%s.pem" % (self.certdir, aid), "w") as f:
                f.write("%s\n" % alloc_cert)
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
        self.log.debug('[RequestAccess] Returning allocation ID: %s' % allocID)
        msg = {
            'allocID': {'fedid': allocID},
            'fedAttr': [{'attribute': 'nat_portals', 'value': 'True'}],
            'proof': proof.to_dict(),
        }
        return msg

    def validate_topology(self, top):
        '''
        Validate the topology.  Desktops can only be single connections.
        Though the topology will include a portal and a node, the access
        controller will implement both on one node.  As more capabilities
        are added to the controller the constraints here will relax.

        Returns True when the topology is acceptable and raises
        service_error otherwise.
        '''
        comps = [e for e in top.elements if isinstance(e, topdl.Computer)]
        if len(comps) > 2:
            raise service_error(service_error.req,
                    "Desktop only supports 1-node subexperiments")

        portals = 0
        for c in comps:
            if c.get_attribute('portal') is not None:
                portals += 1
                continue
            if len(c.interface) > 1:
                raise service_error(service_error.req,
                        "Topology: Desktop Node has more than one interface")
            if len(c.interface) < 1:
                # Reject here instead of failing with an IndexError below
                raise service_error(service_error.req,
                        "Topology: Desktop Node has no interface")
            iface = c.interface[0]
            if len(iface.subs) > 1:
                raise service_error(service_error.req,
                        "Topology: Desktop Node has more than " +
                        "one substate on interface")
            if len(iface.subs) < 1:
                raise service_error(service_error.req,
                        "Topology: Desktop Node interface has no substrate")
            sub = iface.subs[0]
            # Everything sharing the substrate must be one of the computers
            # collected above.
            for peer_if in sub.interfaces:
                if peer_if.element not in comps:
                    raise service_error(service_error.req,
                            "Topology: Desktop Node connected to non-portal")

        if portals > 1:
            raise service_error(service_error.req,
                    "Desktop segment has more than one portal")
        return True

    def validate_connInfo(self, connInfo):
        '''
        Check that exactly one connection is requested and that it is an
        ssh connection.  Returns True or raises service_error.
        '''
        if len(connInfo) != 1:
            raise service_error(service_error.req,
                    "Desktop segment requests multiple connections")
        # .get() so that a connection without a type is reported as a bad
        # request rather than escaping as a KeyError.
        if connInfo[0].get('type') != 'ssh':
            raise service_error(service_error.req,
                    "Desktop segment requires ssh connecton")
        return True

    def export_store_info(self, certfile, connInfo):
        '''
        Tell the other portal node where to reach this desktop.  The other
        side uses this information to set up routing, though the ssh_port is
        unused as the Desktop always initiates ssh connections.

        For each non-input parameter in connInfo whose name is 'peer' or
        'ssh_port', the value is sent to the parameter's store URL with
        SetValue, using certfile for the call.  Other parameter names are
        logged and skipped.
        '''
        values = {'peer': self.hostname, 'ssh_port': self.ssh_port}
        for c in connInfo:
            for p in c.get('parameter', []):
                if p.get('type', '') == 'input':
                    continue
                pname = p.get('name', '')
                key = p.get('key', '')
                surl = p.get('store', '')
                if pname not in values:
                    # self.log is a logger object and is not callable
                    self.log.warning('Unknown export parameter: %s' % pname)
                    continue
                val = values[pname]
                req = {'name': key, 'value': val}
                self.log.debug('Setting %s (%s) to %s on %s' %
                        (pname, key, val, surl))
                self.call_SetValue(surl, req, certfile)

    def set_route(self, dest, script, gw=None, src=None):
        '''
        Write the platform's command for adding a route to dest into the
        open file script.  gw is the gateway and src the source address;
        nothing is written when both are None.

        Raises service_error on FreeBSD when src is given, and on platforms
        other than FreeBSD and Linux.
        '''
        if sys.platform.startswith('freebsd'):
            if src is not None:
                raise service_error(service_error.internal,
                        'FreeBSD will not route based on src address')
            if gw is not None:
                script.write('route add %s %s\n' % (dest, gw))
        elif sys.platform.startswith('linux'):
            if src is not None and gw is not None:
                script.write('ip route add %s via %s src %s\n' %
                        (dest, gw, src))
            elif src is not None:
                script.write('ip route add %s src %s\n' % (dest, src))
            elif gw is not None:
                script.write('ip route add %s via %s\n' % (dest, gw))
        else:
            raise service_error(service_error.internal,
                    'Unknown platform %s' % sys.platform)

    def unset_route(self, dest, script):
        '''
        Write the platform's command for deleting the route to dest into
        the open file script.  Nothing is written on platforms other than
        FreeBSD and Linux.
        '''
        if sys.platform.startswith('freebsd'):
            script.write('route delete %s\n' % dest)
        elif sys.platform.startswith('linux'):
            script.write('ip route delete %s\n' % dest)

    def find_a_peer(self, addr):
        '''
        Find another node in the experiment that's on our subnet.  This is a
        hack to handle the problem that we really cannot require the desktop
        to dynamically route.  Will be improved by distributing static
        routes.

        The hosts file in the local directory is searched for the first
        address that shares addr's first three dotted components and is not
        addr itself; that address is returned.  Raises service_error if
        addr contains no dot, the hosts file cannot be read, or no such
        address is found.
        '''
        hosts = os.path.join(self.localdir, 'hosts')
        p = addr.rfind('.')
        if p == -1:
            raise service_error(service_error.req, 'bad address in topology')
        prefix = addr[0:p]
        # Escape the prefix and anchor it on a word boundary so the dots
        # only match dots and the prefix cannot match the tail of a longer
        # number.
        addr_re = re.compile(r'\b(%s\.\d+)' % re.escape(prefix))
        try:
            with open(hosts, 'r') as f:
                for line in f:
                    m = addr_re.search(line)
                    if m is not None and m.group(1) != addr:
                        return m.group(1)
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    'Cannot open %s: %s' % (e.filename, e.strerror))
        raise service_error(service_error.req,
                'No other nodes in this subnet??')

    def _nat_rules(self, action):
        '''
        Return the iptables commands that add (action '-A') or delete
        (action '-D') the NAT and forwarding rules for tap0.  Returns [] if
        iptables or nat_interface is not configured.
        '''
        if self.iptables is None or self.nat_interface is None:
            return []
        return [
            '%s -t nat %s POSTROUTING -o %s -j MASQUERADE' %
                (self.iptables, action, self.nat_interface),
            ('%s %s FORWARD -i %s -o tap0 -m state ' +
                '--state RELATED,ESTABLISHED -j ACCEPT') %
                (self.iptables, action, self.nat_interface),
            '%s %s FORWARD -i tap0 -o %s -j ACCEPT' %
                (self.iptables, action, self.nat_interface),
        ]

    def _write_connect_script(self, path, peer, port, my_addr, local_hosts,
            zebra_conf, ospfd_conf):
        '''Write the executable shell script that brings the tunnel up.'''
        lines = [
            # Wait until the other end reports that it is configured by
            # placing a file this end can access into its local file
            # system.  Try once a minute.
            'while ! /usr/bin/scp -P %s -o "StrictHostKeyChecking no" -i %s %s:/usr/local/federation/etc/prep_done /dev/null; do' % (port, self.ssh_identity, peer),
            'sleep 60; done',
            ('ssh -w 0:0 -p %s -o "Tunnel ethernet" ' +
                '-o "StrictHostKeyChecking no" -i %s %s perl ' +
                '-I/usr/local/federation/lib ' +
                '/usr/local/federation/bin/setup_bridge.pl --tapno=0 ' +
                '--addr=%s --use_file &') %
                (port, self.ssh_identity, peer, my_addr),
            # This should give the tap a chance to come up
            'sleep 10',
            # Add experiment nodes to hosts
            'cp /etc/hosts /etc/hosts.DETER.fedd.hold',
            'echo "#--- BEGIN FEDD ADDITIONS ---" >> /etc/hosts',
            'cat %s >> /etc/hosts' % local_hosts,
            'echo "#--- END FEDD ADDITIONS ---" >> /etc/hosts',
            # Assign tap address and route experiment connections through
            # it.
            'ifconfig tap0 %s netmask 255.255.255.0 up' % my_addr,
            '%s -d -f %s' % (self.zebra, zebra_conf),
            '%s -d -f %s' % (self.ospfd, ospfd_conf),
        ]
        lines.extend(self._nat_rules('-A'))
        with open(path, 'w') as f:
            f.write('#!/bin/sh\n')
            # This picks the outgoing interface to the experiment using the
            # routing system.
            self.set_route(peer, f, self.router, self.src_addr)
            f.write('\n'.join(lines) + '\n')
        os.chmod(path, 0o755)

    def _write_disconnect_script(self, path, peer, port, my_addr):
        '''Write the executable shell script that tears the tunnel down.'''
        lines = ['#!/bin/sh']
        # The NAT rules are deleted exactly once; the script used to delete
        # them a second time after the rules were already gone.
        lines.extend(self._nat_rules('-D'))
        lines.extend([
            'pkill -f setup_bridge.pl',
            'mv /etc/hosts.DETER.fedd.hold /etc/hosts',
            ('ssh -p %s -o "StrictHostKeyChecking no" -i %s %s ' +
                'perl -I/usr/local/federation/lib ' +
                '/usr/local/federation/bin/unsetup_bridge.pl --tapno=0 ' +
                '--addr=%s') %
                (port, self.ssh_identity, peer, my_addr),
            'kill `cat /var/run/quagga/ospfd.pid`',
            'kill `cat /var/run/quagga/zebra.pid`',
        ])
        with open(path, 'w') as f:
            f.write('\n'.join(lines) + '\n')
        os.chmod(path, 0o755)

    def _write_zebra_conf(self, path, my_name):
        '''Write zebra.conf: hostname, interfaces and external networks.'''
        lines = ['hostname %s' % my_name, 'interface tap0']
        lines.extend('interface %s' % i for i in self.export_interfaces)
        if self.external_networks is not None:
            try:
                with open(self.external_networks, 'r') as extern:
                    lines.extend(l.strip() for l in extern)
            except EnvironmentError:
                # No external_networks or problem reading it, ignore
                pass
        with open(path, 'w') as f:
            f.write('\n'.join(lines) + '\n')
        os.chmod(path, 0o644)

    def _write_ospfd_conf(self, path, my_name, my_addr):
        '''Write ospfd.conf announcing the tap network and exported nets.'''
        lines = [
            'hostname %s' % my_name,
            'router ospf',
            ' redistribute static',
            ' network %s/24 area 0.0.0.2' % my_addr,
        ]
        lines.extend(' network %s area 0.0.0.2' % i
                for i in self.export_networks)
        with open(path, 'w') as f:
            f.write('\n'.join(lines) + '\n')

    def configure_desktop(self, top, connInfo):
        '''
        Build the connection.  Establish routing to the peer if using a
        separate interface, wait until the other end confirms setup,
        establish the ssh layer-two tunnel (tap), assign the in-experiment
        IP address to the tunnel and establish routing to the experiment
        through the tap.

        The work is done by a generated connect script that is started
        under sudo in the background with its output sent to
        /tmp/connect.log; this method does not wait for it.  A matching
        disconnect script and the zebra/ospfd configuration files are
        written alongside it in the local directory.

        Returns True.  Raises service_error if the peer, port or local
        address cannot be determined, if no other experiment node is found
        in the hosts file, or if a file cannot be written or the script
        cannot be started.
        '''
        # get the peer and ssh port from the portal and our IP from the other
        peer = None
        port = None
        my_addr = None
        my_name = None
        for e in top.elements:
            if not isinstance(e, topdl.Computer):
                continue
            if e.get_attribute('portal') is None:
                my_name = e.name
                # there should be one interface with one IPv4 address
                if len(e.interface) < 1:
                    raise service_error(service_error.internal,
                            'No interface on experiment node!?!?')
                my_addr = e.interface[0].get_attribute('ip4_address')
            else:
                for ci in connInfo:
                    if ci.get('portal', '') != e.name:
                        continue
                    peer = ci.get('peer')
                    port = '22'
                    for a in ci.get('fedAttr', []):
                        if a['attribute'] == 'ssh_port':
                            port = a['value']

        # XXX scan hosts for IP addresses and compose better routing entry

        if not all([peer, port, my_addr]):
            raise service_error(service_error.req,
                    'Cannot find all config parameters %s %s %s' %
                    (peer, port, my_addr))

        # The peer found is not used yet; the call is kept because it
        # raises when the hosts file holds no other node on our subnet.
        self.find_a_peer(my_addr)

        cscript = os.path.join(self.localdir, 'connect')
        dscript = os.path.join(self.localdir, 'disconnect')
        local_hosts = os.path.join(self.localdir, 'hosts')
        zebra_conf = os.path.join(self.localdir, 'zebra.conf')
        ospfd_conf = os.path.join(self.localdir, 'ospfd.conf')
        try:
            self._write_connect_script(cscript, peer, port, my_addr,
                    local_hosts, zebra_conf, ospfd_conf)
            self._write_disconnect_script(dscript, peer, port, my_addr)
            self._write_zebra_conf(zebra_conf, my_name)
            self._write_ospfd_conf(ospfd_conf, my_name, my_addr)
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    'Cannot create connect %s: %s' % (e.filename, e.strerror))

        try:
            # The child gets its own copy of the log descriptor, so ours can
            # be closed as soon as the script has been started.
            with open('/tmp/connect.log', 'w') as script_log:
                subprocess.Popen(['sudo', '/bin/sh', cscript],
                        stdout=script_log, stderr=script_log)
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    'Cannot run %s: %s' % (cscript, e))
        return True

    def StartSegment(self, req, fid):
        """
        Start a segment: validate the request, fetch the hosts file and ssh
        key named in its attributes, exchange connection parameters and
        start the desktop's connection to the experiment.

        Returns the response dict, which is also stored under the
        allocation's 'started' key so a repeated request is answered by
        replaying it.  Raises service_error if the request is malformed,
        access is denied, the allocation is unknown, or the desktop is
        already in an experiment.  Any failure during setup clears the
        running flag before the exception propagates.
        """
        try:
            req = req['StartSegmentRequestBody']
            # Get the request topology.  If not present, a KeyError is
            # thrown.
            topref = req['segmentdescription']['topdldescription']
            # The fedid of the allocation we're attaching resources to
            auth_attr = req['allocID']['fedid']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        # String version of the allocation ID for keying
        aid = "%s" % auth_attr
        # Authorization check
        access_ok, proof = self.auth.check_attribute(fid, auth_attr,
                with_proof=True)
        if not access_ok:
            raise service_error(service_error.access, "Access denied",
                    proof=proof)

        # See if this is a replay of an earlier succeeded StartSegment -
        # sometimes SSL kills 'em.  If so, replay the response rather than
        # redoing the allocation.
        self.state_lock.acquire()
        try:
            if aid not in self.state:
                raise service_error(service_error.req,
                        "No such allocation: %s" % aid)
            # Test and set :-)
            running = self.state['running']
            self.state['running'] = True
            retval = self.state[aid].get('started', None)
        finally:
            self.state_lock.release()

        if retval:
            self.log.warning(
                    "[StartSegment] Duplicate StartSegment for %s: "
                    % aid +
                    "replaying response")
            return retval
        if running:
            self.log.debug('[StartSegment] already running')
            raise service_error(service_error.federant,
                    'Desktop is already in an experiment')

        certfile = "%s/%s.pem" % (self.certdir, aid)

        try:
            # Convert the topology into topdl data structures.
            if not topref:
                raise service_error(service_error.req,
                        "Request missing segmentdescription'")
            topo = topdl.Topology(**topref)
            self.validate_topology(topo)

            # The attributes of the request.  The ones we care about are
            # the hosts file and the ssh key to operate the tunnel.
            for a in req.get('fedAttr', []):
                # Save the hosts and ssh_privkeys to our local dir
                if a['attribute'] in ('hosts', 'ssh_secretkey'):
                    self.log.debug('Getting %s from %s' %
                            (a['attribute'], a['value']))
                    get_url(a['value'], certfile, self.localdir, log=self.log)
                    local_copy = os.path.join(self.localdir,
                            os.path.basename(a['value']))
                    if a['attribute'] == 'ssh_secretkey':
                        self.ssh_identity = local_copy
                    os.chmod(local_copy, 0o600)
                else:
                    self.log.debug('Ignoring attribute %s' % a['attribute'])

            # Gather connection information and exchange parameters.
            connInfo = req.get('connection', [])
            self.validate_connInfo(connInfo)
            self.export_store_info(certfile, connInfo)
            self.import_store_info(certfile, connInfo)

            # build it
            self.configure_desktop(topo, connInfo)
        except Exception:
            # Something bad happened - clear the "running" flag so we can
            # try again, whatever kind of error it was.
            self._set_running(False)
            raise

        # It's possible that the StartSegment call gets retried (!).  If the
        # 'started' key is in the allocation, we'll return it rather than
        # redo the setup.
        self.state_lock.acquire()
        try:
            self.state[aid]['started'] = {
                'allocID': req['allocID'],
                'allocationLog': "Allocatation complete",
                'segmentdescription': {
                    'topdldescription': topo.to_dict()
                },
                'proof': proof.to_dict(),
            }
            retval = copy.deepcopy(self.state[aid]['started'])
            self.write_state()
        finally:
            self.state_lock.release()

        return retval

    def TerminateSegment(self, req, fid):
        """
        Remove the resources associated with the allocation and stop the
        music: run the disconnect script under sudo, delete the files in
        the local directory, forget the ssh identity and clear the running
        flag.

        Returns a dict with the allocation ID and the access proof.  Raises
        service_error if the request is malformed or access is denied.
        Problems running the script or removing files are only logged.
        """
        # Gather the same access information as for Start Segment
        try:
            req = req['TerminateSegmentRequestBody']
            auth_attr = req['allocID']['fedid']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        aid = "%s" % auth_attr

        self.log.debug("Terminate request for %s" % aid)
        # Check authorization
        access_ok, proof = self.auth.check_attribute(fid, auth_attr,
                with_proof=True)
        if not access_ok:
            raise service_error(service_error.access, "Access denied",
                    proof=proof)

        dscript = os.path.join(self.localdir, 'disconnect')
        # Do the work of disconnecting
        if os.path.exists(dscript):
            self.log.debug('calling %s' % dscript)
            rv = subprocess.call(['sudo', '/bin/sh', dscript])
            if rv != 0:
                self.log.warning('%s had an error: %d' % (dscript, rv))
        else:
            self.log.warning('No disconnection script!?')

        try:
            names = os.listdir(self.localdir)
        except EnvironmentError as e:
            self.log.warning('Failed to list %s: %s' %
                    (e.filename, e.strerror))
            names = []
        for bfn in names:
            fn = os.path.join(self.localdir, bfn)
            self.log.debug('Removing %s' % fn)
            # Handled per file so one failure does not stop the clean-up
            try:
                if os.path.exists(fn):
                    os.remove(fn)
            except EnvironmentError as e:
                self.log.warning('Failed to remove %s: %s' %
                        (e.filename, e.strerror))

        self.ssh_identity = None

        self._set_running(False)

        return {'allocID': req['allocID'], 'proof': proof.to_dict()}