#!/usr/local/bin/python import os,sys import re import string import copy import pickle import logging import time from threading import * from subprocess import Popen, call, PIPE, STDOUT from util import * from allocate_project import allocate_project_local, allocate_project_remote from access_project import access_project from fedid import fedid, generate_fedid from authorizer import authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller import httplib import tempfile from urlparse import urlparse import topdl import list_log import proxy_emulab_segment import local_emulab_segment # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access: """ The implementation of access control based on mapping users to projects. Users can be mapped to existing projects or have projects created dynamically. This implements both direct requests and proxies. """ class parse_error(RuntimeError): pass proxy_RequestAccess= service_caller('RequestAccess') proxy_ReleaseAccess= service_caller('ReleaseAccess') def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. """ # Make sure that the configuration is in place if not config: raise RunTimeError("No config to dragon_access.access") self.project_priority = config.getboolean("access", "project_priority") self.allow_proxy = False self.certdir = config.get("access","certdir") self.create_debug = config.getboolean("access", "create_debug") self.cli_dir = config.get("access", "cli_dir") self.axis2_home = config.get("access", "axis2_home") self.idc_url = config.get("access", "idc") self.domain = config.get("access", "domain") self.attrs = { } self.access = { } # State is a dict of dicts indexed by segment fedid that includes the # owners of the segment as fedids (who can manipulate it, key: owners), # the repo dir/user for the allocation (key: user), Current allocation # log (key: log), and GRI of the reservation once made (key: gri) self.state = { } self.log = logging.getLogger("fedd.access") set_log_level(config, "access", self.log) self.state_lock = Lock() if not (self.cli_dir and self.axis2_home and self.idc_url): self.log.error("Must specify all of cli_dir, axis2_home, " +\ "idc in the [access] section of the configuration") if auth: self.auth = auth else: self.log.error(\ "[access]: No authorizer initialized, creating local one.") auth = authorizer() tb = config.get('access', 'testbed') if tb: self.testbed = [ t.strip() for t in tb.split(',') ] else: self.testbed = [ ] if config.has_option("access", "accessdb"): self.read_access(config.get("access", "accessdb")) self.state_filename = config.get("access", "access_state") self.read_state() # Keep cert_file and cert_pwd coming from the same place self.cert_file = config.get("access", "cert_file") if self.cert_file: self.sert_pwd = config.get("access", "cert_pw") else: self.cert_file = config.get("globals", "cert_file") self.sert_pwd = config.get("globals", "cert_pw") self.trusted_certs = config.get("access", "trusted_certs") or \ config.get("globals", "trusted_certs") self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } def read_access(self, config): """ Read a configuration file and set internal parameters. There are access lines of the form (tb, proj, user) -> user that map the first tuple of names to the user for for access purposes. Names in the key (left side) can include " or " to act as wildcards or to require the fields to be empty. Similarly aproj or auser can be or indicating that either the matching key is to be used or a dynamic user or project will be created. These names can also be federated IDs (fedid's) if prefixed with fedid:. The user is the repo directory that contains the DRAGON user credentials. Testbed attributes outside the forms above can be given using the format attribute: name value: value. The name is a single word and the value continues to the end of the line. Empty lines and lines startin with a # are ignored. Parsing errors result in a self.parse_error exception being raised. """ lineno=0 name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+" fedid_expr = "fedid:[" + string.hexdigits + "]+" key_name = "(||"+fedid_expr + "|"+ name_expr + ")" attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)', re.IGNORECASE) access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE) def parse_name(n): if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) else: return n def auth_name(n): if isinstance(n, basestring): if n =='' or n =='': return None else: return unicode(n) else: return n f = open(config, "r"); for line in f: lineno += 1 line = line.strip(); if len(line) == 0 or line.startswith('#'): continue # Extended (attribute: x value: y) attribute line m = attr_re.match(line) if m != None: attr, val = m.group(1,2) self.attrs[attr] = val continue # Access line (t, p, u) -> (a) line # XXX: you are here m = access_re.match(line) if m != None: access_key = tuple([ parse_name(x) for x in m.group(1,2,3)]) auth_key = tuple([ auth_name(x) for x in access_key]) user_name = auth_name(parse_name(m.group(4))) self.access[access_key] = user_name self.auth.set_attribute(auth_key, "access") continue # Nothing matched to here: unknown line - raise exception f.close() raise self.parse_error("Unknown statement at line %d of %s" % \ (lineno, config)) f.close() print self.access def get_users(self, obj): """ Return a list of the IDs of the users in dict """ if obj.has_key('user'): return [ unpack_id(u['userID']) \ for u in obj['user'] if u.has_key('userID') ] else: return None def write_state(self): if self.state_filename: try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except IOError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) def read_state(self): """ Read a new copy of access state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ if self.state_filename: try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except IOError, e: self.log.warning(("[read_state]: No saved state: " +\ "Can't open %s: %s") % (self.state_filename, e)) except EOFError, e: self.log.warning(("[read_state]: " +\ "Empty or damaged state file: %s:") % \ self.state_filename) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) # Add the ownership attributes to the authorizer. Note that the # indices of the allocation dict are strings, but the attributes are # fedids, so there is a conversion. for k in self.state.keys(): for o in self.state[k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k)) def permute_wildcards(self, a, p): """Return a copy of a with various fields wildcarded. The bits of p control the wildcards. A set bit is a wildcard replacement with the lowest bit being user then project then testbed. """ if p & 1: user = [""] else: user = a[2] if p & 2: proj = "" else: proj = a[1] if p & 4: tb = "" else: tb = a[0] return (tb, proj, user) def find_access(self, search): """ Search the access DB for a match on this tuple. Return the matching user (repo dir). NB, if the initial tuple fails to match we start inserting wildcards in an order determined by self.project_priority. Try the list of users in order (when wildcarded, there's only one user in the list). """ if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7) else: perm = (0, 2, 1, 3, 4, 6, 5, 7) for p in perm: s = self.permute_wildcards(search, p) # s[2] is None on an anonymous, unwildcarded request if s[2] != None: for u in s[2]: if self.access.has_key((s[0], s[1], u)): return self.access[(s[0], s[1], u)] else: if self.access.has_key(s): return self.access[s] return None def lookup_access(self, req, fid): """ Determine the allowed access for this request. Return the access and which fields are dynamic. The fedid is needed to construct the request """ # Search keys tb = None project = None user = None # Return values rp = access_project(None, ()) ru = None if req.has_key('project'): p = req['project'] if p.has_key('name'): project = unpack_id(p['name']) user = self.get_users(p) else: user = self.get_users(req) user_fedids = [ u for u in user if isinstance(u, fedid)] # Determine how the caller is representing itself. If its fedid shows # up as a project or a singleton user, let that stand. If neither the # usernames nor the project name is a fedid, the caller is a testbed. if project and isinstance(project, fedid): if project == fid: # The caller is the project (which is already in the tuple # passed in to the authorizer) owners = user_fedids owners.append(project) else: raise service_error(service_error.req, "Project asserting different fedid") else: if fid not in user_fedids: tb = fid owners = user_fedids owners.append(fid) else: if len(fedids) > 1: raise service_error(service_error.req, "User asserting different fedid") else: # Which is a singleton owners = user_fedids # Confirm authorization for u in user: self.log.debug("[lookup_access] Checking access for %s" % \ ((tb, project, u),)) if self.auth.check_attribute((tb, project, u), 'access'): self.log.debug("[lookup_access] Access granted") break else: self.log.debug("[lookup_access] Access Denied") else: raise service_error(service_error.access, "Access denied") # This maps a valid user to the Emulab projects and users to use found = self.find_access((tb, project, user)) if found == None: raise service_error(service_error.access, "Access denied - cannot map access") return found, owners def RequestAccess(self, req, fid): """ Handle the access request. Proxy if not for us. Parse out the fields and make the allocations or rejections if for us, otherwise, assuming we're willing to proxy, proxy the request out. """ # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) if dt == None or dt in self.testbed: # Request for this fedd found, owners = self.lookup_access(req, fid) # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) self.state_lock.acquire() self.state[aid] = { } self.state[aid]['user'] = found self.state[aid]['owners'] = owners self.write_state() self.state_lock.release() for o in owners: self.auth.set_attribute(o, allocID) self.auth.set_attribute(allocID, allocID) try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except IOError, e: raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) print { 'allocID': allocID } return { 'allocID': { 'fedid': allocID } } else: if self.allow_proxy: resp = self.proxy_RequestAccess.call_service(dt, req, self.cert_file, self.cert_pwd, self.trusted_certs) if resp.has_key('RequestAccessResponseBody'): return resp['RequestAccessResponseBody'] else: return None else: raise service_error(service_error.access, "Access proxying denied") def ReleaseAccess(self, req, fid): # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) else: dt = None if dt == None or dt in self.testbed: # Local request try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s", aid) if not self.auth.check_attribute(fid, auth_attr): self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") self.state_lock.acquire() if self.state.has_key(aid): self.log.debug("Found allocation for %s" %aid) del self.state[aid] self.write_state() self.state_lock.release() # And remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) return { 'allocID': req['allocID'] } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") else: if self.allow_proxy: resp = self.proxy_ReleaseAccess.call_service(dt, req, self.cert_file, self.cert_pwd, self.trusted_certs) if resp.has_key('ReleaseAccessResponseBody'): return resp['ReleaseAccessResponseBody'] else: return None else: raise service_error(service_error.access, "Access proxying denied") def extract_parameters(self, top): """ DRAGON currently supports a fixed capacity link between two endpoints. Those endpoints may specify a VPN (or list or range) to use. This extracts the DRAGON endpoints and vpn preferences from the segments (as attributes) and the capacity of the connection as given by the substrate. The two endpoints VLAN choices are intersected to get set of VLANs that are acceptable (no VLAN requiremnets means any is acceptable). """ segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements) print segments if len(segments) != 2 or len(top.substrates) != 1: raise service_error(service_error.req, "Requests to DRAGON must have exactlty two segments " +\ "and one substrate") ends = [ ] for s in segments: ep = s.get_attribute('dragon_endpoint') if not ep: raise service_error(service_error.req, "Missing DRAGON endpoint for %s" % s.id) v = s.get_attribute('vlans') vlans = None vset = set() # the vlans can be a single integer, a comma separated list or a # comma separated lists of dashed ranges. E.g 100 or 100,300 or # 100,300-305,400 if v: if v.count(",") > 0 : vl = [ x.strip() for x in v.split(",") ] else: vl = [ v ] for v in vl: try: if v.count("-")> 0: f, t = v.split("-", 1) for i in range(int(f), int(t)+1): vset.add(i) else: vset.add(int(v)) except ValueError: raise service_error(service_error.req, "VLAN tags must be integers (%s)" %s.name) if len(vset) > 0: if vlans: vlans &= vest else: vlans = vset ends.append(ep) sub = top.substrates[0] if sub.capacity: cap = int(sub.capacity.rate / 1000.0) if cap < 1: cap = 1 else: cap = 100 # DRAGON's command line tool barfs if the source (ends[0]) is not in # the domain controlled by the IDC. This code ensures this situation. if self.domain and not ends[0].endswith(self.domain): hold = ends[0] for i, e in enumerate(ends): if i == 0: continue if e.endswith(self.domain): ends[0] = e ends[i] = hold break else: raise service_error(service_error.req, "No endpoint in my domain") return cap, ends[0], ends[1], vlans def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None, log=None): """ Do the actual work of creating the dragon connecton. """ if not log: log = self.log gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE) status_re = re.compile("Status:\s*(.*)", re.IGNORECASE) source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE) dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE) path_re = re.compile("Path:") if not vpns: vpns = [ None, None, None, None, None] if not start: start = time.time() if not end: end = start + 120 *60 status = None gri = None rv = None vlan_no = None for v in vpns: cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 'createReservation', '-repo', repo , '-url', self.idc_url, '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap, '-vlan', "%s" % v, '-desc', 'fedd created connection', '-pathsetup', 'timer-automatic', '-start', "%d" % int(start), '-end', "%d" % int(end)] log.debug("[start_segment]: %s" % " ".join(cmd)) if not self.create_debug: p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT, close_fds=True) for line in p.stdout: m = status_re.match(line) if m: status = m.group(1) continue m = gri_re.match(line) if m: gri = m.group(1) rv = p.wait() else: rv = 0 status = 'ACCEPTED' gri = 'debug_gri' # Reservation in progress. Poll the IDC until we know the outcome cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 'query', '-repo', repo , '-url', self.idc_url, '-gri', gri] while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'): log.debug("[start_segment]: %s" % " ".join(cmd)) if not self.create_debug: p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT, close_fds=True) in_path = False vpn1 = None vpn2 = None src = None dest = None for line in p.stdout: if not in_path: m = status_re.match(line) if m: status = m.group(1) continue m = source_re.match(line) if m: src = m.group(1) continue m = dest_re.match(line) if m: dest = m.group(1) continue m = path_re.match(line) if m: in_path = True if src and dest: vpn1_re = re.compile( "\s*%s,\s*\w+\s*,\s*(\d+)" % \ src.replace("*", "\*")) vpn2_re = re.compile( "\s*%s,\s*\w+\s*,\s*(\d+)" % \ dest.replace("*", "\*")) else: raise service_error(service_error.internal, "Strange output from query") else: m = vpn1_re.match(line) if m: vpn1 = m.group(1) continue m = vpn2_re.match(line) if m: vpn2 = m.group(1) continue rv = p.wait() if vpn1 == vpn2: if v is not None: if int(vpn1) == v: vlan_no = int(v) else: raise service_error(service_error.federant, "Unexpected vlan assignment") else: vlan_no = int(v) else: raise service_error(service_error.internal, "Different VPNs on DRAGON ends") log.debug("Status: %s" % status or "none") else: status = 'ACTIVE' vlan_no = int(v) or 1 if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'): time.sleep(45) if status in ('ACTIVE', 'FINISHED', 'CANCELLED'): break if (rv == 0 and gri and vlan_no and status == 'ACTIVE'): return gri, vlan_no else: raise service_error(service_error.federant, "Cannot make reservation") def stop_segment(self, repo, gri, log=None): if not log: log = self.log cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 'cancel', '-repo', repo , '-url', self.idc_url, '-gri', gri] self.log.debug("[stop_segment]: %s" % " ".join(cmd)) if not self.create_debug: try: f = open("/dev/null", "w") call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True) f.close() except IOError, e: raise service_error(service_error.internal, "Failed to open /dev/null: %s" % e) def StartSegment(self, req, fid): err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = req['StartSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") if req.has_key('segmentdescription') and \ req['segmentdescription'].has_key('topdldescription'): topo = \ topdl.Topology(**req['segmentdescription']['topdldescription']) else: raise service_error(service_error.req, "Request missing segmentdescription'") cap, src, dest, vlans = self.extract_parameters(topo) ename = aid for a in attrs: if a['attribute'] == 'experiment_name': ename = a['value'] repo = None self.state_lock.acquire() if self.state.has_key(aid): repo = self.state[aid]['user'] self.state[aid]['log'] = [ ] # Create a logger that logs to the experiment's state object as # well as to the main log file. alloc_log = logging.getLogger('fedd.access.%s' % ename) h = logging.StreamHandler( list_log.list_log(self.state[aid]['log'])) # XXX: there should be a global one of these rather than # repeating the code. h.setFormatter(logging.Formatter( "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) self.write_state() self.state_lock.release() if not repo: raise service_error(service_error.internal, "Can't find creation user for %s" %aid) gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans, log=alloc_log) if gri: # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() self.state[aid]['gri'] = gri logv = "".join(self.state[aid]['log']) self.write_state() self.state_lock.release() return { 'allocID': req['allocID'], 'allocationLog': logv, 'fedAttr': [ {'attribute': 'vlan', 'value': '%d' % vlan_no } ] } elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr self.log.debug("Terminate request for %s" %aid) attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if self.state.has_key(aid): gri = self.state[aid].get('gri', None) user = self.state[aid].get('user', None) else: gri = None user = None self.state_lock.release() self.log.debug("Stop segment for user: %s gre %s" %(user, gri)) if not gri: raise service_error(service_error.internal, "Can't find gri for %s" % aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" % aid) self.log.debug("Stop segment for GRI: %s" %gri) self.stop_segment(user, gri) return { 'allocID': req['allocID'] }