#!/usr/local/bin/python import os,sys import stat # for chmod constants import re import random import string import copy import pickle import logging import subprocess from threading import * from M2Crypto.SSL import SSLError from util import * from allocate_project import allocate_project_local, allocate_project_remote from access_project import access_project from fedid import fedid, generate_fedid from authorizer import authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller import httplib import tempfile from urlparse import urlparse import topdl import list_log import proxy_emulab_segment import local_emulab_segment # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access: """ The implementation of access control based on mapping users to projects. Users can be mapped to existing projects or have projects created dynamically. This implements both direct requests and proxies. """ class parse_error(RuntimeError): pass proxy_RequestAccess= service_caller('RequestAccess') proxy_ReleaseAccess= service_caller('ReleaseAccess') def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. """ def software_list(v): l = [ ] if v: ps = v.split(" ") while len(ps): loc, file = ps[0:2] del ps[0:2] l.append((loc, file)) return l # Make sure that the configuration is in place if not config: raise RunTimeError("No config to fedd.access") self.project_priority = config.getboolean("access", "project_priority") self.allow_proxy = config.getboolean("access", "allow_proxy") self.boss = config.get("access", "boss") self.ops = config.get("access", "ops") self.domain = config.get("access", "domain") self.fileserver = config.get("access", "fileserver") self.eventserver = config.get("access", "eventserver") self.certdir = config.get("access","certdir") self.userconfdir = config.get("access","userconfdir") self.userconfcmd = config.get("access","userconfcmd") self.userconfurl = config.get("access","userconfurl") self.federation_software = config.get("access", "federation_software") self.portal_software = config.get("access", "portal_software") self.local_seer_software = config.get("access", "local_seer_software") self.local_seer_image = config.get("access", "local_seer_image") self.local_seer_start = config.get("access", "local_seer_start") self.seer_master_start = config.get("access", "seer_master_start") self.ssh_privkey_file = config.get("access","ssh_privkey_file") self.ssh_pubkey_file = config.get("access","ssh_pubkey_file") self.ssh_port = config.get("access","ssh_port") or "22" self.create_debug = config.getboolean("access", "create_debug") self.cleanup = not config.getboolean("access", "leave_tmpfiles") self.access_type = config.get("access", "type") self.federation_software = software_list(self.federation_software) self.portal_software = software_list(self.portal_software) self.local_seer_software = software_list(self.local_seer_software) self.access_type = self.access_type.lower() if self.access_type == 'remote_emulab': self.start_segment = proxy_emulab_segment.start_segment self.stop_segment = proxy_emulab_segment.stop_segment elif self.access_type == 'local_emulab': self.start_segment = local_emulab_segment.start_segment self.stop_segment = local_emulab_segment.stop_segment else: self.start_segment = None self.stop_segment = None self.attrs = { } self.access = { } self.restricted = [ ] self.projects = { } self.keys = { } self.types = { } self.allocation = { } self.state = { 'projects': self.projects, 'allocation' : self.allocation, 'keys' : self.keys, 'types': self.types } self.log = logging.getLogger("fedd.access") set_log_level(config, "access", self.log) self.state_lock = Lock() # XXX: Configurable self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig', 'project_export', 'local_seer_control', 'seer_master')) self.imports = set(('SMB', 'seer', 'userconfig', 'seer_master')) if not self.local_seer_image or not self.local_seer_software: self.exports.discard('local_seer_control') self.exports.discard('seer_master') if not self.local_seer_start: self.exports.discard('local_seer_control') if not self.seer_master_start: self.exports.discard('seer_master') if auth: self.auth = auth else: self.log.error(\ "[access]: No authorizer initialized, creating local one.") auth = authorizer() tb = config.get('access', 'testbed') if tb: self.testbed = [ t.strip() for t in tb.split(',') ] else: self.testbed = [ ] if config.has_option("access", "accessdb"): self.read_access(config.get("access", "accessdb")) self.state_filename = config.get("access", "access_state") self.read_state() # Keep cert_file and cert_pwd coming from the same place self.cert_file = config.get("access", "cert_file") if self.cert_file: self.sert_pwd = config.get("access", "cert_pw") else: self.cert_file = config.get("globals", "cert_file") self.sert_pwd = config.get("globals", "cert_pw") self.trusted_certs = config.get("access", "trusted_certs") or \ config.get("globals", "trusted_certs") self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } self.call_SetValue = service_caller('SetValue') self.call_GetValue = service_caller('GetValue', log=self.log) if not config.has_option("allocate", "uri"): self.allocate_project = \ allocate_project_local(config, auth) else: self.allocate_project = \ allocate_project_remote(config, auth) # If the project allocator exports services, put them in this object's # maps so that classes that instantiate this can call the services. self.soap_services.update(self.allocate_project.soap_services) self.xmlrpc_services.update(self.allocate_project.xmlrpc_services) @staticmethod def add_kit(e, kit): """ Add a Software object created from the list of (install, location) tuples passed as kit to the software attribute of an object e. We do this enough to break out the code, but it's kind of a hack to avoid changing the old tuple rep. """ s = [ topdl.Software(install=i, location=l) for i, l in kit] if isinstance(e.software, list): e.software.extend(s) else: e.software = s def read_access(self, config): """ Read a configuration file and set internal parameters. The format is more complex than one might hope. The basic format is attribute value pairs separated by colons(:) on a signle line. The attributes in bool_attrs, emulab_attrs and id_attrs can all be set directly using the name: value syntax. E.g. boss: hostname sets self.boss to hostname. In addition, there are access lines of the form (tb, proj, user) -> (aproj, auser) that map the first tuple of names to the second for access purposes. Names in the key (left side) can include " or " to act as wildcards or to require the fields to be empty. Similarly aproj or auser can be or indicating that either the matching key is to be used or a dynamic user or project will be created. These names can also be federated IDs (fedid's) if prefixed with fedid:. Finally, the aproj can be followed with a colon-separated list of node types to which that project has access (or will have access if dynamic). Testbed attributes outside the forms above can be given using the format attribute: name value: value. The name is a single word and the value continues to the end of the line. Empty lines and lines startin with a # are ignored. Parsing errors result in a self.parse_error exception being raised. """ lineno=0 name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+" fedid_expr = "fedid:[" + string.hexdigits + "]+" key_name = "(||"+fedid_expr + "|"+ name_expr + ")" access_proj = "((?::" + name_expr +")*|"+ \ "" + "(?::" + name_expr + ")*|" + \ fedid_expr + "(?::" + name_expr + ")*|" + \ name_expr + "(?::" + name_expr + ")*)" access_name = "(||" + fedid_expr + "|"+ name_expr + ")" restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE) attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)', re.IGNORECASE) access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE) def parse_name(n): if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) else: return n def auth_name(n): if isinstance(n, basestring): if n =='' or n =='': return None else: return unicode(n) else: return n f = open(config, "r"); for line in f: lineno += 1 line = line.strip(); if len(line) == 0 or line.startswith('#'): continue # Extended (attribute: x value: y) attribute line m = attr_re.match(line) if m != None: attr, val = m.group(1,2) self.attrs[attr] = val continue # Restricted entry m = restricted_re.match(line) if m != None: val = m.group(1) self.restricted.append(val) continue # Access line (t, p, u) -> (ap, cu, su) line m = access_re.match(line) if m != None: access_key = tuple([ parse_name(x) for x in m.group(1,2,3)]) auth_key = tuple([ auth_name(x) for x in access_key]) aps = m.group(4).split(":"); if aps[0] == 'fedid:': del aps[0] aps[0] = fedid(hexstr=aps[0]) cu = parse_name(m.group(5)) su = parse_name(m.group(6)) access_val = (access_project(aps[0], aps[1:]), parse_name(m.group(5)), parse_name(m.group(6))) self.access[access_key] = access_val self.auth.set_attribute(auth_key, "access") continue # Nothing matched to here: unknown line - raise exception f.close() raise self.parse_error("Unknown statement at line %d of %s" % \ (lineno, config)) f.close() def get_users(self, obj): """ Return a list of the IDs of the users in dict """ if obj.has_key('user'): return [ unpack_id(u['userID']) \ for u in obj['user'] if u.has_key('userID') ] else: return None def write_state(self): if self.state_filename: try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except IOError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) def read_state(self): """ Read a new copy of access state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ if self.state_filename: try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.allocation = self.state['allocation'] self.projects = self.state['projects'] self.keys = self.state['keys'] self.types = self.state['types'] self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except IOError, e: self.log.warning(("[read_state]: No saved state: " +\ "Can't open %s: %s") % (self.state_filename, e)) except EOFError, e: self.log.warning(("[read_state]: " +\ "Empty or damaged state file: %s:") % \ self.state_filename) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) # Add the ownership attributes to the authorizer. Note that the # indices of the allocation dict are strings, but the attributes are # fedids, so there is a conversion. for k in self.allocation.keys(): for o in self.allocation[k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) if self.allocation[k].has_key('userconfig'): sfid = self.allocation[k]['userconfig'] fid = fedid(hexstr=sfid) self.auth.set_attribute(fid, "/%s" % sfid) def permute_wildcards(self, a, p): """Return a copy of a with various fields wildcarded. The bits of p control the wildcards. A set bit is a wildcard replacement with the lowest bit being user then project then testbed. """ if p & 1: user = [""] else: user = a[2] if p & 2: proj = "" else: proj = a[1] if p & 4: tb = "" else: tb = a[0] return (tb, proj, user) def find_access(self, search): """ Search the access DB for a match on this tuple. Return the matching access tuple and the user that matched. NB, if the initial tuple fails to match we start inserting wildcards in an order determined by self.project_priority. Try the list of users in order (when wildcarded, there's only one user in the list). """ if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7) else: perm = (0, 2, 1, 3, 4, 6, 5, 7) for p in perm: s = self.permute_wildcards(search, p) # s[2] is None on an anonymous, unwildcarded request if s[2] != None: for u in s[2]: if self.access.has_key((s[0], s[1], u)): return (self.access[(s[0], s[1], u)], u) else: if self.access.has_key(s): return (self.access[s], None) return None, None def lookup_access(self, req, fid): """ Determine the allowed access for this request. Return the access and which fields are dynamic. The fedid is needed to construct the request """ user_re = re.compile("user:\s(.*)") project_re = re.compile("project:\s(.*)") # Search keys tb = None project = None user = None # Return values rp = access_project(None, ()) ru = None user = [ user_re.findall(x)[0] for x in req.get('credential', []) \ if user_re.match(x)] project = [ project_re.findall(x)[0] \ for x in req.get('credential', []) \ if project_re.match(x)] if len(project) == 1: project = project[0] elif len(project) == 0: project = None else: raise service_error(service_error.req, "More than one project credential") user_fedids = [ u for u in user if isinstance(u, fedid)] # Determine how the caller is representing itself. If its fedid shows # up as a project or a singleton user, let that stand. If neither the # usernames nor the project name is a fedid, the caller is a testbed. if project and isinstance(project, fedid): if project == fid: # The caller is the project (which is already in the tuple # passed in to the authorizer) owners = user_fedids owners.append(project) else: raise service_error(service_error.req, "Project asserting different fedid") else: if fid not in user_fedids: tb = fid owners = user_fedids owners.append(fid) else: if len(fedids) > 1: raise service_error(service_error.req, "User asserting different fedid") else: # Which is a singleton owners = user_fedids # Confirm authorization for u in user: self.log.debug("[lookup_access] Checking access for %s" % \ ((tb, project, u),)) if self.auth.check_attribute((tb, project, u), 'access'): self.log.debug("[lookup_access] Access granted") break else: self.log.debug("[lookup_access] Access Denied") else: raise service_error(service_error.access, "Access denied") # This maps a valid user to the Emulab projects and users to use found, user_match = self.find_access((tb, project, user)) if found == None: raise service_error(service_error.access, "Access denied - cannot map access") # resolve and in found dyn_proj = False dyn_create_user = False dyn_service_user = False if found[0].name == "": if project != None: rp.name = project else : raise service_error(\ service_error.server_config, "Project matched when no project given") elif found[0].name == "": rp.name = None dyn_proj = True else: rp.name = found[0].name rp.node_types = found[0].node_types; if found[1] == "": if user_match == "": if user != None: rcu = user[0] else: raise service_error(\ service_error.server_config, "Matched on anonymous request") else: rcu = user_match elif found[1] == "": rcu = None dyn_create_user = True else: rcu = found[1] if found[2] == "": if user_match == "": if user != None: rsu = user[0] else: raise service_error(\ service_error.server_config, "Matched on anonymous request") else: rsu = user_match elif found[2] == "": rsu = None dyn_service_user = True else: rsu = found[2] return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\ owners def get_handler(self, path, fid): self.log.info("Get handler %s %s" % (path, fid)) if self.auth.check_attribute(fid, path) and self.userconfdir: return ("%s/%s" % (self.userconfdir, path), "application/binary") else: return (None, None) def export_userconf(self, project): dev_null = None confid, confcert = generate_fedid("test", dir=self.userconfdir, log=self.log) conffilename = "%s/%s" % (self.userconfdir, str(confid)) cf = None try: cf = open(conffilename, "w") os.chmod(conffilename, stat.S_IRUSR | stat.S_IWUSR) except IOError, e: raise service_error(service_error.internal, "Cannot create user configuration data") try: dev_null = open("/dev/null", "a") except IOError, e: self.log.error("export_userconf: can't open /dev/null: %s" % e) cmd = "%s %s" % (self.userconfcmd, project) conf = subprocess.call(cmd.split(" "), stdout=cf, stderr=dev_null, close_fds=True) self.auth.set_attribute(confid, "/%s" % str(confid)) return confid, confcert def export_SMB(self, id, state, project, user): return { 'id': id, 'name': 'SMB', 'visibility': 'export', 'server': 'http://fs:139', 'fedAttr': [ { 'attribute': 'SMBSHARE', 'value': 'USERS' }, { 'attribute': 'SMBUSER', 'value': user }, { 'attribute': 'SMBPROJ', 'value': project }, ] } def export_seer(self, id, state, project, user): return { 'id': id, 'name': 'seer', 'visibility': 'export', 'server': 'http://control:16606', } def export_local_seer(self, id, state, project, user): return { 'id': id, 'name': 'local_seer_control', 'visibility': 'export', 'server': 'http://control:16606', } def export_seer_master(self, id, state, project, user): return { 'id': id, 'name': 'seer_master', 'visibility': 'export', 'server': 'http://seer-master:17707', } def export_tmcd(self, id, state, project, user): return { 'id': id, 'name': 'seer', 'visibility': 'export', 'server': 'http://boss:7777', } def export_userconfig(self, id, state, project, user): if self.userconfdir and self.userconfcmd \ and self.userconfurl: cid, cert = self.export_userconf(project) state['userconfig'] = unicode(cid) return { 'id': id, 'name': 'userconfig', 'visibility': 'export', 'server': "%s/%s" % (self.userconfurl, str(cid)), 'fedAttr': [ { 'attribute': 'cert', 'value': cert }, ] } else: return None def export_services(self, sreq, project, user): exp = [ ] state = { } # XXX: Filthy shortcut here using http: so urlparse will give the right # answers. for s in sreq: sname = s.get('name', '') svis = s.get('visibility', '') if svis == 'export': if sname in self.exports: id = s.get('id', 'no_id') if sname == 'SMB': exp.append(self.export_SMB(id, state, project, user)) elif sname == 'seer': exp.append(self.export_seer(id, state, project, user)) elif sname == 'tmcd': exp.append(self.export_tmcd(id, state, project, user)) elif sname == 'userconfig': exp.append(self.export_userconfig(id, state, project, user)) elif sname == 'project_export': exp.append(self.export_SMB(id, state, project, user)) #exp.append(self.export_seer(id, state, project, user)) exp.append(self.export_userconfig(id, state, project, user)) elif sname == 'local_seer_control': exp.append(self.export_local_seer(id, state, project, user)) elif sname == 'seer_master': exp.append(self.export_seer_master(id, state, project, user)) return (exp, state) def build_response(self, alloc_id, ap, services): """ Create the SOAP response. Build the dictionary description of the response and use fedd_utils.pack_soap to create the soap message. ap is the allocate project message returned from a remote project allocation (even if that allocation was done locally). """ # Because alloc_id is already a fedd_services_types.IDType_Holder, # there's no need to repack it msg = { 'allocID': alloc_id, 'fedAttr': [ { 'attribute': 'domain', 'value': self.domain } , { 'attribute': 'project', 'value': ap['project'].get('name', {}).get('localname', "???") }, ] } if len(self.attrs) > 0: msg['fedAttr'].extend( [ { 'attribute': x, 'value' : y } \ for x,y in self.attrs.iteritems()]) if services: msg['service'] = services return msg def RequestAccess(self, req, fid): """ Handle the access request. Proxy if not for us. Parse out the fields and make the allocations or rejections if for us, otherwise, assuming we're willing to proxy, proxy the request out. """ def gateway_hardware(h): if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE') else: return h def get_export_project(svcs): """ if the service requests includes one to export a project, return that project. """ rv = None for s in svcs: if s.get('name', '') == 'project_export' and \ s.get('visibility', '') == 'export': if not rv: for a in s.get('feddAttr', []): if a.get('attribute', '') == 'project' \ and 'value' in a: rv = a['value'] else: raise service_error(service_error, access, 'Requesting multiple project exports is ' + \ 'not supported'); return rv # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) if dt == None or dt in self.testbed: # Request for this fedd found, dyn, owners = self.lookup_access(req, fid) restricted = None ap = None # if this includes a project export request and the exported # project is not the access project, access denied. if 'service' in req: ep = get_export_project(req['service']) if ep and ep != found[0].name: raise service_error(service_error.access, "Cannot export %s" % ep) # XXX # Check for access to restricted nodes if req.has_key('resources') and req['resources'].has_key('node'): resources = req['resources'] restricted = [ gateway_hardware(t) for n in resources['node'] \ if n.has_key('hardware') \ for t in n['hardware'] \ if gateway_hardware(t) \ in self.restricted ] inaccessible = [ t for t in restricted \ if t not in found[0].node_types] if len(inaccessible) > 0: raise service_error(service_error.access, "Access denied (nodetypes %s)" % \ str(', ').join(inaccessible)) # XXX # These were passed around before, but now are hidden from users # and configurators alike, beyond a configuration file entry. create_ssh = [ self.ssh_pubkey_file ] service_ssh = [ self.ssh_pubkey_file ] if len(create_ssh) > 0 and len(service_ssh) >0: if dyn[1]: # Compose the dynamic project request # (only dynamic, dynamic currently allowed) preq = { 'AllocateProjectRequestBody': \ { 'project' : {\ 'user': [ \ { \ 'access': [ { 'sshPubkey': s } \ for s in service_ssh ], 'role': "serviceAccess",\ }, \ { \ 'access': [ { 'sshPubkey': s } \ for s in create_ssh ], 'role': "experimentCreation",\ }, \ ], \ }\ }\ } if restricted != None and len(restricted) > 0: preq['AllocateProjectRequestBody']['resources'] = \ {'node': [ { 'hardware' : [ h ] } \ for h in restricted ] } ap = self.allocate_project.dynamic_project(preq) else: preq = {'StaticProjectRequestBody' : \ { 'project': \ { 'name' : { 'localname' : found[0].name },\ 'user' : [ \ {\ 'userID': { 'localname' : found[1] }, \ 'access': [ { 'sshPubkey': s } for s in create_ssh ], 'role': 'experimentCreation'\ },\ {\ 'userID': { 'localname' : found[2] }, \ 'access': [ { 'sshPubkey': s } for s in service_ssh ], 'role': 'serviceAccess'\ },\ ]}\ }\ } if restricted != None and len(restricted) > 0: preq['StaticProjectRequestBody']['resources'] = \ {'node': [ { 'hardware' : [ h ] } \ for h in restricted ] } ap = self.allocate_project.static_project(preq) else: raise service_error(service_error.req, "SSH access parameters required") # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) self.state_lock.acquire() self.allocation[aid] = { } try: pname = ap['project']['name']['localname'] except KeyError: pname = None if dyn[1]: if not pname: self.state_lock.release() raise service_error(service_error.internal, "Misformed allocation response?") if self.projects.has_key(pname): self.projects[pname] += 1 else: self.projects[pname] = 1 self.allocation[aid]['project'] = pname else: # sproject is a static project associated with this allocation. self.allocation[aid]['sproject'] = pname if ap.has_key('resources'): if not pname: self.state_lock.release() raise service_error(service_error.internal, "Misformed allocation response?") self.allocation[aid]['types'] = set() nodes = ap['resources'].get('node', []) for n in nodes: for h in n.get('hardware', []): if self.types.has_key((pname, h)): self.types[(pname, h)] += 1 else: self.types[(pname, h)] = 1 self.allocation[aid]['types'].add((pname,h)) self.allocation[aid]['keys'] = [ ] try: for u in ap['project']['user']: uname = u['userID']['localname'] if u['role'] == 'experimentCreation': self.allocation[aid]['user'] = uname for k in [ k['sshPubkey'] for k in u['access'] \ if k.has_key('sshPubkey') ]: kv = "%s:%s" % (uname, k) if self.keys.has_key(kv): self.keys[kv] += 1 else: self.keys[kv] = 1 self.allocation[aid]['keys'].append((uname, k)) except KeyError: self.state_lock.release() raise service_error(service_error.internal, "Misformed allocation response?") self.allocation[aid]['owners'] = owners services, svc_state = self.export_services(req.get('service',[]), pname, uname) # Store services state in global state for k, v in svc_state.items(): self.allocation[aid][k] = v self.write_state() self.state_lock.release() for o in owners: self.auth.set_attribute(o, allocID) try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except IOError, e: raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) resp = self.build_response({ 'fedid': allocID } , ap, services) return resp else: if self.allow_proxy: resp = self.proxy_RequestAccess.call_service(dt, req, self.cert_file, self.cert_pwd, self.trusted_certs) if resp.has_key('RequestAccessResponseBody'): return resp['RequestAccessResponseBody'] else: return None else: raise service_error(service_error.access, "Access proxying denied") def ReleaseAccess(self, req, fid): # The dance to get into the request body if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) else: dt = None if dt == None or dt in self.testbed: # Local request try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s", aid) if not self.auth.check_attribute(fid, auth_attr): self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") # If we know this allocation, reduce the reference counts and # remove the local allocations. Otherwise report an error. If # there is an allocation to delete, del_users will be a dictonary # of sets where the key is the user that owns the keys in the set. # We use a set to avoid duplicates. del_project is just the name # of any dynamic project to delete. We're somewhat lazy about # deleting authorization attributes. Having access to something # that doesn't exist isn't harmful. del_users = { } del_project = None del_types = set() if self.allocation.has_key(aid): self.log.debug("Found allocation for %s" %aid) self.state_lock.acquire() for k in self.allocation[aid]['keys']: kk = "%s:%s" % k self.keys[kk] -= 1 if self.keys[kk] == 0: if not del_users.has_key(k[0]): del_users[k[0]] = set() del_users[k[0]].add(k[1]) del self.keys[kk] if self.allocation[aid].has_key('project'): pname = self.allocation[aid]['project'] self.projects[pname] -= 1 if self.projects[pname] == 0: del_project = pname del self.projects[pname] if self.allocation[aid].has_key('types'): for t in self.allocation[aid]['types']: self.types[t] -= 1 if self.types[t] == 0: if not del_project: del_project = t[0] del_types.add(t[1]) del self.types[t] del self.allocation[aid] self.write_state() self.state_lock.release() # If we actually have resources to deallocate, prepare the call. if del_project or del_users: msg = { 'project': { }} if del_project: msg['project']['name']= {'localname': del_project} users = [ ] for u in del_users.keys(): users.append({ 'userID': { 'localname': u },\ 'access' : \ [ {'sshPubkey' : s } for s in del_users[u]]\ }) if users: msg['project']['user'] = users if len(del_types) > 0: msg['resources'] = { 'node': \ [ {'hardware': [ h ] } for h in del_types ]\ } if self.allocate_project.release_project: msg = { 'ReleaseProjectRequestBody' : msg} self.allocate_project.release_project(msg) # And remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) return { 'allocID': req['allocID'] } else: raise service_error(service_error.req, "No such allocation") else: if self.allow_proxy: resp = self.proxy_ReleaseAccess.call_service(dt, req, self.cert_file, self.cert_pwd, self.trusted_certs) if resp.has_key('ReleaseAccessResponseBody'): return resp['ReleaseAccessResponseBody'] else: return None else: raise service_error(service_error.access, "Access proxying denied") def generate_portal_configs(self, topo, pubkey_base, secretkey_base, tmpdir, master, lproj, leid, connInfo, services): def conninfo_to_dict(key, info): """ Make a cpoy of the connection information about key, and flatten it into a single dict by parsing out any feddAttrs. """ rv = None for i in info: if key == i.get('portal', "") or \ key in [e.get('element', "") \ for e in i.get('member', [])]: rv = i.copy() break else: return rv if 'fedAttr' in rv: for a in rv['fedAttr']: attr = a.get('attribute', "") val = a.get('value', "") if attr and attr not in rv: rv[attr] = val del rv['fedAttr'] return rv # XXX: un hardcode this def client_null(f, s): print >>f, "Service: %s" % s['name'] def client_seer_master(f, s): print >>f, 'PortalAlias: seer-master'; def client_smb(f, s): print >>f, "Service: %s" % s['name'] smbshare = None smbuser = None smbproj = None for a in s.get('fedAttr', []): if a.get('attribute', '') == 'SMBSHARE': smbshare = a.get('value', None) elif a.get('attribute', '') == 'SMBUSER': smbuser = a.get('value', None) elif a.get('attribute', '') == 'SMBPROJ': smbproj = a.get('value', None) if all((smbshare, smbuser, smbproj)): print >>f, "SMBshare: %s" % smbshare print >>f, "ProjectUser: %s" % smbuser print >>f, "ProjectName: %s" % smbproj client_service_out = { 'SMB': client_smb, 'tmcd': client_null, 'seer': client_null, 'userconfig': client_null, 'project_export': client_null, 'seer_master': client_seer_master, } def client_seer_master_export(f, s): print >>f, "AddedNode: seer-master" def client_seer_local_export(f, s): print >>f, "AddedNode: control" client_export_service_out = { 'seer_master': client_seer_master_export, 'local_seer_control': client_seer_local_export, } def server_port(f, s): p = urlparse(s.get('server', 'http://localhost')) print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) def server_null(f,s): pass def server_seer(f, s): print >>f, 'seer: True' server_service_out = { 'SMB': server_port, 'tmcd': server_port, 'userconfig': server_null, 'project_export': server_null, 'seer': server_seer, 'seer_master': server_port, } # XXX: end un hardcode this seer_out = False client_out = False mproj = None mexp = None control_gw = None testbed = "" # Create configuration files for the portals for e in [ e for e in topo.elements \ if isinstance(e, topdl.Computer) and e.get_attribute('portal')]: myname = e.name type = e.get_attribute('portal_type') info = conninfo_to_dict(myname, connInfo) if not info: raise service_error(service_error.req, "No connectivity info for %s" % myname) peer = info.get('peer', "") ldomain = self.domain; ssh_port = info.get('ssh_port', 22) # Collect this for the client.conf file if 'masterexperiment' in info: mproj, meid = info['masterexperiment'].split("/", 1) if type in ('control', 'both'): testbed = e.get_attribute('testbed') control_gw = myname active = info.get('active', 'False') cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower()) tunnelconfig = self.attrs.has_key('TunnelCfg') try: f = open(cfn, "w") if active == 'True': print >>f, "active: True" print >>f, "ssh_port: %s" % ssh_port if type in ('control', 'both'): for s in [s for s in services \ if s.get('name', "") in self.imports]: server_service_out[s['name']](f, s) if tunnelconfig: print >>f, "tunnelip: %s" % tunnelconfig print >>f, "peer: %s" % peer.lower() print >>f, "ssh_pubkey: /proj/%s/exp/%s/tmp/%s" % \ (lproj, leid, pubkey_base) print >>f, "ssh_privkey: /proj/%s/exp/%s/tmp/%s" % \ (lproj, leid, secretkey_base) f.close() except IOError, e: raise service_error(service_error.internal, "Can't write protal config %s: %s" % (cfn, e)) # Done with portals, write the client config file. try: f = open("%s/client.conf" % tmpdir, "w") if control_gw: print >>f, "ControlGateway: %s.%s.%s%s" % \ (myname.lower(), leid.lower(), lproj.lower(), ldomain.lower()) for s in services: if s.get('name',"") in self.imports and \ s.get('visibility','') == 'import': client_service_out[s['name']](f, s) if s.get('name', '') in self.exports and \ s.get('visibility', '') == 'export' and \ s['name'] in client_export_service_out: client_export_service_out[s['name']](f, s) # Seer uses this. if mproj and meid: print >>f, "ExperimentID: %s/%s" % (mproj, meid) # Better way... if testbed == master: print >>f, "SEERBase: True" f.close() except IOError, e: raise service_error(service_error.internal, "Cannot write client.conf: %s" %s) def generate_ns2(self, topo, expfn, softdir, master, connInfo): class dragon_commands: """ Functor to spit out approrpiate dragon commands for nodes listed in the connectivity description. The constructor makes a dict mapping dragon nodes to their parameters and the __call__ checks each element in turn for membership. """ def __init__(self, map): self.node_info = map def __call__(self, e): s = "" if isinstance(e, topdl.Computer): if self.node_info.has_key(e.name): i = self.node_info[e.name] for ifname, vlan, type in i: for i in e.interface: if i.name == ifname: addr = i.get_attribute('ip4_address') subs = i.substrate[0] break else: raise service_error(service_error.internal, "No interface %s on element %s" % \ (ifname, e.name)) # XXX: do netmask right if type =='link': s = ("tb-allow-external ${%s} dragonportal " + \ "ip %s vlan %s netmask 255.255.255.0\n") % \ (e.name, addr, vlan) elif type =='lan': s = ("tb-allow-external ${%s} dragonportal " + \ "ip %s vlan %s usurp %s\n") % \ (e.name, addr, vlan, subs) else: raise service_error(service_error_internal, "Unknown DRAGON type %s" % type) return s class not_dragon: def __init__(self, map): self.nodes = set(map.keys()) def __call__(self, e): return e.name not in self.nodes t = topo.clone() dragon_map = { } for i in [ i for i in connInfo if i['type'] == 'transit']: for a in i.get('fedAttr', []): if a['attribute'] == 'vlan_id': vlan = a['value'] break else: raise service_error(service_error.internal, "No vlan tag") members = i.get('member', []) if len(members) > 1: type = 'lan' else: type = 'link' try: for m in members: if dragon_map.has_key(m['element']): dragon_map[m['element']].append(( m['interface'], vlan, type)) else: dragon_map[m['element']] = [( m['interface'], vlan, type),] except KeyError: raise service_error(service_error.req, "Missing connectivity info") # The startcmds for master and slave testbeds if master: gate_cmd = self.attrs.get('MasterConnectorStartCmd', '/bin/true') node_cmd = self.attrs.get('MasterNodeStartCmd', 'bin/true') else: gate_cmd = self.attrs.get('SlaveConnectorStartCmd', '/bin/true') node_cmd = self.attrs.get('SlaveNodeStartCmd', 'bin/true') # Weed out the things we aren't going to instantiate: Segments, portal # substrates, and portal interfaces. (The copy in the for loop allows # us to delete from e.elements in side the for loop). While we're # touching all the elements, we also adjust paths from the original # testbed to local testbed paths and put the federation commands into # the start commands for e in [e for e in t.elements]: if isinstance(e, topdl.Segment): t.elements.remove(e) if isinstance(e, topdl.Computer): self.add_kit(e, self.federation_software) if e.get_attribute('portal') and gate_cmd: # Add local portal support software self.add_kit(e, self.portal_software) # Portals never have a user-specified start command e.set_attribute('startup', gate_cmd) elif node_cmd: if e.get_attribute('startup'): e.set_attribute('startup', "%s \\$USER '%s'" % \ (node_cmd, e.get_attribute('startup'))) else: e.set_attribute('startup', node_cmd) dinf = [i[0] for i in dragon_map.get(e.name, []) ] # Remove portal interfaces that do not connect to DRAGON e.interface = [i for i in e.interface \ if not i.get_attribute('portal') or i.name in dinf ] # Fix software paths for s in getattr(e, 'software', []): s.location = re.sub("^.*/", softdir, s.location) t.substrates = [ s.clone() for s in t.substrates ] t.incorporate_elements() # Customize the ns2 output for local portal commands and images filters = [] # NB: these are extra commands issued for the node, not the startcmds if master: cmd = self.attrs.get('MasterConnectorCmd', '') else: cmd = self.attrs.get('SlaveConnectorCmd', '') if self.attrs.has_key('dragon'): add_filter = not_dragon(dragon_map) filters.append(dragon_commands(dragon_map)) else: add_filter = None if cmd: filters.append(topdl.generate_portal_command_filter(cmd, add_filter=add_filter)) if self.attrs.has_key('connectorImage'): filters.append(topdl.generate_portal_image_filter( self.attrs.get('connectorImage'))) if self.attrs.has_key('connectorType'): filters.append(topdl.generate_portal_hardware_filter( self.attrs.get('connectorType'))) # Convert to ns and write it out expfile = topdl.topology_to_ns2(t, filters) try: f = open(expfn, "w") print >>f, expfile f.close() except IOError: raise service_error(service_error.internal, "Cannot write experiment file %s: %s" % (expfn,e)) def export_store_info(self, cf, proj, ename, connInfo): """ For the export requests in the connection info, install the peer names at the experiment controller via SetValue calls. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl and k and k.index('/') != -1: value = "%s.%s.%s%s" % \ (k[k.index('/')+1:], ename, proj, self.domain) req = { 'name': k, 'value': value } self.log.debug("Setting %s to %s on %s" % \ (k, value, surl)) self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': self.ssh_port } self.log.debug("Setting %s to %s on %s" % \ (k, self.ssh_port, surl)) self.call_SetValue(surl, req, cf) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def import_store_info(self, cf, connInfo): """ Pull any import parameters in connInfo in. We translate them either into known member names or fedAddrs. """ for c in connInfo: for p in [ p for p in c.get('parameter', []) \ if p.get('type', '') == 'input']: name = p.get('name', None) key = p.get('key', None) store = p.get('store', None) if name and key and store : req = { 'name': key, 'wait': True } self.log.debug("Waiting for %s (%s) from %s" % \ (name, key, store)) r = self.call_GetValue(store, req, cf) r = r.get('GetValueResponseBody', None) if r : if r.get('name', '') == key: v = r.get('value', None) if v is not None: if name == 'peer': self.log.debug("Got peer %s" % v) c['peer'] = v else: self.log.debug("Got %s %s" % (name, v)) if c.has_key('fedAttr'): c['fedAttr'].append({ 'attribute': name, 'value': v}) else: c['fedAttr']= [{ 'attribute': name, 'value': v}] else: raise service_error(service_error.internal, 'None value exported for %s' % key) else: raise service_error(service_error.internal, 'Different name returned for %s: %s' \ % (key, r.get('name',''))) else: raise service_error(service_error.internal, 'Badly formatted response: no GetValueResponseBody') else: raise service_error(service_error.internal, 'Bad Services missing info for import %s' % c) def configure_userconf(self, services): """ If the userconf service was imported, collect the configuration data. """ for s in services: s_name = s.get('name', '') s_vis = s.get('visibility','') if s_name == 'userconfig' and s_vis == 'import': # Collect ther server and certificate info. u = s.get('server', None) for a in s.get('fedAttr', []): if a.get('attribute',"") == 'cert': cert = a.get('value', None) break else: cert = None if cert: # Make a temporary certificate file for get_url. The # finally clause removes it whether something goes # wrong (including an exception from get_url) or not. try: tfos, tn = tempfile.mkstemp(suffix=".pem") tf = os.fdopen(tfos, 'w') print >>tf, cert tf.close() self.log.debug("Getting userconf info: %s" % u) get_url(u, tn, tmpdir, "userconf") self.log.debug("Got userconf info: %s" % u) except IOError, e: raise service_error(service.error.internal, "Cannot create temp file for " + "userconfig certificates: %s e") except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) finally: if tn: os.remove(tn) else: raise service_error(service_error.req, "No certificate for retreiving userconfig") break def add_seer_node(self, topo, name, startup): """ Add a seer node to the given topology, with the startup command passed in. """ c_node = topdl.Computer( name=name, os= topdl.OperatingSystem( attribute=[ { 'attribute': 'osid', 'value': self.local_seer_image }, ]), attribute=[ { 'attribute': 'startup', 'value': startup }, ] ) self.add_kit(c_node, self.local_seer_software) topo.elements.append(c_node) def configure_seer_services(self, services, topo, softdir): local_seer = False collect_seer = False seer_master= False for s in services: s_name = s.get('name', '') s_vis = s.get('visibility','') if s_name == 'local_seer_control' and s_vis == 'export': local_seer = True elif s_name == 'seer_master': if s_vis == 'import': collect_seer = True elif s_vis == 'export': seer_master = True # We've got the whole picture now, so add nodes if needed and configure # them to interconnect properly. if local_seer or seer_master: # Copy local seer control node software to the tempdir for l, f in self.local_seer_software: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) # If we're collecting seers somewhere the controllers need to talk to # the master. In testbeds that export the service, that will be a # local node that we'll add below. Elsewhere it will be the control # portal that will port forward to the exporting master. if local_seer: if collect_seer: startup = "%s -C %s" % (self.local_seer_start, "seer-master") else: startup = self.local_seer_start self.add_seer_node(topo, 'control', startup) # If this is the seer master, add that node, too. if seer_master: self.add_seer_node(topo, 'seer-master', self.seer_master_start) def StartSegment(self, req, fid): configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey')) err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = req['StartSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") connInfo = req.get('connection', []) services = req.get('service', []) auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. self.state_lock.acquire() retval = self.allocation[aid].get('started', None) self.state_lock.release() if retval: self.log.warning("Duplicate StartSegment for %s: " % aid + \ "replaying response") return retval # A new request. Do it. if req.has_key('segmentdescription') and \ req['segmentdescription'].has_key('topdldescription'): topo = \ topdl.Topology(**req['segmentdescription']['topdldescription']) else: raise service_error(service_error.req, "Request missing segmentdescription'") master = req.get('master', False) certfile = "%s/%s.pem" % (self.certdir, auth_attr) try: tmpdir = tempfile.mkdtemp(prefix="access-") softdir = "%s/software" % tmpdir os.mkdir(softdir) except IOError: raise service_error(service_error.internal, "Cannot create tmp dir") # Try block alllows us to clean up temporary files. try: sw = set() for e in topo.elements: for s in getattr(e, 'software', []): sw.add(s.location) for s in sw: self.log.debug("Retrieving %s" % s) try: get_url(s, certfile, softdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) # Copy local federation and portal node software to the tempdir for s in (self.federation_software, self.portal_software): for l, f in s: base = os.path.basename(f) copy_file(f, "%s/%s" % (softdir, base)) ename = None pubkey_base = None secretkey_base = None for a in attrs: if a['attribute'] in configs: try: self.log.debug("Retrieving %s from %s" % \ (a['attribute'], a['value'])) get_url(a['value'], certfile, tmpdir) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (s, v)) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'experiment_name': ename = a['value'] if not ename: ename = "" for i in range(0,5): ename += random.choice(string.ascii_letters) self.log.warn("No experiment name: picked one randomly: %s" \ % ename) if not pubkey_base: raise service_error(service_error.req, "No public key attribute") if not secretkey_base: raise service_error(service_error.req, "No secret key attribute") # If the userconf service was imported, collect the configuration # data. self.configure_userconf(services) self.configure_seer_services(services, topo, softdir) proj = None user = None self.state_lock.acquire() if self.allocation.has_key(aid): proj = self.allocation[aid].get('project', None) if not proj: proj = self.allocation[aid].get('sproject', None) user = self.allocation[aid].get('user', None) self.allocation[aid]['experiment'] = ename self.allocation[aid]['log'] = [ ] # Create a logger that logs to the experiment's state object as # well as to the main log file. alloc_log = logging.getLogger('fedd.access.%s' % ename) h = logging.StreamHandler( list_log.list_log(self.allocation[aid]['log'])) # XXX: there should be a global one of these rather than # repeating the code. h.setFormatter(logging.Formatter( "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) self.write_state() self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" %aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" %aid) self.export_store_info(certfile, proj, ename, connInfo) self.import_store_info(certfile, connInfo) expfile = "%s/experiment.tcl" % tmpdir self.generate_portal_configs(topo, pubkey_base, secretkey_base, tmpdir, master, proj, ename, connInfo, services) self.generate_ns2(topo, expfile, "/proj/%s/software/%s/" % (proj, ename), master, connInfo) starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug, log=alloc_log) rv = starter(self, ename, proj, user, expfile, tmpdir) # Copy the assigned names into the return topology rvtopo = topo.clone() for e in [ e for e in rvtopo.elements \ if isinstance(e, topdl.Computer)]: for n in e.name: if n in starter.node: e.set_attribute('hostname', "%s%s" % \ (starter.node[n], self.domain)) except service_error, e: err = e except e: err = service_error(service_error.internal, str(e)) # Walk up tmpdir, deleting as we go if self.cleanup: self.log.debug("[StartSegment]: removing %s" % tmpdir) for path, dirs, files in os.walk(tmpdir, topdown=False): for f in files: os.remove(os.path.join(path, f)) for d in dirs: os.rmdir(os.path.join(path, d)) os.rmdir(tmpdir) else: self.log.debug("[StartSegment]: not removing %s" % tmpdir) if rv: # Grab the log (this is some anal locking, but better safe than # sorry) self.state_lock.acquire() logv = "".join(self.allocation[aid]['log']) # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. self.allocation[aid]['started'] = { 'allocID': req['allocID'], 'allocationLog': logv, } retval = copy.copy(self.allocation[aid]['started']) self.write_state() self.state_lock.release() retval['segmentdescription'] = \ { 'topdldescription': rvtopo.to_dict() } return retval elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if self.allocation.has_key(aid): proj = self.allocation[aid].get('project', None) if not proj: proj = self.allocation[aid].get('sproject', None) user = self.allocation[aid].get('user', None) ename = self.allocation[aid].get('experiment', None) else: proj = None user = None ename = None self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" % aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" % aid) if not ename: raise service_error(service_error.internal, "Can't find experiment name for %s" % aid) stopper = self.stop_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug) stopper(self, user, proj, ename) return { 'allocID': req['allocID'] }