#!/usr/local/bin/python
import os,sys
import re
import random
import string
import subprocess
import threading
import pickle
import tempfile
from util import *
from fedid import fedid
from remote_service import xmlrpc_handler, soap_handler, service_caller
from service_error import service_error
import logging
# Configure loggers to dump to /dev/null which avoids errors if calling classes
# don't configure them.
class nullHandler(logging.Handler):
    """Logging handler that discards every record handed to it."""

    def emit(self, record):
        """Ignore record; nothing is written anywhere."""
        return None
# Give both allocation loggers a do-nothing handler.  As before, fl is left
# bound to the "fedd.allocate.remote" logger once the loop finishes.
for fl in (logging.getLogger("fedd.allocate.local"),
        logging.getLogger("fedd.allocate.remote")):
    fl.addHandler(nullHandler())
class allocate_project_local:
    """
    Allocate projects on this machine in response to an access request.

    The class runs the configured testbed command line tools (each prefixed
    by the configured "wap" command) to create and remove projects, users,
    ssh keys and node type access.  The configured allocation level bounds
    what it may change.  Everything it creates is recorded in self.state and
    pickled to the configured state file; release_project consults that
    record so that it only removes things this service added.
    """
    # Allocation levels.  Methods compare the configured level numerically
    # against these values, so a higher level permits everything a lower one
    # does.
    dynamic_projects = 4
    dynamic_keys = 2
    confirm_keys = 1
    none = 0

    # Maps the configuration file spelling of a level to its value
    levels = {
            'dynamic_projects': dynamic_projects,
            'dynamic_keys': dynamic_keys,
            'confirm_keys': confirm_keys,
            'none': none,
    }

    def __init__(self, config, auth=None):
        """
        Initializer.  Reads the "allocate" section of config.

        config is queried with get(section, option[, default]) and
        getboolean(section, option, default).  auth is the authorizer used
        for access checks; if none is given a new authorizer() is created.
        Saved allocation state and the access database are loaded when they
        are configured.  Raises service_error if the access database cannot
        be read or parsed.
        """
        self.debug = config.getboolean("allocate", "debug", False)
        self.wap = config.get('allocate', 'wap', '/usr/testbed/sbin/wap')
        self.newproj = config.get('allocate', 'newproj',
                '/usr/testbed/sbin/newproj')
        self.mkproj = config.get('allocate', 'mkproj',
                '/usr/testbed/sbin/mkproj')
        self.rmproj = config.get('allocate', 'rmproj',
                '/usr/testbed/sbin/rmproj')
        self.rmuser = config.get('allocate', 'rmuser',
                '/usr/testbed/sbin/rmuser')
        self.newuser = config.get('allocate', 'newuser',
                '/usr/testbed/sbin/newuser')
        self.addpubkey = config.get('allocate', 'addpubkey',
                '/usr/testbed/sbin/addpubkey')
        self.grantnodetype = config.get('allocate', 'grantnodetype',
                '/usr/testbed/sbin/grantnodetype')
        self.confirmkey = config.get('allocate', 'confirmkey',
                '/usr/testbed/sbin/taddpubkey')
        self.user_to_project = config.get("allocate", 'user_to_project',
                '/usr/local/bin/user_to_project.py')
        level_name = config.get("allocate", "allocation_level", "none")

        self.log = logging.getLogger("fedd.allocate.local")
        set_log_level(config, "allocate", self.log)

        if auth:
            self.auth = auth
        else:
            # NOTE(review): authorizer is not imported by name in this file;
            # presumably it arrives through "from util import *" - confirm.
            self.auth = authorizer()
            self.log.warning(
                    "[allocate] No authorizer passed in, using local one")

        try:
            self.allocation_level = self.levels[level_name.strip().lower()]
        except KeyError:
            self.log.error("Bad allocation_level %s. Defaulting to none" % \
                    level_name)
            self.allocation_level = self.none

        self.state = {
                'keys': set(),
                'types': set(),
                'projects': set(),
                'users': set(),
        }
        self.state_filename = config.get('allocate', 'allocation_state')
        self.state_lock = threading.Lock()
        self.read_state()

        access_db = config.get("allocate", "accessdb")
        if access_db:
            try:
                read_simple_accessdb(access_db, self.auth, 'allocate')
            except IOError as e:
                raise service_error(service_error.internal,
                        "Error reading accessDB %s: %s" % (access_db, e))
            except ValueError as e:
                raise service_error(service_error.internal, "%s" % e)

        # Internal services are SOAP only
        self.soap_services = {
                "AllocateProject": soap_handler("AllocateProject",
                    self.dynamic_project),
                "StaticProject": soap_handler("StaticProject",
                    self.static_project),
                "ReleaseProject": soap_handler("ReleaseProject",
                    self.release_project),
        }
        self.xmlrpc_services = { }

    def read_state(self):
        """
        Read a new copy of access state.  Old state is overwritten.

        State format is a simple pickling of the state dictionary.  A
        missing, empty or unreadable file is logged and the current state is
        kept.  Any of the four expected sets that is missing afterwards is
        created empty.
        """
        if self.state_filename:
            try:
                f = open(self.state_filename, "rb")
                try:
                    # NOTE: pickle.load executes whatever the pickle asks for;
                    # the state file must only be writable by this service.
                    self.state = pickle.load(f)
                finally:
                    f.close()
                self.log.debug("[allocation]: Read state from %s" % \
                        self.state_filename)
            except IOError as e:
                self.log.warning(("[allocation]: No saved state: " +\
                        "Can't open %s: %s") % (self.state_filename, e))
            except EOFError:
                self.log.warning(("[allocation]: " +\
                        "Empty or damaged state file: %s:") % \
                        self.state_filename)
            except pickle.UnpicklingError as e:
                self.log.warning(("[allocation]: No saved state: " + \
                        "Unpickling failed: %s") % e)

        # These should all be in the pickled representation, but make sure
        for k in ('keys', 'types', 'projects', 'users'):
            if k not in self.state:
                self.state[k] = set()

    def write_state(self):
        """
        Pickle the state dictionary to the state file, if one is configured.

        Failures are logged, not raised.  The file is always closed so the
        pickle is flushed to disk before this returns.
        """
        if not self.state_filename:
            return
        try:
            f = open(self.state_filename, 'wb')
            try:
                pickle.dump(self.state, f)
            finally:
                f.close()
        except IOError as e:
            self.log.error("Can't write file %s: %s" % \
                    (self.state_filename, e))
        except pickle.PicklingError as e:
            self.log.error("Pickling problem: %s" % e)
        except TypeError as e:
            self.log.error("Pickling problem (TypeError): %s" % e)

    def random_string(self, s, n=3):
        """Append n random ASCII letters to s and return the string"""
        # NOTE: this uses the non-cryptographic random module and is also
        # used to generate account passwords in dynamic_project.
        return s + "".join([random.choice(string.ascii_letters) \
                for i in range(0, n)])

    def write_attr_xml(self, file, root, lines):
        """
        Write an emulab config file for a dynamic project.

        file is an open file descriptor (as returned by tempfile.mkstemp)
        that is closed on return.  root is the name of the enclosing element
        and lines is a sequence of (name, value) pairs.  Each pair is written
        as <attribute name="NAME"><value>VALUE</value></attribute>, with the
        name and value XML-escaped.
        """
        from xml.sax.saxutils import escape, quoteattr

        # Convert a pair to an attribute line.  The "%s" formatting keeps the
        # original stringification of non-string values.
        def out_attr(a, v):
            return '<attribute name=%s><value>%s</value></attribute>' % \
                    (quoteattr("%s" % (a,)), escape("%s" % (v,)))

        f = os.fdopen(file, "w")
        try:
            f.write("<%s>\n" % root)
            f.write("\n".join([out_attr(*l) for l in lines]))
            f.write("</%s>\n" % root)
        finally:
            f.close()

    def run_cmd(self, cmd, log_prefix='allocate'):
        """
        Run the command passed in.  Cmd is a sequence containing the words of
        the command.  Return the exit value from the subprocess - that is 0
        on success.  In debug mode the command is only logged and 0 is
        returned.  On an error running the command - python or OS error,
        raise a service exception.
        """
        self.log.debug("[%s]: %s" % (log_prefix, ' '.join(cmd)))
        if self.debug:
            return 0
        try:
            return subprocess.call(cmd)
        except OSError as e:
            raise service_error(service_error.internal,
                    "Static project subprocess creation error "+ \
                    "[%s] (%s)" % (cmd[0], e.strerror))

    def confirm_key(self, user, key):
        """
        Call run_cmd to confirm that user has the key.  Return a boolean
        rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.confirmkey, '-C',
            '-u', user, '-k', key)) == 0

    def add_key(self, user, key):
        """
        Call run_cmd to add the key to user.  Return a boolean rather than
        the subprocess code.
        """
        return self.run_cmd((self.wap, self.addpubkey, '-u', user,
            '-k', key)) == 0

    def remove_key(self, user, key):
        """
        Call run_cmd to remove the key from user.  Return a boolean rather
        than the subprocess code.
        """
        return self.run_cmd((self.wap, self.addpubkey, '-R', '-u', user,
            '-k', key)) == 0

    def confirm_access(self, project, type):
        """
        Call run_cmd to confirm that project has access to the node type.
        Return a boolean rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.grantnodetype, '-C',
            '-p', project, type)) == 0

    def add_access(self, project, type):
        """
        Call run_cmd to grant project access to the node type.  Return a
        boolean rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.grantnodetype,
            '-p', project, type)) == 0

    def remove_access(self, project, type):
        """
        Call run_cmd to revoke project's access to the node type.  Return a
        boolean rather than the subprocess code.
        """
        return self.run_cmd((self.wap, self.grantnodetype, '-R',
            '-p', project, type)) == 0

    def add_project(self, project, projfile):
        """
        Create a project using run_cmd.  This is two steps, and assumes that
        the relevant XML files are in place and correct.  Make the return
        value boolean.  Note that if a new user is specified in the XML, that
        user is created on success.
        """
        if self.run_cmd((self.wap, self.newproj, projfile)) != 0:
            return False
        return self.run_cmd((self.wap, self.mkproj, project)) == 0

    def remove_project(self, project):
        """
        Call run_cmd to remove the project.  Make the return value boolean.
        """
        # run_cmd takes the command as one sequence, not separate arguments
        return self.run_cmd((self.wap, self.rmproj, project)) == 0

    def add_user(self, name, param_file, project):
        """
        Create a user and link them to the given project.  Similar to
        add_project, this requires a two step approach.  Returns True on
        success, False on failure.
        """
        if self.run_cmd((self.wap, self.newuser, param_file)) != 0:
            return False
        return self.run_cmd((self.wap, self.user_to_project,
            name, project)) == 0

    def remove_user(self, user):
        """
        Call run_cmd to remove the user.  Make the return value boolean.
        """
        # run_cmd takes the command as one sequence, not separate arguments
        return self.run_cmd((self.wap, self.rmuser, user)) == 0

    def _hardware_types(self, nodes):
        """Return every hardware type named by the node entries in nodes."""
        return [ h for n in nodes if 'hardware' in n for h in n['hardware'] ]

    def _user_fields(self, login, pubkey):
        """Return the attribute pairs describing a new federation user."""
        return [
                ("name", "Federation User %s" % login),
                ("email", "%s-fed@isi.deterlab.net" % login),
                ("password", self.random_string("", 8)),
                ("login", login),
                ("address", "4676 Admiralty"),
                ("city", "Marina del Rey"),
                ("state", "CA"),
                ("zip", "90292"),
                ("country", "USA"),
                ("phone", "310-448-9190"),
                ("title", "None"),
                ("affiliation", "USC/ISI"),
                ("pubkey", pubkey),
        ]

    def _write_param_file(self, prefix, root, fields):
        """Write fields to a new XML temp file in /tmp and return its path."""
        fd, path = tempfile.mkstemp(prefix=prefix, suffix=".xml", dir="/tmp")
        try:
            self.write_attr_xml(fd, root, fields)
        except Exception:
            # Do not leave a half written parameter file behind
            os.unlink(path)
            raise
        return path

    def _best_effort(self, undo, *args):
        """Run one rollback step, logging a service_error instead of raising."""
        try:
            undo(*args)
        except service_error as e:
            self.log.warning("[allocate] rollback step failed: %s" % e)

    def dynamic_project(self, req, fedid=None):
        """
        Create a dynamic project with ssh access.

        Req includes the project and resources as a dictionary under
        'AllocateProjectRequestBody'.  The project must describe a user for
        each of the 'experimentCreation' and 'serviceAccess' roles, each with
        an ssh public key.  The project, both users and access to any
        requested hardware types are created and recorded in the saved state.
        Returns a dictionary describing the project and its users.

        Raises service_error if access is denied, the allocation level does
        not permit dynamic projects, the request is malformed or a testbed
        command fails.  On a service_error during creation the partial
        allocation is backed out as completely as possible.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        if self.allocation_level < self.dynamic_projects:
            raise service_error(service_error.access,
                    "[dynamic_project] dynamic project allocation not " + \
                    "permitted: check allocation level")

        if 'AllocateProjectRequestBody' not in req:
            raise service_error(service_error.req,
                    "Badly formed allocation request")
        body = req['AllocateProjectRequestBody']
        proj = body.get('project', None)
        if not proj:
            raise service_error(service_error.req,
                    "Badly formed allocation request")
        resources = body.get('resources', { })

        name = proj.get('name', None) or self.random_string("proj", 4)

        # Collect one login name and one ssh key per role
        uname = { }
        ssh = { }
        for u in proj.get('user', []):
            role = u.get('role', None)
            if not role: continue
            if 'userID' in u:
                uid = u['userID']
                uname[role] = uid.get('localname', None) or \
                        uid.get('kerberosUsername', None) or \
                        uid.get('uri', None)
                if uname[role] is None:
                    raise service_error(service_error.req, "No ID for user")
            else:
                uname[role] = self.random_string("user", 3)

            access = u.get('access', None)
            if not access:
                raise service_error(service_error.req,
                        "No access mechanisms for for user %s" % uname[role])
            # XXX collect and call addpubkey later, for now use first one.
            for a in access:
                key = a.get('sshPubkey', None)
                if key:
                    ssh[role] = key
                    break
            else:
                raise service_error(service_error.req,
                        "No SSH key for user %s" % uname[role])

        if not ('experimentCreation' in uname and 'serviceAccess' in uname):
            raise service_error(service_error.req,
                    "Must specify both user roles")

        creator = uname['experimentCreation']
        service_user = uname['serviceAccess']

        added_projects = [ ]
        added_users = [ ]
        added_types = [ ]
        tmpfiles = [ ]

        self.state_lock.acquire()
        try:
            try:
                # Write out the parameter files.  The project file names the
                # creating user's file so that user is made with the project.
                create_userfile = self._write_param_file("usr", "user",
                        self._user_fields(creator,
                            ssh['experimentCreation']))
                tmpfiles.append(create_userfile)
                service_userfile = self._write_param_file("usr", "user",
                        self._user_fields(service_user,
                            ssh['serviceAccess']))
                tmpfiles.append(service_userfile)
                projfile = self._write_param_file("proj", "project", [
                        ("name", name),
                        ("short description", "dynamic federated project"),
                        ("URL", "http://www.isi.edu/~faber"),
                        ("funders", "USC/USU"),
                        ("long description", "Federation access control"),
                        ("public", "1"),
                        ("num_pcs", "100"),
                        ("linkedtous", "1"),
                        ("newuser_xml", create_userfile),
                ])
                tmpfiles.append(projfile)

                if not self.add_project(name, projfile):
                    raise service_error(service_error.internal,
                            "Unable to create project/user %s/%s" % \
                            (name, creator))
                # add_project adds a user as well in this case
                added_projects.append(name)
                added_users.append(creator)
                self.state['projects'].add(name)
                self.state['users'].add(creator)

                if not self.add_user(service_user, service_userfile, name):
                    raise service_error(service_error.internal,
                            "Unable to create user %s" % service_user)
                added_users.append(service_user)
                self.state['users'].add(service_user)

                # Grant access to restricted resources.  This is simpler than
                # the corresponding loop from static_project because this is
                # a clean slate.
                for nt in self._hardware_types(resources.get('node', [])):
                    if not self.add_access(name, nt):
                        raise service_error(service_error.internal,
                                "Failed to add access for %s to %s"\
                                % (name, nt))
                    self.state['types'].add((name, nt))
                    added_types.append((name, nt))

                # All is well, remember what was created
                self.write_state()
            except service_error:
                # Something failed.  Back out the partial allocation as
                # completely as possible and re-raise the error.
                for p, t in added_types:
                    self.state['types'].discard((p, t))
                    self._best_effort(self.remove_access, p, t)
                for u in added_users:
                    self.state['users'].discard(u)
                    self._best_effort(self.remove_user, u)
                for p in added_projects:
                    self.state['projects'].discard(p)
                    self._best_effort(self.remove_project, p)
                raise
        finally:
            # Always release the lock, then clean up tempfiles
            self.state_lock.release()
            for path in tmpfiles:
                try:
                    os.unlink(path)
                except OSError:
                    pass

        return {
                'project': {
                    'name': { 'localname': name },
                    'user' : [
                        {
                            'userID': { 'localname' : creator },
                            'access': [
                                { 'sshPubkey': ssh['experimentCreation'] } ],
                            'role': 'experimentCreation',
                        },
                        {
                            'userID': { 'localname' : service_user },
                            'access': [
                                { 'sshPubkey' : ssh['serviceAccess'] } ],
                            'role': 'serviceAccess',
                        }
                    ]
                }
        }

    def static_project(self, req, fedid=None):
        """
        Be certain that the local project in the request has access to the
        proper resources and users have correct keys.  Add them if necessary.

        What is checked or added depends on the allocation level: below
        confirm_keys nothing is checked, at confirm_keys missing keys or
        access are an error, and from dynamic_keys up they are added.
        Returns the 'StaticProjectRequestBody' of the request.  Raises
        service_error on denied access, a malformed request or a failed
        check; additions made by this call are backed out first.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        # While we should be more careful about this, for the short term, add
        # the keys to the specified users.
        try:
            body = req['StaticProjectRequestBody']
            users = body['project']['user']
            pname = body['project']['name']['localname']
            resources = body.get('resources', { })
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        # Keep track of changes made to the system
        added_keys = [ ]
        added_types = [ ]

        self.state_lock.acquire()
        try:
            try:
                for u in users:
                    try:
                        name = u['userID']['localname']
                    except KeyError:
                        raise service_error(service_error.req,
                                "Badly formed user")
                    for sk in [ k['sshPubkey'] \
                            for k in u.get('access', []) \
                            if 'sshPubkey' in k ]:
                        if self.allocation_level < self.confirm_keys:
                            self.log.warning(
                                    "[static_project] no checking of " + \
                                    "static keys")
                            continue
                        if self.confirm_key(name, sk):
                            continue
                        if self.allocation_level < self.dynamic_keys:
                            raise service_error(service_error.internal,
                                    "Failed to confirm key for %s" % name)
                        if not self.add_key(name, sk):
                            raise service_error(service_error.internal,
                                    "Failed to add key for %s" % name)
                        self.state['keys'].add((name, sk))
                        added_keys.append((name, sk))

                # Grant access to any resources in the request.  The access
                # module knows to only send resources that are restricted and
                # needed by the project.
                for nt in self._hardware_types(resources.get('node', [])):
                    if self.allocation_level < self.confirm_keys:
                        self.log.warning(
                                "[static_project] no checking of " + \
                                "node access")
                        continue
                    if self.confirm_access(pname, nt):
                        continue
                    if self.allocation_level < self.dynamic_keys:
                        raise service_error(service_error.internal,
                                "Failed to confirm access for %s to %s"\
                                % (pname, nt))
                    if not self.add_access(pname, nt):
                        raise service_error(service_error.internal,
                                "Failed to add access for %s to %s"\
                                % (pname, nt))
                    self.state['types'].add((pname, nt))
                    added_types.append((pname, nt))

                # All is well, save state
                self.write_state()
            except service_error:
                # Do our best to clean up partial allocation and reraise the
                # error.  Do our best to make sure that both allocation state
                # and testbed state is restored.
                for u, k in added_keys:
                    self.state['keys'].discard((u, k))
                    self._best_effort(self.remove_key, u, k)
                for p, t in added_types:
                    self.state['types'].discard((p, t))
                    self._best_effort(self.remove_access, p, t)
                raise
        finally:
            # Release the lock however the request ended
            self.state_lock.release()

        # return { 'project': req['StaticProjectRequestBody']['project']}
        return req['StaticProjectRequestBody']

    def release_project(self, req, fedid=None):
        """
        Remove user keys from users and delete dynamic projects.

        Only keys this service created are deleted and there are similar
        protections for users, projects and node type access: each is removed
        only if it appears in the saved state.  The state is saved whether or
        not the release completes.  Returns a dictionary holding the
        request's project.  Raises service_error on denied access or a
        malformed request.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        pname = None
        users = [ ]
        nodes = [ ]

        self.log.debug("[release_project] request: %s" % (req,))

        try:
            body = req['ReleaseProjectRequestBody']
            if 'name' in body['project']:
                pname = body['project']['name']['localname']
            if 'user' in body['project']:
                users = body['project']['user']
            if 'resources' in body:
                nodes = body['resources'].get('node', [])
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        if nodes and not pname:
            raise service_error(service_error.req,
                    "Badly formed request (nodes without project)")

        self.state_lock.acquire()
        try:
            try:
                for nt in self._hardware_types(nodes):
                    if (pname, nt) in self.state['types']:
                        self.remove_access(pname, nt)
                        self.state['types'].discard((pname, nt))

                for u in users:
                    try:
                        name = u['userID']['localname']
                    except KeyError:
                        raise service_error(service_error.req,
                                "Badly formed user")
                    if name in self.state['users']:
                        # If we created this user, discard the user, keys
                        # and all
                        self.remove_user(name)
                        self.state['users'].discard(name)
                    else:
                        # If not, just strip any keys we added
                        for sk in [ k['sshPubkey'] \
                                for k in u.get('access', []) \
                                if 'sshPubkey' in k ]:
                            if (name, sk) in self.state['keys']:
                                self.remove_key(name, sk)
                                self.state['keys'].discard((name, sk))

                if pname in self.state['projects']:
                    self.remove_project(pname)
                    self.state['projects'].discard(pname)
            finally:
                # Save whatever was released, even if the release failed
                # part way through
                self.write_state()
        finally:
            self.state_lock.release()

        return { 'project': req['ReleaseProjectRequestBody']['project']}
