#!/usr/local/bin/python import os,sys import re import string import copy import pickle import logging import time from threading import Thread, Lock from subprocess import Popen, call, PIPE, STDOUT from util import * from allocate_project import allocate_project_local, allocate_project_remote from access_project import access_project from fedid import fedid, generate_fedid from authorizer import authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller import httplib import tempfile from urlparse import urlparse import topdl import list_log import proxy_emulab_segment import local_emulab_segment # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access: class parse_error(RuntimeError): pass proxy_RequestAccess= service_caller('RequestAccess') proxy_ReleaseAccess= service_caller('ReleaseAccess') @staticmethod def parse_vlans(v, log=None): """ Parse a vlan parameter into a set of vlan ids. Comma separated sequences of vlan ranges are acceptable. """ # the vlans can be a single integer, a comma separated list or a # comma separated lists of dashed ranges. E.g 100 or 100,300 or # 100,300-305,400 vset = set() if v: for v in [ x.strip() for x in v.split(",") ]: try: if v.count("-") == 1: f, t = v.split("-", 1) for i in range(int(f), int(t)+1): vset.add(i) else: vset.add(int(v)) except ValueError: if log: log.warn("Invalid expression in vlan list: %s" % v) return vset def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. """ # Make sure that the configuration is in place if not config: raise RunTimeError("No config to dragon_access.access") self.allow_proxy = False self.project_priority = config.getboolean("access", "project_priority") self.certdir = config.get("access","certdir") self.create_debug = config.getboolean("access", "create_debug") self.domain = config.get("access", "domain") vlan_str = config.get("access", "vlans") self.vlans = self.parse_vlans(vlan_str) self.attrs = { } self.access = { } # State is a dict of dicts indexed by segment fedid that includes the # owners of the segment as fedids (who can manipulate it, key: owners), # the repo dir/user for the allocation (key: user), Current allocation # log (key: log), and GRI of the reservation once made (key: gri) self.state = { } self.log = logging.getLogger("fedd.access") set_log_level(config, "access", self.log) self.state_lock = Lock() if auth: self.auth = auth else: self.log.error(\ "[access]: No authorizer initialized, creating local one.") auth = authorizer() tb = config.get('access', 'testbed') if tb: self.testbed = [ t.strip() for t in tb.split(',') ] else: self.testbed = [ ] if config.has_option("access", "accessdb"): self.read_access(config.get("access", "accessdb")) self.state_filename = config.get("access", "access_state") self.read_state() # Keep cert_file and cert_pwd coming from the same place self.cert_file = config.get("access", "cert_file") if self.cert_file: self.sert_pwd = config.get("access", "cert_pw") else: self.cert_file = config.get("globals", "cert_file") self.sert_pwd = config.get("globals", "cert_pw") self.trusted_certs = config.get("access", "trusted_certs") or \ config.get("globals", "trusted_certs") self.call_GetValue= service_caller('GetValue') self.call_SetValue= service_caller('SetValue') self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } def read_access(self, config): """ Read a configuration file and set internal parameters. There are access lines of the form (tb, proj, user) -> user that map the first tuple of names to the user for for access purposes. Names in the key (left side) can include " or " to act as wildcards or to require the fields to be empty. Similarly aproj or auser can be or indicating that either the matching key is to be used or a dynamic user or project will be created. These names can also be federated IDs (fedid's) if prefixed with fedid:. The user is the repo directory that contains the DRAGON user credentials. Testbed attributes outside the forms above can be given using the format attribute: name value: value. The name is a single word and the value continues to the end of the line. Empty lines and lines startin with a # are ignored. Parsing errors result in a self.parse_error exception being raised. """ lineno=0 name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+" fedid_expr = "fedid:[" + string.hexdigits + "]+" key_name = "(||"+fedid_expr + "|"+ name_expr + ")" attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)', re.IGNORECASE) access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE) def parse_name(n): if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) else: return n def auth_name(n): if isinstance(n, basestring): if n =='' or n =='': return None else: return unicode(n) else: return n f = open(config, "r"); for line in f: lineno += 1 line = line.strip(); if len(line) == 0 or line.startswith('#'): continue # Extended (attribute: x value: y) attribute line m = attr_re.match(line) if m != None: attr, val = m.group(1,2) self.attrs[attr] = val continue # Access line (t, p, u) -> (a) line m = access_re.match(line) if m != None: access_key = tuple([ parse_name(x) for x in m.group(1,2,3)]) auth_key = tuple([ auth_name(x) for x in access_key]) user_name = auth_name(parse_name(m.group(4))) self.access[access_key] = user_name self.auth.set_attribute(auth_key, "access") continue # Nothing matched to here: unknown line - raise exception f.close() raise self.parse_error("Unknown statement at line %d of %s" % \ (lineno, config)) f.close() def write_state(self): if self.state_filename: try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except EnvironmentError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) def read_state(self): """ Read a new copy of access state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ if self.state_filename: try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except EnvironmentError, e: self.log.warning(("[read_state]: No saved state: " +\ "Can't open %s: %s") % (self.state_filename, e)) except EOFError, e: self.log.warning(("[read_state]: " +\ "Empty or damaged state file: %s:") % \ self.state_filename) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) # Add the ownership attributes to the authorizer. Note that the # indices of the allocation dict are strings, but the attributes are # fedids, so there is a conversion. for k in self.state.keys(): for o in self.state[k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k)) # If the allocation has a vlan assigned, remove it from the # available vlans v = self.state[k].get('vlan', None) if v: self.vlans.discard(v) def permute_wildcards(self, a, p): """Return a copy of a with various fields wildcarded. The bits of p control the wildcards. A set bit is a wildcard replacement with the lowest bit being user then project then testbed. """ if p & 1: user = [""] else: user = a[2] if p & 2: proj = "" else: proj = a[1] if p & 4: tb = "" else: tb = a[0] return (tb, proj, user) def find_access(self, search): """ Search the access DB for a match on this tuple. Return the matching user (repo dir). NB, if the initial tuple fails to match we start inserting wildcards in an order determined by self.project_priority. Try the list of users in order (when wildcarded, there's only one user in the list). """ if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7) else: perm = (0, 2, 1, 3, 4, 6, 5, 7) for p in perm: s = self.permute_wildcards(search, p) # s[2] is None on an anonymous, unwildcarded request if s[2] != None: for u in s[2]: if self.access.has_key((s[0], s[1], u)): return self.access[(s[0], s[1], u)] else: if self.access.has_key(s): return self.access[s] return None def lookup_access(self, req, fid): """ Determine the allowed access for this request. Return the access and which fields are dynamic. The fedid is needed to construct the request """ # Search keys tb = None project = None user = None # Return values rp = access_project(None, ()) ru = None user_re = re.compile("user:\s(.*)") project_re = re.compile("project:\s(.*)") user = [ user_re.findall(x)[0] for x in req.get('credential', []) \ if user_re.match(x)] project = [ project_re.findall(x)[0] \ for x in req.get('credential', []) \ if project_re.match(x)] if len(project) == 1: project = project[0] elif len(project) == 0: project = None else: raise service_error(service_error.req, "More than one project credential") user_fedids = [ u for u in user if isinstance(u, fedid)] # Determine how the caller is representing itself. If its fedid shows # up as a project or a singleton user, let that stand. If neither the # usernames nor the project name is a fedid, the caller is a testbed. if project and isinstance(project, fedid): if project == fid: # The caller is the project (which is already in the tuple # passed in to the authorizer) owners = user_fedids owners.append(project) else: raise service_error(service_error.req, "Project asserting different fedid") else: if fid not in user_fedids: tb = fid owners = user_fedids owners.append(fid) else: if len(fedids) > 1: raise service_error(service_error.req, "User asserting different fedid") else: # Which is a singleton owners = user_fedids # Confirm authorization for u in user: self.log.debug("[lookup_access] Checking access for %s" % \ ((tb, project, u),)) if self.auth.check_attribute((tb, project, u), 'access'): self.log.debug("[lookup_access] Access granted") break else: self.log.debug("[lookup_access] Access Denied") else: raise service_error(service_error.access, "Access denied") # This maps a valid user to the Emulab projects and users to use found = self.find_access((tb, project, user)) if found == None: raise service_error(service_error.access, "Access denied - cannot map access") return found, owners def RequestAccess(self, req, fid): """ Handle the access request. Proxy if not for us. Parse out the fields and make the allocations or rejections if for us, otherwise, assuming we're willing to proxy, proxy the request out. """ # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) if dt == None or dt in self.testbed: # Request for this fedd found, owners = self.lookup_access(req, fid) # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) self.state_lock.acquire() self.state[aid] = { } self.state[aid]['user'] = found self.state[aid]['owners'] = owners self.state[aid]['vlan'] = None self.write_state() self.state_lock.release() for o in owners: self.auth.set_attribute(o, allocID) self.auth.set_attribute(allocID, allocID) try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) return { 'allocID': { 'fedid': allocID } } else: if self.allow_proxy: resp = self.proxy_RequestAccess.call_service(dt, req, self.cert_file, self.cert_pwd, self.trusted_certs) if resp.has_key('RequestAccessResponseBody'): return resp['RequestAccessResponseBody'] else: return None else: raise service_error(service_error.access, "Access proxying denied") def ReleaseAccess(self, req, fid): # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) else: dt = None if dt == None or dt in self.testbed: # Local request try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s", aid) if not self.auth.check_attribute(fid, auth_attr): self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") self.state_lock.acquire() if self.state.has_key(aid): self.log.debug("Found allocation for %s" %aid) del self.state[aid] self.write_state() self.state_lock.release() # And remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) return { 'allocID': req['allocID'] } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") else: if self.allow_proxy: resp = self.proxy_ReleaseAccess.call_service(dt, req, self.cert_file, self.cert_pwd, self.trusted_certs) if resp.has_key('ReleaseAccessResponseBody'): return resp['ReleaseAccessResponseBody'] else: return None else: raise service_error(service_error.access, "Access proxying denied") def extract_parameters(self, top): """ DRAGON currently supports a fixed capacity link between two endpoints. Those endpoints may specify a VPN (or list or range) to use. This extracts the DRAGON endpoints and vpn preferences from the segments (as attributes) and the capacity of the connection as given by the substrate. The two endpoints VLAN choices are intersected to get set of VLANs that are acceptable (no VLAN requiremnets means any is acceptable). """ segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements) if len(segments) != 2 or len(top.substrates) != 1: raise service_error(service_error.req, "Requests to DRAGON must have exactlty two segments " +\ "and one substrate") vlans = set() for s in segments: v = s.get_attribute('vlans') vlans &= self.parse_vlans(v) if len(vlans) == 0: vlans = None sub = top.substrates[0] if sub.capacity: cap = int(sub.capacity.rate / 1000.0) if cap < 1: cap = 1 else: cap = 100 return cap, vlans def export_store_info(self, cf, vlan, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') != 'vlan_id': self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue k = p.get('key', None) surl = p.get('store', None) if surl and k: value = "%s" % vlan req = { 'name': k, 'value': value } print "calling SetValue %s %s" % (surl, req) self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) def import_store_info(self, cf, connInfo): """ Pull any import parameters in connInfo in. We translate them either into known member names or fedAddrs. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'input']: name = p.get('name', None) key = p.get('key', None) store = p.get('store', None) if name and key and store : req = { 'name': key, 'wait': True } r = self.call_GetValue(store, req, cf) r = r.get('GetValueResponseBody', None) if r : if r.get('name', '') == key: v = r.get('value', None) if v is not None: if name == 'peer': c['peer'] = v else: if c.has_key('fedAttr'): c['fedAttr'].append({ 'attribute': name, 'value': value}) else: c['fedAttr']= [{ 'attribute': name, 'value': value}] else: raise service_error(service_error.internal, 'None value exported for %s' % key) else: raise service_error(service_error.internal, 'Different name returned for %s: %s' \ % (key, r.get('name',''))) else: raise service_error(service_error.internal, 'Badly formatted response: no GetValueResponseBody') else: raise service_error(service_error.internal, 'Bad Services missing info for import %s' % c) def StartSegment(self, req, fid): err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = req['StartSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. self.state_lock.acquire() retval = self.state[aid].get('started', None) self.state_lock.release() if retval: self.log.warning("Duplicate StartSegment for %s: " % aid + \ "replaying response") return retval certfile = "%s/%s.pem" % (self.certdir, aid) if req.has_key('segmentdescription') and \ req['segmentdescription'].has_key('topdldescription'): topo = \ topdl.Topology(**req['segmentdescription']['topdldescription']) else: raise service_error(service_error.req, "Request missing segmentdescription'") connInfo = req.get('connection', []) cap, vlans = self.extract_parameters(topo) # No vlans passes in, consider any vlan acceptable if not vlans: vlans = self.vlans avail = self.vlans | vlans if len(avail) != 0: vlan_no = avail.pop() self.vlans.discard(vlan_no) else: raise service_error(service_error.federant, "No vlan available") self.export_store_info(certfile, vlan_no, connInfo) rtopo = topo.clone() for s in rtopo.substrates: s.set_attribute('vlan', vlan_no) # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() self.state[aid]['vlan'] = vlan_no logv = "Allocated vlan: %d" % vlan_no # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. self.state[aid]['started'] = { 'allocID': req['allocID'], 'allocationLog': logv, } retval = copy.deepcopy(self.state[aid]['started']) retval['allocID'] = req['allocID'] # XXX: this doesn't pickle!! retval['segmentdescription'] = { 'topdldescription': topo.to_dict() } self.write_state() self.state_lock.release() return retval def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr self.log.debug("Terminate request for %s" %aid) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if self.state.has_key(aid): vlan_no = self.state[aid].get('vlan', None) else: vlan_no = None self.state_lock.release() self.log.debug("Stop segment for vlan: %s" % vlan_no) if not vlan_no: raise service_error(service_error.internal, "Can't find assigfned vlan for for %s" % aid) self.vlans.add(vlan_no) self.state_lock.acquire() self.state[aid]['vlan'] = None self.state_lock.release() return { 'allocID': req['allocID'] }