#!/usr/local/bin/python
import os,sys
import re
import random
import string
import subprocess
import tempfile
from util import *
from fedid import fedid
from fixed_resource import read_key_db, read_project_db, read_user_db
from remote_service import xmlrpc_handler, soap_handler, service_caller
from service_error import service_error
import logging
# Configure loggers to dump to /dev/null which avoids errors if calling classes
# don't configure them.
class nullHandler(logging.Handler):
    """
    Logging handler that throws every record away.

    Attached to this module's loggers so that emitting a message never
    fails just because the calling application did not configure logging.
    """

    def emit(self, record):
        """Discard record; nothing is written anywhere."""
        return None
# Give each logger used by this module a do-nothing handler.  As before, fl
# is left bound to the last logger configured ("fedd.allocate.remote").
for _logger_name in ("fedd.allocate.local", "fedd.allocate.remote"):
    fl = logging.getLogger(_logger_name)
    fl.addHandler(nullHandler())
class allocate_project_local:
    """
    Allocate projects on this machine in response to an access request.

    The class exposes three SOAP-callable operations (see soap_services):
    dynamic_project, static_project and release_project.  Each one builds a
    list of testbed commands and runs them through the wrapper program
    configured as 'wap'.  When the 'debug' option is set the commands are
    only logged, never executed.
    """
    # Allocation levels.  They are compared numerically: a higher level
    # permits everything the lower levels permit.
    dynamic_projects = 4
    dynamic_keys = 2
    confirm_keys = 1
    none = 0

    # Maps the configuration-file spelling of a level to its numeric value.
    levels = {
        'dynamic_projects': dynamic_projects,
        'dynamic_keys': dynamic_keys,
        'confirm_keys': confirm_keys,
        'none': none,
    }

    # random_string() is used to generate account passwords, so draw from the
    # operating system's entropy source rather than the default generator.
    _rng = random.SystemRandom()

    def __init__(self, config, auth=None):
        """
        Initializer.  Reads the "allocate" section of config.

        config -- configuration object offering get(section, option, default)
        auth   -- authorizer used for access checks; when it is not supplied
                  a local authorizer() is created.

        Raises service_error(internal) if the access database named by the
        'accessdb' option cannot be read or parsed.
        """
        # NOTE(review): if config.get returns strings, any non-empty value
        # (including "false") turns debug mode on -- confirm against the
        # configuration class.
        self.debug = config.get("allocate", "debug", False)
        self.wap = config.get('allocate', 'wap', '/usr/testbed/sbin/wap')
        self.newproj = config.get('allocate', 'newproj',
                '/usr/testbed/sbin/newproj')
        self.mkproj = config.get('allocate', 'mkproj',
                '/usr/testbed/sbin/mkproj')
        self.rmproj = config.get('allocate', 'rmproj',
                '/usr/testbed/sbin/rmproj')
        self.rmuser = config.get('allocate', 'rmuser',
                '/usr/testbed/sbin/rmuser')
        self.newuser = config.get('allocate', 'newuser',
                '/usr/testbed/sbin/newuser')
        self.addpubkey = config.get('allocate', 'addpubkey',
                '/usr/testbed/sbin/taddpubkey')
        self.grantnodetype = config.get('allocate', 'grantnodetype',
                '/usr/testbed/sbin/grantnodetype')
        self.confirmkey = config.get('allocate', 'confirmkey',
                '/usr/testbed/sbin/taddpubkey')
        self.user_to_project = config.get("allocate", 'user_to_project',
                '/usr/local/bin/user_to_project.py')
        self.allocation_level = config.get("allocate", "allocation_level",
                "none")
        self.log = logging.getLogger("fedd.allocate.local")
        set_log_level(config, "allocate", self.log)

        if auth:
            self.auth = auth
        else:
            # Store the fallback on the instance; the access checks below and
            # in the service methods all go through self.auth.
            self.auth = authorizer()
            self.log.warning(
                    "[allocate] No authorizer passed in, using local one")

        # Translate the configured level name into its numeric value.  On
        # failure self.allocation_level still holds the unrecognized name.
        try:
            self.allocation_level = \
                    self.levels[self.allocation_level.strip().lower()]
        except KeyError:
            self.log.error("Bad allocation_level %s. Defaulting to none" % \
                    self.allocation_level)
            self.allocation_level = self.none

        access_db = config.get("allocate", "accessdb")
        if access_db:
            try:
                read_simple_accessdb(access_db, self.auth, 'allocate')
            except IOError as e:
                raise service_error(service_error.internal,
                        "Error reading accessDB %s: %s" % (access_db, e))
            except ValueError as e:
                raise service_error(service_error.internal, "%s" % e)

        fixed_key_db = config.get("allocate", "fixed_keys", None)
        fixed_project_db = config.get("allocate", "fixed_projects", None)
        fixed_user_db = config.get("allocate", "fixed_users", None)
        self.fixed_keys = set()
        self.fixed_projects = set()
        self.fixed_users = set()

        # Initialize the fixed resource sets.  Loading is best effort, but a
        # failure is logged as a warning because release_project() consults
        # these sets before removing projects, users and keys.
        for db, rset, fcn in (
                (fixed_key_db, self.fixed_keys, read_key_db),
                (fixed_project_db, self.fixed_projects, read_project_db),
                (fixed_user_db, self.fixed_users, read_user_db)):
            if db:
                try:
                    rset.update(fcn(db))
                except Exception as e:
                    self.log.warning(
                            "Can't read resources from %s: %s" % (db, e))

        # Internal services are SOAP only
        self.soap_services = {
            "AllocateProject": soap_handler("AllocateProject",
                self.dynamic_project),
            "StaticProject": soap_handler("StaticProject",
                self.static_project),
            "ReleaseProject": soap_handler("ReleaseProject",
                self.release_project),
        }
        self.xmlrpc_services = { }

    def random_string(self, s, n=3):
        """Append n random ASCII letters to s and return the string"""
        return s + "".join([self._rng.choice(string.ascii_letters)
                for _ in range(n)])

    def write_attr_xml(self, file, root, lines):
        """
        Write an emulab config file for a dynamic project.

        file  -- an open OS-level file descriptor; it is closed on return
        root  -- name of the enclosing XML element
        lines -- sequence of (name, value) pairs

        Format is
        <root><attribute name="lines[i][0]"><value>lines[i][1]</value>
        </attribute>...</root>, with names and values XML-escaped.
        """
        from xml.sax.saxutils import escape

        # Convert a pair to an attribute line
        def out_attr(a, v):
            return '<attribute name="%s"><value>%s</value></attribute>' % (
                    escape("%s" % (a,), {'"': "&quot;"}),
                    escape("%s" % (v,)))

        # The with statement closes the descriptor even if a write fails.
        with os.fdopen(file, "w") as f:
            f.write("<%s>\n" % root)
            f.write("\n".join([out_attr(*l) for l in lines]))
            f.write("</%s>\n" % root)

    def _run_commands(self, cmds, tag, label, prog=0):
        """
        Log each command in cmds and, unless in debug mode, execute it.

        tag   -- method name used as the log prefix
        label -- operation name used in error messages
        prog  -- index of the element of each command named in errors

        Raises service_error(internal) when a command cannot be started or
        exits with a non-zero status; later commands are not run.
        """
        for cmd in cmds:
            self.log.debug("[%s]: %s" % (tag, ' '.join(cmd)))
            if self.debug:
                continue
            try:
                rc = subprocess.call(cmd)
            except OSError as e:
                raise service_error(service_error.internal,
                        "%s subprocess creation error [%s] (%s)" % \
                                (label, cmd[prog], e.strerror))
            if rc != 0:
                raise service_error(service_error.internal,
                        "%s subprocess error [%s] (%d)" % \
                                (label, cmd[prog], rc))

    def _hardware_types(self, resources):
        """Return the hardware types of the node entries in resources."""
        return [ h for r in resources
                if 'node' in r and 'hardware' in r['node']
                    for h in r['node']['hardware'] ]

    def _collect_users(self, users):
        """
        Return (uname, ssh): dictionaries mapping each role found in users
        to a login name and to the first usable ssh public key.

        Entries without a role are skipped.  Raises service_error(req) when
        a user has no usable ID, no access list, or no ssh key.
        """
        uname = { }
        ssh = { }
        for u in users:
            role = u.get('role', None)
            if not role:
                continue
            if 'userID' in u:
                uid = u['userID']
                uname[role] = uid.get('localname', None) or \
                        uid.get('kerberosUsername', None) or \
                        uid.get('uri', None)
                if uname[role] is None:
                    raise service_error(service_error.req, "No ID for user")
            else:
                uname[role] = self.random_string("user", 3)

            access = u.get('access', None)
            if not access:
                raise service_error(service_error.req,
                        "No access mechanisms for for user %s" % uname[role])
            # XXX collect and call addpubkey later, for now use first one.
            for a in access:
                key = a.get('sshPubkey', None)
                if key:
                    ssh[role] = key
                    break
            else:
                # No access entry carried an ssh public key
                raise service_error(service_error.req,
                        "No SSH key for user %s" % uname[role])
        return uname, ssh

    def _user_fields(self, login, pubkey):
        """Return the (name, value) pairs describing a new federation user."""
        return [
            ("name", "Federation User %s" % login),
            ("email", "%s-fed@isi.deterlab.net" % login),
            ("password", self.random_string("", 8)),
            ("login", login),
            ("address", "4676 Admiralty"),
            ("city", "Marina del Rey"),
            ("state", "CA"),
            ("zip", "90292"),
            ("country", "USA"),
            ("phone", "310-448-9190"),
            ("title", "None"),
            ("affiliation", "USC/ISI"),
            ("pubkey", pubkey),
        ]

    def _write_param_file(self, prefix, root, fields):
        """Write fields to a new temporary XML file and return its path."""
        fd, path = tempfile.mkstemp(prefix=prefix, suffix=".xml", dir="/tmp")
        self.write_attr_xml(fd, root, fields)
        return path

    def dynamic_project(self, req, fedid=None):
        """
        Create a dynamic project with ssh access

        Req includes the project and resources as a dictionary.  The project
        must describe users in both the 'experimentCreation' and
        'serviceAccess' roles, each with at least one ssh public key.

        Returns a dictionary describing the project name and the two users.
        Raises service_error(access) when the caller or the allocation level
        does not permit the operation, service_error(req) for a malformed
        request and service_error(internal) when a command fails.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        if self.allocation_level < self.dynamic_projects:
            raise service_error(service_error.access,
                    "[dynamic_project] dynamic project allocation not " + \
                            "permitted: check allocation level")

        if 'AllocateProjectRequestBody' in req and \
                'project' in req['AllocateProjectRequestBody']:
            body = req['AllocateProjectRequestBody']
            proj = body['project']
        else:
            raise service_error(service_error.req,
                    "Badly formed allocation request")

        # Take the first user and ssh key
        name = proj.get('name', None) or self.random_string("proj", 4)
        uname, ssh = self._collect_users(proj.get('user', []))

        if not ('experimentCreation' in uname and 'serviceAccess' in uname):
            raise service_error(service_error.req,
                    "Must specify both user roles")

        create_user_fields = self._user_fields(uname['experimentCreation'],
                ssh['experimentCreation'])
        service_user_fields = self._user_fields(uname['serviceAccess'],
                ssh['serviceAccess'])

        # Write out the parameter files.  They are created only now, after
        # the request has been validated, so a rejected request leaves no
        # open descriptors or stray files behind.
        create_userfile = self._write_param_file("usr", "user",
                create_user_fields)
        service_userfile = self._write_param_file("usr", "user",
                service_user_fields)
        projfile = self._write_param_file("proj", "project", [
            ("name", name),
            ("short description", "dynamic federated project"),
            ("URL", "http://www.isi.edu/~faber"),
            ("funders", "USC/USU"),
            ("long description", "Federation access control"),
            ("public", "1"),
            ("num_pcs", "100"),
            ("linkedtous", "1"),
            ("newuser_xml", create_userfile),
        ])

        # Generate the commands (only grantnodetype's are dynamic)
        cmds = [
            (self.wap, self.newproj, projfile),
            (self.wap, self.mkproj, name),
            (self.wap, self.newuser, service_userfile),
            (self.wap, self.user_to_project, uname['serviceAccess'], name),
        ]

        # Add commands to grant access to any resources in the request.
        # Resources are looked up in the request body, as static_project
        # does, falling back to the top level of the request.
        resources = body.get('resources', req.get('resources', []))
        for nt in self._hardware_types(resources):
            cmds.append((self.wap, self.grantnodetype, '-p', name, nt))

        # Create the projects
        self._run_commands(cmds, "dynamic_project", "Dynamic project", prog=1)

        # NOTE: the parameter files are intentionally left in place, as in
        # the original code where their removal was disabled.

        return {
            'project': {
                'name': { 'localname': name },
                'user' : [
                    {
                        'userID': {
                            'localname' : uname['experimentCreation'] },
                        'access': [
                            { 'sshPubkey': ssh['experimentCreation'] } ],
                        'role': 'experimentCreation',
                    },
                    {
                        'userID': { 'localname' : uname['serviceAccess'] },
                        'access': [ { 'sshPubkey' : ssh['serviceAccess'] } ],
                        'role': 'serviceAccess',
                    },
                ]
            }
        }

    def static_project(self, req, fedid=None):
        """
        Be certain that the local project in the request has access to the
        proper resources and users have correct keys.  Add them if necessary.

        Returns {'project': <the project from the request>}.  Raises
        service_error(access) on a failed access check, service_error(req)
        for a malformed request and service_error(internal) when a command
        fails.
        """
        cmds = []

        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        # While we should be more careful about this, for the short term, add
        # the keys to the specified users.
        try:
            users = req['StaticProjectRequestBody']['project']['user']
            pname = req['StaticProjectRequestBody']['project']\
                    ['name']['localname']
            resources = req['StaticProjectRequestBody'].get('resources', [])
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        for u in users:
            try:
                name = u['userID']['localname']
            except KeyError:
                raise service_error(service_error.req, "Badly formed user")
            for sk in [ k['sshPubkey'] for k in u.get('access', [])
                    if 'sshPubkey' in k ]:
                if self.allocation_level >= self.dynamic_keys:
                    cmds.append((self.wap, self.addpubkey, '-w',
                        '-u', name, '-k', sk))
                elif self.allocation_level >= self.confirm_keys:
                    cmds.append((self.wap, self.confirmkey, '-C',
                        '-u', name, '-k', sk))
                else:
                    self.log.warning("[static_project] no checking of " + \
                            "static keys")

        # Add commands to grant access to any resources in the request.
        for nt in self._hardware_types(resources):
            if self.allocation_level >= self.confirm_keys:
                cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))

        # Run the commands
        self._run_commands(cmds, "static_project", "Static project")

        return { 'project': req['StaticProjectRequestBody']['project'] }

    def release_project(self, req, fedid=None):
        """
        Remove user keys from users and delete dynamic projects.

        Only keys not in the set of fixed keys are deleted, and there are
        similar protections for projects and users.

        Returns {'project': <the project from the request>}.  Raises
        service_error(access) on a failed access check, service_error(req)
        for a malformed request and service_error(internal) when a command
        fails.
        """
        # Internal calls do not have a fedid parameter (i.e., local calls on
        # behalf of already vetted fedids)
        if fedid and not self.auth.check_attribute(fedid, "allocate"):
            self.log.debug("[allocate] Access denied (%s)" % fedid)
            raise service_error(service_error.access, "Access Denied")

        cmds = []
        pname = None
        users = []

        try:
            project = req['ReleaseProjectRequestBody']['project']
            if 'name' in project:
                pname = project['name']['localname']
            if 'user' in project:
                users = project['user']
        except KeyError:
            raise service_error(service_error.req, "Badly formed request")

        if pname and pname not in self.fixed_projects and \
                self.allocation_level >= self.dynamic_projects:
            cmds.append((self.wap, self.rmproj, pname))

        for u in users:
            try:
                name = u['userID']['localname']
            except KeyError:
                raise service_error(service_error.req, "Badly formed user")
            if self.allocation_level >= self.dynamic_projects and \
                    name not in self.fixed_users:
                # Dynamically created user: remove the whole account
                cmds.append((self.wap, self.rmuser, name))
            else:
                # Otherwise remove only the keys that are not fixed
                for sk in [ k['sshPubkey'] for k in u.get('access', [])
                        if 'sshPubkey' in k ]:
                    if (name.rstrip(), sk.rstrip()) not in self.fixed_keys:
                        if self.allocation_level >= self.dynamic_keys:
                            cmds.append((self.wap, self.addpubkey, '-R',
                                '-w', '-u', name, '-k', sk))

        # Run the commands
        self._run_commands(cmds, "release_project", "Release project")

        return { 'project': req['ReleaseProjectRequestBody']['project'] }
class allocate_project_remote:
    """
    Allocate projects on a remote machine using the internal SOAP interface

    After construction the instance offers dynamic_project, static_project
    and release_project as callables that forward the request to the
    service at the configured URI.
    """
    class proxy(service_caller):
        """
        This class is a proxy functor (callable) that has the same signature as
        a function called by soap_handler or xmlrpc_handler, but that uses the
        service_caller class to call the function remotely.
        """

        def __init__(self, url, cert_file, cert_pwd, trusted_certs, auth,
                method):
            """
            Remember where and how to reach the remote service.

            method names the remote operation; the request and response
            wrapper keys are derived from it.
            """
            service_caller.__init__(self, method)
            self.url = url
            self.cert_file = cert_file
            self.cert_pwd = cert_pwd
            self.trusted_certs = trusted_certs
            self.request_body_name = "%sRequestBody" % method
            self.resp_name = "%sResponseBody" % method
            self.auth = auth
            # Calling the proxy object directly invokes the proxy_call method,
            # not the service_call method.  The instance attribute is kept
            # for classes that look __call__ up on the instance; the
            # class-level __call__ below covers those that do not.
            self.__call__ = self.proxy_call

        def __call__(self, req, fid=None):
            """Calling the proxy is the same as calling proxy_call."""
            return self.proxy_call(req, fid)

        def proxy_call(self, req, fid=None):
            """
            Send req on to a remote project instantiator.

            Req is just the message to be sent.  This function re-wraps it.
            It also rethrows any faults.  fid is accepted for signature
            compatibility and is not used.

            Raises service_error(req) if req lacks the expected request body
            and service_error(protocol) if the reply lacks the expected
            response body.
            """
            log = logging.getLogger("fedd.allocate.remote")

            if self.request_body_name in req:
                req = req[self.request_body_name]
            else:
                log.warning("request error")
                raise service_error(service_error.req, "Bad formated request")

            r = self.call_service(self.url, req, self.cert_file,
                    self.cert_pwd, self.trusted_certs)
            if self.resp_name in r:
                return r[self.resp_name]
            else:
                log.error("response error")
                raise service_error(service_error.protocol,
                        "Bad proxy response")

    # back to defining the allocate_project_remote class
    def __init__(self, config, auth=None):
        """
        Initializer.  Reads the "allocate" and "globals" sections of config.

        config -- configuration object offering get(section, option, default)
                  and has_option(section, option)
        auth   -- authorizer handed to the proxies; when it is not supplied
                  a local authorizer() is created.
        """
        self.debug = config.get("allocate", "debug", False)
        self.url = config.get("allocate", "uri", "")

        self.cert_file = config.get("allocate", "cert_file", None)
        self.cert_pwd = config.get("allocate", "cert_pwd", None)
        self.trusted_certs = config.get("allocate", "trusted_certs", None)

        # Certificates fall back from the specific to the generic: settings
        # in the allocate section win, then the proxy certificates from the
        # globals section, and finally the main certificates.  A password is
        # only taken together with the certificate file it belongs to.
        if config.has_option("globals", "proxy_cert_file"):
            if not self.cert_file:
                self.cert_file = config.get("globals", "proxy_cert_file")
                if config.has_option("globals", "proxy_cert_pwd"):
                    self.cert_pwd = config.get("globals", "proxy_cert_pwd")

        if config.has_option("globals", "proxy_trusted_certs") and \
                not self.trusted_certs:
            self.trusted_certs = \
                    config.get("globals", "proxy_trusted_certs")

        if config.has_option("globals", "cert_file"):
            has_pwd = config.has_option("globals", "cert_pwd")
            if not self.cert_file:
                self.cert_file = config.get("globals", "cert_file")
                if has_pwd:
                    self.cert_pwd = config.get("globals", "cert_pwd")

        if config.get("globals", "trusted_certs") and not self.trusted_certs:
            self.trusted_certs = \
                    config.get("globals", "trusted_certs")

        self.soap_services = { }
        self.xmlrpc_services = { }
        self.log = logging.getLogger("fedd.allocate.remote")
        set_log_level(config, "allocate", self.log)

        if auth:
            self.auth = auth
        else:
            # Store the fallback on the instance; the proxies below are
            # constructed with self.auth.
            self.auth = authorizer()
            self.log.warning(
                    "[allocate] No authorizer passed in, using local one")

        # The specializations of the proxy functions
        self.dynamic_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "AllocateProject")
        self.static_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "StaticProject")
        self.release_project = self.proxy(self.url, self.cert_file,
                self.cert_pwd, self.trusted_certs, self.auth,
                "ReleaseProject")