class allocate_project_remote:
    """
    Allocate projects on a remote machine using the internal SOAP interface.

    Instances expose dynamic_project, static_project and release_project as
    callables with the same signature as the local allocator's methods; each
    forwards its request to the configured remote service.
    """
    class proxy(service_caller):
        """
        This class is a proxy functor (callable) that has the same signature
        as a function called by soap_handler or xmlrpc_handler, but that uses
        the service_caller class to call the function remotely.
        """

        def __init__(self, url, cert_file, cert_pwd, trusted_certs, auth,
                method):
            """
            Remember where and how to call method.  The request and response
            body names are derived from the method name.
            """
            service_caller.__init__(self, method)
            self.url = url
            self.cert_file = cert_file
            self.cert_pwd = cert_pwd
            self.trusted_certs = trusted_certs
            self.request_body_name = "%sRequestBody" % method
            self.resp_name = "%sResponseBody" % method
            self.auth = auth

        def __call__(self, req, fid=None):
            """
            Calling the proxy object directly invokes the proxy_call method,
            not the service_call method.  This is a real method rather than
            an instance attribute because new-style classes look __call__ up
            on the class, not the instance.
            """
            return self.proxy_call(req, fid)

        def proxy_call(self, req, fid=None):
            """
            Send req on to a remote project instantiator.

            Req is just the message to be sent.  This function unwraps the
            request body, sends it, and returns the response body.  It also
            rethrows any faults; a connection failure is reported as an
            internal error.  fid is accepted for signature compatibility and
            is not used.
            """
            if self.request_body_name not in req:
                raise service_error(service_error.req, "Bad formated request")
            req = req[self.request_body_name]

            try:
                r = self.call_service(self.url, req, self.cert_file,
                        self.cert_pwd, self.trusted_certs)
            except service_error as e:
                if e.code == service_error.connect:
                    raise service_error(service_error.internal,
                            "Cannot connect to internal service: (%d) %s" % \
                            (e.code, e.desc))
                raise

            if self.resp_name not in r:
                raise service_error(service_error.protocol,
                        "Bad proxy response")
            return r[self.resp_name]

    # back to defining the allocate_project_remote class
    def __init__(self, config, auth=None):
        """
        Initializer.  Reads the "allocate" section of config.

        The certificate file, its password and the trusted certificates fall
        back to the "globals" section when the "allocate" section does not
        set them.  auth is the authorizer handed to the proxies; if none is
        given a new authorizer() is created.
        """
        self.debug = config.getboolean("allocate", "debug", False)
        self.url = config.get("allocate", "uri", "")

        # Keep cert file and password coming from the same source
        self.cert_file = config.get("allocate", "cert_file", None)
        if self.cert_file:
            self.cert_pwd = config.get("allocate", "cert_pwd", None)
        else:
            self.cert_file = config.get("globals", "cert_file", None)
            self.cert_pwd = config.get("globals", "cert_pwd", None)

        self.trusted_certs = config.get("allocate", "trusted_certs", None) \
                or config.get("globals", "trusted_certs")

        self.soap_services = { }
        self.xmlrpc_services = { }
        self.log = logging.getLogger("fedd.allocate.remote")
        set_log_level(config, "allocate", self.log)

        if auth:
            self.auth = auth
        else:
            # NOTE(review): authorizer is not imported by name in this file;
            # presumably it arrives through "from util import *" - confirm.
            self.auth = authorizer()
            self.log.warning(
                    "[allocate] No authorizer passed in, using local one")

        # The specializations of the proxy functions
        self.dynamic_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "AllocateProject")
        self.static_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "StaticProject")
        self.release_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "ReleaseProject")