#!/usr/local/bin/python

import os
import sys
import re
import string
import copy
import pickle
import logging
import time

from threading import Thread, Lock
from subprocess import Popen, call, PIPE, STDOUT

from access import access_base
from legacy_access import legacy_access

from util import *
from allocate_project import allocate_project_local, allocate_project_remote
from fedid import fedid, generate_fedid
from authorizer import authorizer, abac_authorizer
from service_error import service_error
from remote_service import xmlrpc_handler, soap_handler, service_caller

import httplib
import tempfile
from urlparse import urlparse

import topdl
import list_log


# Make log messages disappear if no one configures a fedd logger
class nullHandler(logging.Handler):
    """Logging handler that discards every record it is given."""
    def emit(self, record):
        pass

fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())


class access(access_base, legacy_access):
    """
    The implementation of access control based on mapping users to projects.

    Users can be mapped to existing projects or have projects created
    dynamically.  This implements both direct requests and proxies.
    """

    def __init__(self, config=None, auth=None):
        """
        Initializer.  Pulls parameters out of the ConfigParser's access
        section, sets up the authorization data for the configured
        auth_type ('legacy' or 'abac') and registers the SOAP and XMLRPC
        service handlers.

        Raises service_error (internal) if auth_type is not recognized.
        """
        access_base.__init__(self, config, auth)

        self.cli_dir = config.get("access", "cli_dir")
        self.axis2_home = config.get("access", "axis2_home")
        self.idc_url = config.get("access", "idc")
        self.domain = config.get("access", "domain")
        self.duration = config.getint("access", "duration", 120)

        self.access = { }
        if not (self.cli_dir and self.axis2_home and self.idc_url):
            self.log.error("Must specify all of cli_dir, axis2_home, " +\
                    "idc in the [access] section of the configuration")

        # authorization information
        self.auth_type = config.get('access', 'auth_type') \
                or 'legacy'
        self.auth_dir = config.get('access', 'auth_dir')
        accessdb = config.get("access", "accessdb")

        # initialize the authorization system
        if self.auth_type == 'legacy':
            self.access = { }
            if accessdb:
                self.legacy_read_access(accessdb, self.make_repo)
            # Add the ownership attributes to the authorizer.  Note that the
            # indices of the allocation dict are strings, but the attributes
            # are fedids, so there is a conversion.
            self.state_lock.acquire()
            try:
                for k in self.state.keys():
                    for o in self.state[k].get('owners', []):
                        self.auth.set_attribute(o, fedid(hexstr=k))
                    self.auth.set_attribute(fedid(hexstr=k), fedid(hexstr=k))
            finally:
                # Release even if a stored key is not a valid fedid, so a
                # bad state entry cannot leave the lock held.
                self.state_lock.release()
            self.lookup_access = self.legacy_lookup_access_base
        elif self.auth_type == 'abac':
            self.auth = abac_authorizer(load=self.auth_dir)
            self.access = [ ]
            if accessdb:
                self.read_access(accessdb, self.make_repo)
        else:
            raise service_error(service_error.internal,
                    "Unknown auth_type: %s" % self.auth_type)

        self.call_GetValue = service_caller('GetValue')
        self.call_SetValue = service_caller('SetValue')

        self.soap_services = {
            'RequestAccess': soap_handler("RequestAccess",
                self.RequestAccess),
            'ReleaseAccess': soap_handler("ReleaseAccess",
                self.ReleaseAccess),
            'StartSegment': soap_handler("StartSegment",
                self.StartSegment),
            'TerminateSegment': soap_handler("TerminateSegment",
                self.TerminateSegment),
            }
        self.xmlrpc_services = {
            'RequestAccess': xmlrpc_handler('RequestAccess',
                self.RequestAccess),
            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
                self.ReleaseAccess),
            'StartSegment': xmlrpc_handler("StartSegment",
                self.StartSegment),
            'TerminateSegment': xmlrpc_handler('TerminateSegment',
                self.TerminateSegment),
            }

    @staticmethod
    def make_repo(s):
        """
        Get the repo directory from an access line.  This is removing the ()
        from the string.

        Raises parse_error if the (stripped) string is not parenthesized.
        """
        rv = s.strip()
        if rv.startswith('(') and rv.endswith(')'):
            return rv[1:-1]
        # This is a static method, so there is no self to look the exception
        # class up on; resolve it through the class instead.
        raise access.parse_error("Repo should be in parens")

    # RequestAccess and ReleaseAccess come from the base class

    def _oscars_cmd(self, action, repo):
        """
        Build the common prefix of an OSCARS command line: run.sh invoked
        through env with AXIS2_HOME set, followed by the action and the
        repo and IDC url options.  Callers append action-specific options.
        """
        return ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh',
                action, '-repo', repo, '-url', self.idc_url]

    def extract_parameters(self, top):
        """
        DRAGON currently supports a fixed capacity link between two
        endpoints.  Those endpoints may specify a VPN (or list or range) to
        use.  This extracts the DRAGON endpoints and vpn preferences from the
        segments (as attributes) and the capacity of the connection as given
        by the substrate.  The two endpoints' VLAN choices are intersected to
        get the set of VLANs that are acceptable (no VLAN requirements means
        any is acceptable).

        Returns (capacity, source endpoint, destination endpoint, vlans)
        where vlans is a set of ints, or None when neither endpoint
        expressed a preference.

        Raises service_error (req) on a malformed topology, a missing
        endpoint, non-integer VLAN tags, endpoints with no VLAN in common,
        or when no endpoint is in the local domain.
        """
        segments = [ e for e in top.elements \
                if isinstance(e, topdl.Segment) ]

        if len(segments) != 2 or len(top.substrates) != 1:
            raise service_error(service_error.req,
                    "Requests to DRAGON must have exactlty two segments " +\
                            "and one substrate")

        ends = [ ]
        # Accumulated across segments; None means "no restriction so far".
        vlans = None
        for s in segments:
            ep = s.get_attribute('dragon_endpoint')
            if not ep:
                raise service_error(service_error.req,
                        "Missing DRAGON endpoint for %s" % s.id)
            v = s.get_attribute('vlans')
            vset = set()
            # the vlans can be a single integer, a comma separated list or a
            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
            # 100,300-305,400
            if v:
                for term in [ x.strip() for x in v.split(",") ]:
                    try:
                        if "-" in term:
                            f, t = term.split("-", 1)
                            vset.update(range(int(f), int(t) + 1))
                        else:
                            vset.add(int(term))
                    except ValueError:
                        raise service_error(service_error.req,
                                "VLAN tags must be integers (%s)" % s.name)
            if vset:
                if vlans is None:
                    vlans = vset
                else:
                    vlans &= vset
                    if not vlans:
                        # An empty set would later be read as "any VLAN",
                        # which is the opposite of what was requested.
                        raise service_error(service_error.req,
                                "No VLAN acceptable to both DRAGON endpoints")
            ends.append(ep)

        sub = top.substrates[0]
        if sub.capacity:
            cap = max(1, int(sub.capacity.rate / 1000.0))
        else:
            cap = 100

        # DRAGON's command line tool barfs if the source (ends[0]) is not in
        # the domain controlled by the IDC.  This code ensures this
        # situation.
        if self.domain and not ends[0].endswith(self.domain):
            for i in range(1, len(ends)):
                if ends[i].endswith(self.domain):
                    ends[0], ends[i] = ends[i], ends[0]
                    break
            else:
                raise service_error(service_error.req,
                        "No endpoint in my domain")

        return cap, ends[0], ends[1], vlans

    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
        """
        Call the oscars createReservation command from the command line and
        pull the reservation status and GRI out of its output.

        Returns (exit code, status, gri).  status and gri are None if the
        command did not print them.  When self.create_debug is set nothing
        is run and canned values are returned.
        """
        gri_re = re.compile(r"GRI:\s*(.*)", re.IGNORECASE)
        status_re = re.compile(r"Status:\s*(.*)", re.IGNORECASE)

        cmd = self._oscars_cmd('createReservation', repo) + [
                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
                '-vlan', "%s" % v, '-desc', 'fedd created connection',
                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
                '-end', "%d" % int(end)]
        log.debug("[start_segment]: %s" % " ".join(cmd))

        if self.create_debug:
            return (0, 'ACCEPTED', 'debug_gri')

        # Defaults in case the tool prints neither line (e.g. it failed
        # before contacting the IDC).
        status = None
        gri = None
        p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
                close_fds=True)
        for line in p.stdout:
            m = status_re.match(line)
            if m:
                status = m.group(1)
                continue
            m = gri_re.match(line)
            if m:
                gri = m.group(1)
                continue
        rv = p.wait()

        return (rv, status, gri)

    def oscars_query_vpn(self, repo, gri, v, log):
        """
        Call the oscars query command from the command line and parse out the
        data to see if the current request succeeded.  This is a lot of
        fiddly code to do a pretty simple thing.

        Returns (exit code, status, vlan number).  The vlan number is 0 when
        the output carried no VLAN for the endpoints.  When
        self.create_debug is set nothing is run and canned values are
        returned.

        Raises service_error (internal) if the output has a Path: section
        before both endpoints were seen or if the two ends report different
        VLANs, and service_error (federant) if the assigned VLAN is not the
        requested one.
        """
        status_re = re.compile(r"Status:\s*(.*)", re.IGNORECASE)
        source_re = re.compile(r"Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
        dest_re = re.compile(r"Destination\s+Endpoint:\s*(.*)",
                re.IGNORECASE)
        path_re = re.compile(r"Path:")

        cmd = self._oscars_cmd('query', repo) + ['-gri', gri]
        log.debug("[start_segment]: %s" % " ".join(cmd))

        if self.create_debug:
            return (0, 'ACTIVE', int(v or 1))

        # Really do the query
        p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
                close_fds=True)
        in_path = False
        status = None
        vpn1 = None
        vpn2 = None
        src = None
        dest = None
        vpn1_re = None
        vpn2_re = None
        for line in p.stdout:
            if not in_path:
                m = status_re.match(line)
                if m:
                    status = m.group(1)
                    continue
                m = source_re.match(line)
                if m:
                    src = m.group(1)
                    continue
                m = dest_re.match(line)
                if m:
                    dest = m.group(1)
                    continue
                m = path_re.match(line)
                if m:
                    in_path = True
                    if not (src and dest):
                        raise service_error(service_error.internal,
                                "Strange output from query")
                    # Path lines are matched against the endpoint names
                    # reported above; '*' in a name is taken literally.
                    vpn1_re = re.compile(
                            r"\s*%s,\s*\w+\s*,\s*(\d+)" % \
                                    src.replace("*", r"\*"))
                    vpn2_re = re.compile(
                            r"\s*%s,\s*\w+\s*,\s*(\d+)" % \
                                    dest.replace("*", r"\*"))
            else:
                m = vpn1_re.match(line)
                if m:
                    vpn1 = m.group(1)
                    continue
                m = vpn2_re.match(line)
                if m:
                    vpn2 = m.group(1)
                    continue
        rv = p.wait()

        # Make sure that OSCARS did what we expected.
        if vpn1 != vpn2:
            raise service_error(service_error.internal,
                    "Different VPNs on DRAGON ends")
        if vpn1 is None:
            # No VLAN was reported for either end, so there is nothing to
            # check yet.
            vlan_no = 0
        elif v is not None:
            if int(vpn1) != int(v):
                raise service_error(service_error.federant,
                        "Unexpected vlan assignment")
            vlan_no = int(v)
        else:
            # No preference was requested: report what OSCARS assigned.
            vlan_no = int(vpn1)
        log.debug("Status: %s" % (status or "none"))

        return (rv, status, vlan_no)

    def start_segment(self, repo, fr, to, cap, vpns=None, start=None,
            end=None, log=None):
        """
        Do the actual work of creating the dragon connection.

        Each candidate in vpns is tried in turn: a reservation is created
        and the IDC is polled every 45 seconds while it is in a waiting
        state.  The first attempt that ends ACTIVE, FINISHED or CANCELLED
        stops the search.  With no vpns given, up to five attempts are made
        with no VLAN preference.  start defaults to now and end to
        self.duration minutes after start.

        Returns (gri, vlan number); raises service_error (federant) if no
        active reservation results.
        """
        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
        if not log:
            log = self.log

        if not vpns:
            vpns = [ None, None, None, None, None ]

        if not start:
            start = time.time()
        if not end:
            end = start + self.duration * 60

        status = None
        gri = None
        rv = None
        vlan_no = None
        for v in vpns:
            # Do not let a VLAN number from an earlier attempt leak into
            # this one.
            vlan_no = None
            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v,
                    start, end, log)
            # Reservation in progress.  Poll the IDC until we know the
            # outcome
            while status in waiting_states:
                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v,
                        log)
                if status in waiting_states:
                    time.sleep(45)
            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
                break

        if rv == 0 and gri and vlan_no and status == 'ACTIVE':
            log.debug("made reservation %s %s" % (gri, vlan_no))
            return gri, vlan_no
        raise service_error(service_error.federant,
                "Cannot make reservation")

    def stop_segment(self, repo, gri, log=None):
        """
        Terminate the reservation by running the oscars cancel command with
        its output discarded.  Nothing is run when self.create_debug is set.

        Raises service_error (internal) on an EnvironmentError while opening
        /dev/null or launching the command.
        """
        if not log:
            log = self.log
        cmd = self._oscars_cmd('cancel', repo) + ['-gri', gri]

        log.debug("[stop_segment]: %s" % " ".join(cmd))
        if not self.create_debug:
            try:
                # The with block closes the handle even if call() raises.
                with open("/dev/null", "w") as f:
                    call(cmd, cwd=self.cli_dir, stdout=f, stderr=f,
                            close_fds=True)
            except EnvironmentError as e:
                raise service_error(service_error.internal,
                        "Failed to open /dev/null: %s" % e)

    def export_store_info(self, cf, vlan, connInfo):
        """
        For the export requests in the connection info, install the peer
        names at the experiment controller via SetValue calls.

        Only 'output' parameters named 'vlan_id' are handled; each one with
        both a key and a store url gets the vlan (as a string) stored under
        its key.  Other output parameters are logged as errors and skipped.
        """
        for c in connInfo:
            for p in c.get('parameter', []):
                if p.get('type', '') != 'output':
                    continue
                if p.get('name', '') != 'vlan_id':
                    self.log.error("Unknown export parameter: %s" % \
                            p.get('name'))
                    continue

                k = p.get('key', None)
                surl = p.get('store', None)
                if surl and k:
                    req = { 'name': k, 'value': "%s" % vlan }
                    self.log.debug("calling SetValue %s %s" % (surl, req))
                    self.call_SetValue(surl, req, cf)
                else:
                    self.log.error("Bad export request: %s" % p)

    def initialize_experiment_info(self, aid, ename):
        """
        Reset the stored log for allocation aid and create a logger for the
        experiment that appends to it.

        Returns (repo, alloc_log) where repo is the allocation's 'user'
        entry.  Both are None if aid is not a known allocation.
        """
        repo = None
        alloc_log = None
        self.state_lock.acquire()
        try:
            if aid in self.state:
                repo = self.state[aid].get('user', None)
                self.state[aid]['log'] = [ ]
                # Create a logger that logs to the experiment's state object
                # as well as to the main log file.
                alloc_log = logging.getLogger('fedd.access.%s' % ename)
                h = logging.StreamHandler(
                        list_log.list_log(self.state[aid]['log']))
                # XXX: there should be a global one of these rather than
                # repeating the code.
                h.setFormatter(logging.Formatter(
                    "%(asctime)s %(name)s %(message)s",
                    '%d %b %y %H:%M:%S'))
                alloc_log.addHandler(h)
                self.write_state()
        finally:
            self.state_lock.release()
        return (repo, alloc_log)

    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id, proof):
        """
        Place the relevant information in the global state block, and prepare
        the response.

        The response carries a copy of the topology whose substrates are
        annotated with the vlan and gri, the accumulated allocation log and
        the proof.  It is also stored under the allocation's 'started' key.
        """
        rtopo = topo.clone()

        for s in rtopo.substrates:
            s.set_attribute('vlan', vlan_no)
            s.set_attribute('gri', gri)

        # Grab the log under the state lock (very cautious locking, but
        # better safe than sorry)
        self.state_lock.acquire()
        try:
            self.state[aid]['gri'] = gri
            logv = "".join(self.state[aid]['log'])
            # It's possible that the StartSegment call gets retried (!).
            # if the 'started' key is in the allocation, we'll return it
            # rather than redo the setup.
            self.state[aid]['started'] = {
                    'allocID': alloc_id,
                    'allocationLog': logv,
                    'segmentdescription': {
                        'topdldescription': rtopo.to_dict()
                        },
                    'proof': proof.to_dict(),
                    }
            retval = copy.deepcopy(self.state[aid]['started'])
            self.write_state()
        finally:
            self.state_lock.release()

        return retval

    def StartSegment(self, req, fid):
        """
        Service entry point: create the DRAGON connection described by the
        request on behalf of fid and return the response built by
        finalize_experiment.  A repeated request for an allocation that has
        already started gets the stored response replayed.

        Raises service_error for malformed requests (req), failed
        authorization (access), an allocation with no creation user
        (internal) or a failed reservation (federant).
        """
        try:
            req = req['StartSegmentRequestBody']
            topref = req['segmentdescription']['topdldescription']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        auth_attr = req['allocID']['fedid']
        aid = "%s" % auth_attr
        attrs = req.get('fedAttr', [])
        access_ok, proof = self.auth.check_attribute(fid, auth_attr,
                with_proof=True)
        if not access_ok:
            raise service_error(service_error.access, "Access denied")

        # See if this is a replay of an earlier succeeded StartSegment -
        # sometimes SSL kills 'em.  If so, replay the response rather than
        # redoing the allocation.
        self.state_lock.acquire()
        try:
            retval = self.state[aid].get('started', None)
        finally:
            self.state_lock.release()
        if retval:
            self.log.warning("Duplicate StartSegment for %s: " % aid + \
                    "replaying response")
            return retval

        certfile = "%s/%s.pem" % (self.certdir, aid)

        if topref:
            topo = topdl.Topology(**topref)
        else:
            raise service_error(service_error.req,
                    "Request missing segmentdescription'")

        connInfo = req.get('connection', [])

        cap, src, dest, vlans = self.extract_parameters(topo)

        # The experiment name names the per-experiment logger; fall back to
        # the allocation id when the request does not carry one.
        for a in attrs:
            if a['attribute'] == 'experiment_name':
                ename = a['value']
                break
        else:
            ename = aid

        repo, alloc_log = self.initialize_experiment_info(aid, ename)

        if not repo:
            raise service_error(service_error.internal,
                    "Can't find creation user for %s" % aid)

        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
                log=alloc_log)

        self.export_store_info(certfile, vlan_no, connInfo)

        if gri:
            return self.finalize_experiment(topo, vlan_no, gri, aid,
                    req['allocID'], proof)
        raise service_error(service_error.federant, "Swapin failed")

    def TerminateSegment(self, req, fid):
        """
        Service entry point: cancel the reservation recorded for the
        allocation named in the request.

        Returns a dict with the request's allocID and the authorization
        proof.  Raises service_error for malformed requests (req), failed
        authorization (access) or an allocation with no recorded gri or
        creation user (internal).
        """
        try:
            req = req['TerminateSegmentRequestBody']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        auth_attr = req['allocID']['fedid']
        aid = "%s" % auth_attr

        self.log.debug("Terminate request for %s" %aid)
        attrs = req.get('fedAttr', [])
        access_ok, proof = self.auth.check_attribute(fid, auth_attr,
                with_proof=True)
        if not access_ok:
            raise service_error(service_error.access, "Access denied")

        gri = None
        user = None
        self.state_lock.acquire()
        try:
            if aid in self.state:
                gri = self.state[aid].get('gri', None)
                user = self.state[aid].get('user', None)
        finally:
            self.state_lock.release()
        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))

        if not gri:
            raise service_error(service_error.internal,
                    "Can't find gri for %s" % aid)

        if not user:
            raise service_error(service_error.internal,
                    "Can't find creation user for %s" % aid)

        self.log.debug("Stop segment for GRI: %s" %gri)
        self.stop_segment(user, gri)
        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }