#!/usr/local/bin/python import os,sys import re import string import copy import pickle import logging from threading import * import subprocess import signal import time from util import * from allocate_project import allocate_project_local, allocate_project_remote from access_project import access_project from fedid import fedid, generate_fedid from authorizer import authorizer from service_error import service_error from remote_service import xmlrpc_handler, soap_handler, service_caller import topdl import httplib import tempfile from urlparse import urlparse # Make log messages disappear if noone configures a fedd logger class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) class access: """ The implementation of access control based on mapping users to projects. Users can be mapped to existing projects or have projects created dynamically. This implements both direct requests and proxies. """ class parse_error(RuntimeError): pass proxy_RequestAccess= service_caller('RequestAccess') proxy_ReleaseAccess= service_caller('ReleaseAccess') def __init__(self, config=None, auth=None): """ Initializer. Pulls parameters out of the ConfigParser's access section. 
""" # Make sure that the configuration is in place if not config: raise RunTimeError("No config to fedd.access") self.project_priority = config.getboolean("access", "project_priority") self.allow_proxy = config.getboolean("access", "allow_proxy") self.boss = config.get("access", "boss") self.ops = config.get("access", "ops") self.domain = config.get("access", "domain") self.fileserver = config.get("access", "fileserver") self.eventserver = config.get("access", "eventserver") self.certdir = config.get("access","certdir") self.ssh_privkey_file = config.get("access","ssh_privkey_file") self.create_debug = config.getboolean("access", "create_debug") self.attrs = { } self.access = { } self.restricted = [ ] self.projects = { } self.keys = { } self.types = { } self.allocation = { } self.state = { 'projects': self.projects, 'allocation' : self.allocation, 'keys' : self.keys, 'types': self.types } self.log = logging.getLogger("fedd.access") set_log_level(config, "access", self.log) self.state_lock = Lock() if auth: self.auth = auth else: self.log.error(\ "[access]: No authorizer initialized, creating local one.") auth = authorizer() tb = config.get('access', 'testbed') if tb: self.testbed = [ t.strip() for t in tb.split(',') ] else: self.testbed = [ ] if config.has_option("access", "accessdb"): self.read_access(config.get("access", "accessdb")) self.state_filename = config.get("access", "access_state") self.read_state() # Keep cert_file and cert_pwd coming from the same place self.cert_file = config.get("access", "cert_file") if self.cert_file: self.sert_pwd = config.get("access", "cert_pw") else: self.cert_file = config.get("globals", "cert_file") self.sert_pwd = config.get("globals", "cert_pw") self.trusted_certs = config.get("access", "trusted_certs") or \ config.get("globals", "trusted_certs") self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': 
soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), } if not config.has_option("allocate", "uri"): self.allocate_project = \ allocate_project_local(config, auth) else: self.allocate_project = \ allocate_project_remote(config, auth) # If the project allocator exports services, put them in this object's # maps so that classes that instantiate this can call the services. self.soap_services.update(self.allocate_project.soap_services) self.xmlrpc_services.update(self.allocate_project.xmlrpc_services) def read_access(self, config): """ Read a configuration file and set internal parameters. The format is more complex than one might hope. The basic format is attribute value pairs separated by colons(:) on a signle line. The attributes in bool_attrs, emulab_attrs and id_attrs can all be set directly using the name: value syntax. E.g. boss: hostname sets self.boss to hostname. In addition, there are access lines of the form (tb, proj, user) -> (aproj, auser) that map the first tuple of names to the second for access purposes. Names in the key (left side) can include " or " to act as wildcards or to require the fields to be empty. Similarly aproj or auser can be or indicating that either the matching key is to be used or a dynamic user or project will be created. These names can also be federated IDs (fedid's) if prefixed with fedid:. Finally, the aproj can be followed with a colon-separated list of node types to which that project has access (or will have access if dynamic). Testbed attributes outside the forms above can be given using the format attribute: name value: value. 
The name is a single word and the value continues to the end of the line. Empty lines and lines startin with a # are ignored. Parsing errors result in a self.parse_error exception being raised. """ lineno=0 name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+" fedid_expr = "fedid:[" + string.hexdigits + "]+" key_name = "(||"+fedid_expr + "|"+ name_expr + ")" access_proj = "((?::" + name_expr +")*|"+ \ "" + "(?::" + name_expr + ")*|" + \ fedid_expr + "(?::" + name_expr + ")*|" + \ name_expr + "(?::" + name_expr + ")*)" access_name = "(||" + fedid_expr + "|"+ name_expr + ")" restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE) attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)', re.IGNORECASE) access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE) def parse_name(n): if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) else: return n def auth_name(n): if isinstance(n, basestring): if n =='' or n =='': return None else: return unicode(n) else: return n f = open(config, "r"); for line in f: lineno += 1 line = line.strip(); if len(line) == 0 or line.startswith('#'): continue # Extended (attribute: x value: y) attribute line m = attr_re.match(line) if m != None: attr, val = m.group(1,2) self.attrs[attr] = val continue # Restricted entry m = restricted_re.match(line) if m != None: val = m.group(1) self.restricted.append(val) continue # Access line (t, p, u) -> (ap, cu, su) line m = access_re.match(line) if m != None: access_key = tuple([ parse_name(x) for x in m.group(1,2,3)]) auth_key = tuple([ auth_name(x) for x in access_key]) aps = m.group(4).split(":"); if aps[0] == 'fedid:': del aps[0] aps[0] = fedid(hexstr=aps[0]) cu = parse_name(m.group(5)) su = parse_name(m.group(6)) access_val = (access_project(aps[0], aps[1:]), parse_name(m.group(5)), parse_name(m.group(6))) self.access[access_key] = 
access_val self.auth.set_attribute(auth_key, "access") continue # Nothing matched to here: unknown line - raise exception f.close() raise self.parse_error("Unknown statement at line %d of %s" % \ (lineno, config)) f.close() def get_users(self, obj): """ Return a list of the IDs of the users in dict """ if obj.has_key('user'): return [ unpack_id(u['userID']) \ for u in obj['user'] if u.has_key('userID') ] else: return None def write_state(self): if self.state_filename: try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except IOError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) def read_state(self): """ Read a new copy of access state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ if self.state_filename: try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.allocation = self.state['allocation'] self.projects = self.state['projects'] self.keys = self.state['keys'] self.types = self.state['types'] self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except IOError, e: self.log.warning(("[read_state]: No saved state: " +\ "Can't open %s: %s") % (self.state_filename, e)) except EOFError, e: self.log.warning(("[read_state]: " +\ "Empty or damaged state file: %s:") % \ self.state_filename) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) # Add the ownership attributes to the authorizer. Note that the # indices of the allocation dict are strings, but the attributes are # fedids, so there is a conversion. for k in self.allocation.keys(): for o in self.allocation[k].get('owners', []): self.auth.set_attribute(o, fedid(hexstr=k)) def permute_wildcards(self, a, p): """Return a copy of a with various fields wildcarded. 
The bits of p control the wildcards. A set bit is a wildcard replacement with the lowest bit being user then project then testbed. """ if p & 1: user = [""] else: user = a[2] if p & 2: proj = "" else: proj = a[1] if p & 4: tb = "" else: tb = a[0] return (tb, proj, user) def find_access(self, search): """ Search the access DB for a match on this tuple. Return the matching access tuple and the user that matched. NB, if the initial tuple fails to match we start inserting wildcards in an order determined by self.project_priority. Try the list of users in order (when wildcarded, there's only one user in the list). """ if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7) else: perm = (0, 2, 1, 3, 4, 6, 5, 7) for p in perm: s = self.permute_wildcards(search, p) # s[2] is None on an anonymous, unwildcarded request if s[2] != None: for u in s[2]: if self.access.has_key((s[0], s[1], u)): return (self.access[(s[0], s[1], u)], u) else: if self.access.has_key(s): return (self.access[s], None) return None, None def lookup_access(self, req, fid): """ Determine the allowed access for this request. Return the access and which fields are dynamic. The fedid is needed to construct the request """ # Search keys tb = None project = None user = None # Return values rp = access_project(None, ()) ru = None if req.has_key('project'): p = req['project'] if p.has_key('name'): project = unpack_id(p['name']) user = self.get_users(p) else: user = self.get_users(req) user_fedids = [ u for u in user if isinstance(u, fedid)] # Determine how the caller is representing itself. If its fedid shows # up as a project or a singleton user, let that stand. If neither the # usernames nor the project name is a fedid, the caller is a testbed. 
if project and isinstance(project, fedid): if project == fid: # The caller is the project (which is already in the tuple # passed in to the authorizer) owners = user_fedids owners.append(project) else: raise service_error(service_error.req, "Project asserting different fedid") else: if fid not in user_fedids: tb = fid owners = user_fedids owners.append(fid) else: if len(fedids) > 1: raise service_error(service_error.req, "User asserting different fedid") else: # Which is a singleton owners = user_fedids # Confirm authorization for u in user: self.log.debug("[lookup_access] Checking access for %s" % \ ((tb, project, u),)) if self.auth.check_attribute((tb, project, u), 'access'): self.log.debug("[lookup_access] Access granted") break else: self.log.debug("[lookup_access] Access Denied") else: raise service_error(service_error.access, "Access denied") # This maps a valid user to the Emulab projects and users to use found, user_match = self.find_access((tb, project, user)) if found == None: raise service_error(service_error.access, "Access denied - cannot map access") # resolve and in found dyn_proj = False dyn_create_user = False dyn_service_user = False if found[0].name == "": if project != None: rp.name = project else : raise service_error(\ service_error.server_config, "Project matched when no project given") elif found[0].name == "": rp.name = None dyn_proj = True else: rp.name = found[0].name rp.node_types = found[0].node_types; if found[1] == "": if user_match == "": if user != None: rcu = user[0] else: raise service_error(\ service_error.server_config, "Matched on anonymous request") else: rcu = user_match elif found[1] == "": rcu = None dyn_create_user = True else: rcu = found[1] if found[2] == "": if user_match == "": if user != None: rsu = user[0] else: raise service_error(\ service_error.server_config, "Matched on anonymous request") else: rsu = user_match elif found[2] == "": rsu = None dyn_service_user = True else: rsu = found[2] return (rp, rcu, rsu), 
(dyn_create_user, dyn_service_user, dyn_proj),\ owners def build_response(self, alloc_id, ap): """ Create the SOAP response. Build the dictionary description of the response and use fedd_utils.pack_soap to create the soap message. ap is the allocate project message returned from a remote project allocation (even if that allocation was done locally). """ # Because alloc_id is already a fedd_services_types.IDType_Holder, # there's no need to repack it msg = { 'allocID': alloc_id, 'emulab': { 'domain': self.domain, 'boss': self.boss, 'ops': self.ops, 'fileServer': self.fileserver, 'eventServer': self.eventserver, 'project': ap['project'] }, } if len(self.attrs) > 0: msg['emulab']['fedAttr'] = \ [ { 'attribute': x, 'value' : y } \ for x,y in self.attrs.iteritems()] return msg def RequestAccess(self, req, fid): """ Handle the access request. Proxy if not for us. Parse out the fields and make the allocations or rejections if for us, otherwise, assuming we're willing to proxy, proxy the request out. """ def gateway_hardware(h): if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE') else: return h # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if req.has_key('destinationTestbed'): dt = unpack_id(req['destinationTestbed']) if dt == None or dt in self.testbed: # Request for this fedd found, dyn, owners = self.lookup_access(req, fid) restricted = None ap = None # If this is a request to export a project and the access project # is not the project to export, access denied. 
            if req.has_key('exportProject'):
                ep = unpack_id(req['exportProject'])
                if ep != found[0].name:
                    raise service_error(service_error.access,
                            "Cannot export %s" % ep)

            # Check for access to restricted nodes
            if req.has_key('resources') and req['resources'].has_key('node'):
                resources = req['resources']
                # Restricted hardware types named by the request's nodes
                restricted = [ gateway_hardware(t) for n in resources['node'] \
                        if n.has_key('hardware') \
                            for t in n['hardware'] \
                                if gateway_hardware(t) \
                                        in self.restricted ]
                inaccessible = [ t for t in restricted \
                        if t not in found[0].node_types]
                if len(inaccessible) > 0:
                    raise service_error(service_error.access,
                            "Access denied (nodetypes %s)" % \
                                    str(', ').join(inaccessible))

            # These collect the keys for the two roles into single sets, one
            # for creation and one for service.  The sets are a simple way to
            # eliminate duplicates
            create_ssh = set([ x['sshPubkey'] \
                    for x in req['createAccess'] \
                        if x.has_key('sshPubkey')])

            service_ssh = set([ x['sshPubkey'] \
                    for x in req['serviceAccess'] \
                        if x.has_key('sshPubkey')])

            if len(create_ssh) > 0 and len(service_ssh) >0:
                if dyn[1]:
                    # Compose the dynamic project request
                    # (only dynamic, dynamic currently allowed)
                    preq = { 'AllocateProjectRequestBody': \
                            { 'project' : {\
                                'user': [ \
                                    { \
                                        'access': [ { 'sshPubkey': s } \
                                                for s in service_ssh ],
                                        'role': "serviceAccess",\
                                    }, \
                                    { \
                                        'access': [ { 'sshPubkey': s } \
                                                for s in create_ssh ],
                                        'role': "experimentCreation",\
                                    }, \
                                    ], \
                                }\
                            }\
                        }
                    if restricted != None and len(restricted) > 0:
                        preq['AllocateProjectRequestBody']['resources'] = \
                            {'node': [ { 'hardware' : [ h ] } \
                                    for h in restricted ] }
                    ap = self.allocate_project.dynamic_project(preq)
                else:
                    # Static project: attach the keys to the already-mapped
                    # project and users.
                    preq = {'StaticProjectRequestBody' : \
                            { 'project': \
                                { 'name' : { 'localname' : found[0].name },\
                                  'user' : [ \
                                    {\
                                        'userID': { 'localname' : found[1] }, \
                                        'access': [ { 'sshPubkey': s } \
                                                for s in create_ssh ],
                                        'role': 'experimentCreation'\
                                    },\
                                    {\
                                        'userID': { 'localname' : found[2] }, \
                                        'access': [ { 'sshPubkey': s } \
                                                for s in service_ssh ],
                                        'role': \
                                            'serviceAccess'\
                                    },\
                                ]}\
                            }\
                    }
                    if restricted != None and len(restricted) > 0:
                        preq['StaticProjectRequestBody']['resources'] = \
                            {'node': [ { 'hardware' : [ h ] } \
                                    for h in restricted ] }
                    ap = self.allocate_project.static_project(preq)
            else:
                raise service_error(service_error.req,
                        "SSH access parameters required")

            # keep track of what's been added
            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
            aid = unicode(allocID)

            self.state_lock.acquire()
            self.allocation[aid] = { }
            try:
                pname = ap['project']['name']['localname']
            except KeyError:
                pname = None

            if dyn[1]:
                # Dynamic project: reference-count it so it can be deleted
                # when the last allocation using it is released.
                if not pname:
                    self.state_lock.release()
                    raise service_error(service_error.internal,
                            "Misformed allocation response?")
                if self.projects.has_key(pname): self.projects[pname] += 1
                else: self.projects[pname] = 1
                self.allocation[aid]['project'] = pname
            else:
                # sproject is a static project associated with this allocation.
                self.allocation[aid]['sproject'] = pname

            if ap.has_key('resources'):
                # Reference-count the (project, hardware type) pairs granted
                if not pname:
                    self.state_lock.release()
                    raise service_error(service_error.internal,
                            "Misformed allocation response?")
                self.allocation[aid]['types'] = set()
                nodes = ap['resources'].get('node', [])
                for n in nodes:
                    for h in n.get('hardware', []):
                        if self.types.has_key((pname, h)):
                            self.types[(pname, h)] += 1
                        else:
                            self.types[(pname, h)] = 1
                        self.allocation[aid]['types'].add((pname,h))

            self.allocation[aid]['keys'] = [ ]

            try:
                # Reference-count each (user, sshkey) pair in the allocation
                for u in ap['project']['user']:
                    uname = u['userID']['localname']
                    if u['role'] == 'experimentCreation':
                        self.allocation[aid]['user'] = uname
                    for k in [ k['sshPubkey'] for k in u['access'] \
                            if k.has_key('sshPubkey') ]:
                        kv = "%s:%s" % (uname, k)
                        if self.keys.has_key(kv): self.keys[kv] += 1
                        else: self.keys[kv] = 1
                        self.allocation[aid]['keys'].append((uname, k))
            except KeyError:
                self.state_lock.release()
                raise service_error(service_error.internal,
                        "Misformed allocation response?")

            self.allocation[aid]['owners'] = owners
            self.write_state()
            self.state_lock.release()
            for o in owners:
                self.auth.set_attribute(o, allocID)
            try:
                # Stash the allocation certificate for later use
                f = open("%s/%s.pem" % (self.certdir, aid), "w")
                print >>f, alloc_cert
                f.close()
            except IOError, e:
                raise service_error(service_error.internal,
                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
            resp = self.build_response({ 'fedid': allocID } , ap)
            return resp
        else:
            # Request is for another testbed; proxy it if allowed
            if self.allow_proxy:
                resp = self.proxy_RequestAccess.call_service(dt, req,
                        self.cert_file, self.cert_pwd,
                        self.trusted_certs)
                if resp.has_key('RequestAccessResponseBody'):
                    return resp['RequestAccessResponseBody']
                else:
                    return None
            else:
                raise service_error(service_error.access,
                        "Access proxying denied")

    def ReleaseAccess(self, req, fid):
        # The dance to get into the request body
        if req.has_key('ReleaseAccessRequestBody'):
            req = req['ReleaseAccessRequestBody']
        else:
            raise service_error(service_error.req, "No request!?")

        if req.has_key('destinationTestbed'):
            dt = unpack_id(req['destinationTestbed'])
        else:
            dt = None

        if dt == None or dt in self.testbed:
            # Local request
            try:
                if req['allocID'].has_key('localname'):
                    auth_attr = aid = req['allocID']['localname']
                elif req['allocID'].has_key('fedid'):
                    aid = unicode(req['allocID']['fedid'])
                    auth_attr = req['allocID']['fedid']
                else:
                    raise service_error(service_error.req,
                            "Only localnames and fedids are understood")
            except KeyError:
                raise service_error(service_error.req, "Badly formed request")

            self.log.debug("[access] deallocation requested for %s", aid)
            if not self.auth.check_attribute(fid, auth_attr):
                self.log.debug("[access] deallocation denied for %s", aid)
                raise service_error(service_error.access, "Access Denied")

            # If we know this allocation, reduce the reference counts and
            # remove the local allocations.  Otherwise report an error.  If
            # there is an allocation to delete, del_users will be a dictonary
            # of sets where the key is the user that owns the keys in the set.
            # We use a set to avoid duplicates.  del_project is just the name
            # of any dynamic project to delete.
            # We're somewhat lazy about deleting authorization attributes.
            # Having access to something that doesn't exist isn't harmful.
            del_users = { }
            del_project = None
            del_types = set()

            if self.allocation.has_key(aid):
                self.log.debug("Found allocation for %s" %aid)
                self.state_lock.acquire()
                # Drop one reference per key; collect keys whose count hits 0
                for k in self.allocation[aid]['keys']:
                    kk = "%s:%s" % k
                    self.keys[kk] -= 1
                    if self.keys[kk] == 0:
                        if not del_users.has_key(k[0]):
                            del_users[k[0]] = set()
                        del_users[k[0]].add(k[1])
                        del self.keys[kk]

                # Dynamic project reference counting
                if self.allocation[aid].has_key('project'):
                    pname = self.allocation[aid]['project']
                    self.projects[pname] -= 1
                    if self.projects[pname] == 0:
                        del_project = pname
                        del self.projects[pname]

                # (project, hardware type) reference counting
                if self.allocation[aid].has_key('types'):
                    for t in self.allocation[aid]['types']:
                        self.types[t] -= 1
                        if self.types[t] == 0:
                            if not del_project: del_project = t[0]
                            del_types.add(t[1])
                            del self.types[t]

                del self.allocation[aid]
                self.write_state()
                self.state_lock.release()
                # If we actually have resources to deallocate, prepare the
                # call.
                if del_project or del_users:
                    msg = { 'project': { }}
                    if del_project:
                        msg['project']['name']= {'localname': del_project}
                    users = [ ]
                    for u in del_users.keys():
                        users.append({ 'userID': { 'localname': u },\
                                'access' : \
                                        [ {'sshPubkey' : s } \
                                            for s in del_users[u]]\
                        })
                    if users:
                        msg['project']['user'] = users
                    if len(del_types) > 0:
                        msg['resources'] = { 'node': \
                                [ {'hardware': [ h ] } for h in del_types ]\
                            }
                    if self.allocate_project.release_project:
                        msg = { 'ReleaseProjectRequestBody' : msg}
                        self.allocate_project.release_project(msg)
                return { 'allocID': req['allocID'] }
            else:
                raise service_error(service_error.req, "No such allocation")
        else:
            # Request is for another testbed; proxy it if allowed
            if self.allow_proxy:
                resp = self.proxy_ReleaseAccess.call_service(dt, req,
                        self.cert_file, self.cert_pwd,
                        self.trusted_certs)
                if resp.has_key('ReleaseAccessResponseBody'):
                    return resp['ReleaseAccessResponseBody']
                else:
                    return None
            else:
                raise service_error(service_error.access,
                        "Access proxying denied")

    class proxy_emulab_segment:
        # Shared machinery for driving an Emulab over ssh/scp.

        class ssh_cmd_timeout(RuntimeError): pass

        def __init__(self, log=None, keyfile=None, debug=False):
            self.log = log or logging.getLogger(\
                    'fedd.access.proxy_emulab_segment')
            self.ssh_privkey_file = keyfile
            self.debug = debug
            self.ssh_exec="/usr/bin/ssh"
            self.scp_exec = "/usr/bin/scp"
            self.ssh_cmd_timeout = access.proxy_emulab_segment.ssh_cmd_timeout

        def scp_file(self, file, user, host, dest=""):
            """
            scp a file to the remote host.  If debug is set the action is
            only logged.
""" scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', '-o', 'StrictHostKeyChecking yes', '-i', self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)] rv = 0 try: dnull = open("/dev/null", "w") except IOError: self.log.debug("[ssh_file]: failed to open " + \ "/dev/null for redirect") dnull = Null self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) if not self.debug: rv = subprocess.call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True, close_fds=True) return rv == 0 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): """ Run a remote command on host as user. If debug is set, the action is only logged. Commands are run without stdin, to avoid stray SIGTTINs. """ sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \ "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ (self.ssh_exec, self.ssh_privkey_file, user, host, cmd) try: dnull = open("/dev/null", "w") except IOError: self.log.debug("[ssh_cmd]: failed to open /dev/null " + \ "for redirect") dnull = Null self.log.debug("[ssh_cmd]: %s" % sh_str) if not self.debug: if dnull: sub = subprocess.Popen(sh_str, shell=True, stdout=dnull, stderr=dnull, close_fds=True) else: sub = subprocess.Popen(sh_str, shell=True, close_fds=True) if timeout: i = 0 rv = sub.poll() while i < timeout: if rv is not None: break else: time.sleep(1) rv = sub.poll() i += 1 else: self.log.debug("Process exceeded runtime: %s" % sh_str) os.kill(sub.pid, signal.SIGKILL) raise self.ssh_cmd_timeout(); return rv == 0 else: return sub.wait() == 0 else: if timeout == 0: self.log.debug("debug timeout raised on %s " % sh_str) raise self.ssh_cmd_timeout() else: return True class start_segment(proxy_emulab_segment): def __init__(self, log=None, keyfile=None, debug=False): access.proxy_emulab_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) self.null = """ set ns [new Simulator] source tb_compat.tcl set a [$ns node] $ns rtproto Session $ns run """ def get_state(self, user, host, pid, eid): # command to test experiment state 
expinfo_exec = "/usr/testbed/bin/expinfo" # Regular expressions to parse the expinfo response state_re = re.compile("State:\s+(\w+)") no_exp_re = re.compile("^No\s+such\s+experiment") swapping_re = re.compile("^No\s+information\s+available.") state = None # Experiment state parsed from expinfo # The expinfo ssh command. Note the identity restriction to use # only the identity provided in the pubkey given. cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 'StrictHostKeyChecking yes', '-i', self.ssh_privkey_file, "%s@%s" % (user, host), expinfo_exec, pid, eid] dev_null = None try: dev_null = open("/dev/null", "a") except IOError, e: self.log.error("[get_state]: can't open /dev/null: %s" %e) if self.debug: state = 'swapped' rv = 0 else: self.log.debug("Checking state") status = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=dev_null, close_fds=True) for line in status.stdout: m = state_re.match(line) if m: state = m.group(1) else: for reg, st in ((no_exp_re, "none"), (swapping_re, "swapping")): m = reg.match(line) if m: state = st rv = status.wait() # If the experiment is not present the subcommand returns a # non-zero return value. If we successfully parsed a "none" # outcome, ignore the return code. if rv != 0 and state != 'none': raise service_error(service_error.internal, "Cannot get status of segment:%s/%s" % (pid, eid)) elif state not in ('active', 'swapped', 'swapping', 'none'): raise service_error(service_error.internal, "Cannot get status of segment:%s/%s" % (pid, eid)) else: self.log.debug("State is %s" % state) return state def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0): """ Start a sub-experiment on a federant. Get the current state, modify or create as appropriate, ship data and configs and start the experiment. There are small ordering differences based on the initial state of the sub-experiment. 
""" # ops node in the federant host = "%s%s" % (parent.ops, parent.domain) # Configuration directories on the remote machine proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) softdir = "/proj/%s/software/%s" % (pid, eid) # Local software dir lsoftdir = "%s/software" % tmpdir state = self.get_state(user, host, pid, eid) if not self.scp_file(tclfile, user, host): return False if state == 'none': # Create a null copy of the experiment so that we capture any # logs there if the modify fails. Emulab software discards the # logs from a failed startexp try: f = open("%s/null.tcl" % tmpdir, "w") print >>f, self.null f.close() except IOError, e: raise service_error(service_error.internal, "Cannot stage tarfile/rpm: %s" % e.strerror) if not self.scp_file("%s/null.tcl" % tmpdir, user, host): return False self.log.info("[start_segment]: Creating %s" % eid) timedout = False try: if not self.ssh_cmd(user, host, ("/usr/testbed/bin/startexp -i -f -w -p %s " + "-e %s null.tcl") % (pid, eid), "startexp", timeout=60 * 10): return False except self.ssh_cmd_timeout: timedout = True if timedout: state = self.get_state(user, host, pid, eid) if state != "swapped": return False # Open up a temporary file to contain a script for setting up the # filespace for the new experiment. 
self.log.info("[start_segment]: creating script file") try: sf, scriptname = tempfile.mkstemp() scriptfile = os.fdopen(sf, 'w') except IOError: return False scriptbase = os.path.basename(scriptname) # Script the filesystem changes print >>scriptfile, "/bin/rm -rf %s" % proj_dir # Clear and create the software directory print >>scriptfile, "/bin/rm -rf %s/*" % softdir print >>scriptfile, 'mkdir -p %s' % proj_dir if os.path.isdir(lsoftdir): print >>scriptfile, 'mkdir -p %s' % softdir print >>scriptfile, "rm -f %s" % scriptbase scriptfile.close() # Move the script to the remote machine # XXX: could collide tempfile names on the remote host if self.scp_file(scriptname, user, host, scriptbase): os.remove(scriptname) else: return False # Execute the script (and the script's last line deletes it) if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): return False for f in os.listdir(tmpdir): if not os.path.isdir("%s/%s" % (tmpdir, f)): if not self.scp_file("%s/%s" % (tmpdir, f), user, host, "%s/%s" % (proj_dir, f)): return False if os.path.isdir(lsoftdir): for f in os.listdir(lsoftdir): if not os.path.isdir("%s/%s" % (lsoftdir, f)): if not self.scp_file("%s/%s" % (lsoftdir, f), user, host, "%s/%s" % (softdir, f)): return False # Stage the new configuration (active experiments will stay swapped # in now) self.log.info("[start_segment]: Modifying %s" % eid) try: if not self.ssh_cmd(user, host, "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ (pid, eid, tclfile.rpartition('/')[2]), "modexp", timeout= 60 * 10): return False except self.ssh_cmd_timeout: self.log.error("Modify command failed to complete in time") # There's really no way to see if this succeeded or failed, so # if it hangs, assume the worst. return False # Active experiments are still swapped, this swaps the others in. 
if state != 'active': self.log.info("[start_segment]: Swapping %s" % eid) timedout = False try: if not self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), "swapexp", timeout=10*60): return False except self.ssh_cmd_timeout: timedout = True # If the command was terminated, but completed successfully, # report success. if timedout: self.log.debug("[start_segment]: swapin timed out " +\ "checking state") state = self.get_state(user, host, pid, eid) self.log.debug("[start_segment]: state is %s" % state) return state == 'active' # Everything has gone OK. return True class stop_segment(proxy_emulab_segment): def __init__(self, log=None, keyfile=None, debug=False): access.proxy_emulab_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) def __call__(self, parent, user, pid, eid): """ Stop a sub experiment by calling swapexp on the federant """ host = "%s%s" % (parent.ops, parent.domain) self.log.info("[stop_segment]: Stopping %s" % eid) rv = False try: # Clean out tar files: we've gone over quota in the past self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \ (pid, eid)) rv = self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) except self.ssh_cmd_timeout: rv = False return rv def generate_portal_configs(self, topo, pubkey_base, secretkey_base, tmpdir, master): seer_out = False client_out = False for p in [ e for e in topo.elements \ if isinstance(e, topdl.Computer) and e.get_attribute('portal')]: myname = e.name[0] peer = e.get_attribute('peer') lexp = e.get_attribute('experiment') lproj, leid = lexp.split('/', 1) ldomain = e.get_attribute('domain') mexp = e.get_attribute('masterexperiment') mproj, meid = mexp.split("/", 1) mdomain = e.get_attribute('masterdomain') muser = e.get_attribute('masteruser') or 'root' smbshare = e.get_attribute('smbshare') or 'USERS' scriptdir = e.get_attribute('scriptdir') active = e.get_attribute('active') type = e.get_attribute('portal_type') segid = 
fedid(hexstr=e.get_attribute('peer_segment')) for e in topo.elements: if isinstance(e, topdl.Segment) and e.id.fedid == segid: seg = e break else: raise service_error(service_error.req, "Can't find segment for portal %s" % myname) rexp = seg.get_attribute('experiment') rproj, reid = rexp.split("/", 1) rdomain = seg.get_attribute('domain') cfn = "%s/%s.%s.%s%s.gw.conf" % \ (tmpdir, myname.lower(), leid.lower(), lproj.lower(), ldomain.lower()) tunnelconfig = self.attrs.has_key('TunnelCfg') try: f = open(cfn, "w") print >>f, "Active: %s" % active print >>f, "TunnelCfg: %s" % tunnelconfig print >>f, "BossName: boss" print >>f, "FsName: fs" print >>f, "EventServerName: event-server%s" % ldomain print >>f, "RemoteEventServerName: event-server%s" % rdomain print >>f, "SeerControl: control.%s.%s%s" % \ (meid.lower(), mproj.lower(), mdomain) print >>f, "Type: %s" % type print >>f, "RemoteExperiment: %s" % rexp print >>f, "LocalExperiment: %s" % lexp print >>f, "RemoteConfigFile: " + \ "/proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf" \ % (rproj, reid, peer.lower(), reid.lower(), rproj.lower(), rdomain) print >>f, "Peer: %s.%s.%s%s" % \ (peer.lower(), reid.lower(), rproj.lower(), rdomain) print >>f, "RemoteScriptDir: %s" % scriptdir print >>f, "Pubkeys: /proj/%s/exp/%s/tmp/%s" % \ (lproj, leid, pubkey_base) print >>f, "Privkeys: /proj/%s/exp/%s/tmp/%s" % \ (lproj, leid, secretkey_base) f.close() except IOError, e: raise service_error(service_error.internal, "Can't write protal config %s: %s" % (cfn, e)) # XXX: This little seer config file needs to go away. 
if not seer_out: try: seerfn = "%s/seer.conf" % tmpdir f = open(seerfn, "w") if not master: print >>f, "ControlNode: control.%s.%s%s" % \ (meid.lower(), mproj.lower(), mdomain) print >>f, "ExperimentID: %s" % mexp f.close() except IOError, e: raise service_error(service_error.internal, "Can't write seer.conf: %s" %e) seer_out = True if not client_out and type in ('control', 'both'): try: f = open("%s/client.conf" % tmpdir, "w") print >>f, "ControlGateway: %s.%s.%s%s" % \ (myname.lower(), leid.lower(), lproj.lower(), ldomain.lower()) print >>f, "SMBshare: %s" % smbshare print >>f, "ProjectUser: %s" % muser print >>f, "ProjectName: %s" % mproj print >>f, "ExperimentID: %s/%s" % (mproj, meid) f.close() except IOError, e: raise service_error(service_error.internal, "Cannot write client.conf: %s" %s) client_out = True def generate_ns2(self, topo, expfn, softdir, master): t = topo.clone() # Weed out the things we aren't going to instantiate: Segments, portal # substrates, and portal interfaces. (The copy in the for loop allows # us to delete from e.elements in side the for loop). 
for e in [e for e in t.elements]: if isinstance(e, topdl.Segment): t.elements.remove(e) if isinstance(e, topdl.Computer): e.interface = [i for i in e.interface \ if not i.get_attribute('portal')] t.substrates = [ s for s in t.substrates \ if not s.get_attribute('portal')] t.incorporate_elements() # Localize the software locations for e in t.elements: for s in getattr(e, 'software', []): s.location = re.sub("^.*/", softdir, s.location) # Customize the ns2 output for local portal commands and images filters = [] if master: cmdname = 'MasterConnectorCmd' else:cmdname = 'SlaveConnectorCmd' if self.attrs.has_key(cmdname): filters.append(topdl.generate_portal_command_filter( self.attrs.get(cmdname))) if self.attrs.has_key('connectorImage'): filters.append(topdl.generate_portal_image_filter( self.attrs.get('connectorImage'))) if self.attrs.has_key('connectorType'): filters.append(topdl.generate_portal_hardware_filter( self.attrs.get('connectorType'))) # Convert to ns and write it out expfile = topdl.topology_to_ns2(t, filters) try: f = open(expfn, "w") print >>f, expfile f.close() except IOError: raise service_error(service_error.internal, "Cannot write experiment file %s: %s" % (expfn,e)) def StartSegment(self, req, fid): def get_url(url, cf, destdir): po = urlparse(url) fn = po.path.rpartition('/')[2] try: conn = httplib.HTTPSConnection(po.hostname, port=po.port, cert_file=cf, key_file=cf) conn.putrequest('GET', po.path) conn.endheaders() response = conn.getresponse() lf = open("%s/%s" % (destdir, fn), "w") buf = response.read(4096) while buf: lf.write(buf) buf = response.read(4096) lf.close() except IOError, e: raise service_error(service_error.internal, "Erro writing tempfile: %s" %e) except httplib.HTTPException, e: raise service_error(service_error.internal, "Error retrieving data: %s" % e) configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey')) err = None # Any service_error generated after tmpdir is created rv = None # Return value from segment creation try: req = 
req['StartSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") if req.has_key('segmentdescription') and \ req['segmentdescription'].has_key('topdldescription'): topo = \ topdl.Topology(**req['segmentdescription']['topdldescription']) else: raise service_error(service_error.req, "Request missing segmentdescription'") master = req.get('master', False) certfile = "%s/%s.pem" % (self.certdir, auth_attr) try: tmpdir = tempfile.mkdtemp(prefix="access-") softdir = "%s/software" % tmpdir except IOError: raise service_error(service_error.internal, "Cannot create tmp dir") # Try block alllows us to clean up temporary files. try: sw = set() for e in topo.elements: for s in getattr(e, 'software', []): sw.add(s.location) if len(sw) > 0: os.mkdir(softdir) for s in sw: get_url(s, certfile, softdir) for a in attrs: if a['attribute'] in configs: get_url(a['value'], certfile, tmpdir) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'experiment_name': ename = a['value'] proj = None user = None self.state_lock.acquire() if self.allocation.has_key(aid): proj = self.allocation[aid].get('project', None) if not proj: proj = self.allocation[aid].get('sproject', None) user = self.allocation[aid].get('user', None) self.allocation[aid]['experiment'] = ename self.write_state() self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" %aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" %aid) expfile = "%s/experiment.tcl" % tmpdir self.generate_portal_configs(topo, pubkey_base, secretkey_base, tmpdir, master) 
self.generate_ns2(topo, expfile, "/proj/%s/software/%s/" % (proj, ename), master) starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug) rv = starter(self, ename, proj, user, expfile, tmpdir) except service_error, e: err = e self.log.debug("[StartSegment]: removing %s" % tmpdir) # Walk up tmpdir, deleting as we go for path, dirs, files in os.walk(tmpdir, topdown=False): for f in files: os.remove(os.path.join(path, f)) for d in dirs: os.rmdir(os.path.join(path, d)) os.rmdir(tmpdir) if rv: return { 'allocID': req['allocID'] } elif err: raise service_error(service_error.federant, "Swapin failed: %s" % err) else: raise service_error(service_error.federant, "Swapin failed") def TerminateSegment(self, req, fid): try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr attrs = req.get('fedAttr', []) if not self.auth.check_attribute(fid, auth_attr): raise service_error(service_error.access, "Access denied") self.state_lock.acquire() if self.allocation.has_key(aid): proj = self.allocation[aid].get('project', None) if not proj: proj = self.allocation[aid].get('sproject', None) user = self.allocation[aid].get('user', None) ename = self.allocation[aid].get('experiment', None) self.state_lock.release() if not proj: raise service_error(service_error.internal, "Can't find project for %s" % aid) if not user: raise service_error(service_error.internal, "Can't find creation user for %s" % aid) if not ename: raise service_error(service_error.internal, "Can't find experiment name for %s" % aid) stopper = self.stop_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug) stopper(self, user, proj, ename) return { 'allocID': req['allocID'] }