source: fedd/fedd_allocate_project.py @ f8582c9

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since f8582c9 was f8582c9, checked in by Ted Faber <faber@…>, 16 years ago

Resource allocation and deallocation really working
Access handler selects allocation ID
Fedid allocation IDs work
Revamp of util code for maodifying messages (e.g. binaries)
Handlers now see fedids as objects in messages
Fedid bug in handlers in fedd_util

This should have been multiple commits

  • Property mode set to 100644
File size: 13.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6from ZSI import *
7from M2Crypto import SSL
8from M2Crypto.m2xmlrpclib import SSL_Transport
9from M2Crypto.SSL.SSLServer import SSLServer
10import M2Crypto.httpslib
11import xmlrpclib
12
13import re
14import random
15import string
16import subprocess
17import tempfile
18
19from fedd_services import *
20from fedd_internal_services import *
21from fedd_util import *
22from fixed_resource import read_key_db, read_project_db
23from service_error import *
24import logging
25
26
27# Configure loggers to dump to /dev/null which avoids errors if calling classes
28# don't configure them.
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.allocate.local")
33fl.addHandler(nullHandler())
34fl = logging.getLogger("fedd.allocate.remote")
35fl.addHandler(nullHandler())
36
37
38class fedd_allocate_project_local:
39    """
40    Allocate projects on this machine in response to an access request.
41    """
42    def __init__(self, config):
43        """
44        Initializer.  Parses a configuration if one is given.
45        """
46
47        self.debug = config.get("access", "debug_project", False)
48        self.wap = config.get('access', 'wap', '/usr/testbed/sbin/wap')
49        self.newproj = config.get('access', 'newproj',
50                '/usr/testbed/sbin/newproj')
51        self.mkproj = config.get('access', 'mkproj', '/usr/testbed/sbin/mkproj')
52        self.rmproj = config.get('access', 'rmproj', '/usr/testbed/sbin/rmproj')
53        self.addpubkey = config.get('access', 'addpubkey', 
54                '/usr/testbed/sbin/taddpubkey')
55        self.grantnodetype = config.get('access', 'grantnodetype', 
56                '/usr/testbed/sbin/grantnodetype')
57        self.log = logging.getLogger("fedd.allocate.local")
58        set_log_level(config, "access", self.log)
59        fixed_key_db = config.get("access", "fixed_keys", None)
60        fixed_project_db = config.get("access", "fixed_projects", None)
61        self.fixed_keys = set()
62        self.fixed_projects = set()
63
64        # initialize the fixed resource sets
65        for db, rset, fcn in (\
66                (fixed_key_db, self.fixed_keys, read_key_db), \
67                (fixed_project_db, self.fixed_projects, read_project_db)):
68            if db:
69                try:
70                    rset.update(fcn(db))
71                except:
72                    self.log.debug("Can't read resources from %s" % db)
73       
74        # Internal services are SOAP only
75        self.soap_services = {\
76                "AllocateProject": make_soap_handler(\
77                AllocateProjectRequestMessage.typecode,\
78                self.dynamic_project, AllocateProjectResponseMessage,\
79                "AllocateProjectResponseBody"), 
80                "StaticProject": make_soap_handler(\
81                StaticProjectRequestMessage.typecode,\
82                self.static_project, StaticProjectResponseMessage,\
83                "StaticProjectResponseBody"),\
84                "ReleaseProject": make_soap_handler(\
85                ReleaseProjectRequestMessage.typecode,\
86                self.release_project, ReleaseProjectResponseMessage,\
87                "ReleaseProjectResponseBody")\
88                }
89        self.xmlrpc_services = { }
90
91    def random_string(self, s, n=3):
92        """Append n random ASCII characters to s and return the string"""
93        rv = s
94        for i in range(0,n):
95            rv += random.choice(string.ascii_letters)
96        return rv
97
98    def write_attr_xml(self, file, root, lines):
99        """
100        Write an emulab config file for a dynamic project.
101
102        Format is <root><attribute name=lines[0]>lines[1]</attribute></root>
103        """
104        # Convert a pair to an attribute line
105        out_attr = lambda a,v : \
106                '<attribute name="%s"><value>%s</value></attribute>' % (a, v)
107
108        f = os.fdopen(file, "w")
109        f.write("<%s>\n" % root)
110        f.write("\n".join([out_attr(*l) for l in lines]))
111        f.write("</%s>\n" % root)
112        f.close()
113
114
115    def dynamic_project(self, req, fedid=None):
116        """
117        Create a dynamic project with ssh access
118
119        Req includes the project and resources as a dictionary
120        """
121        # tempfiles for the parameter files
122        uf, userfile = tempfile.mkstemp(prefix="usr", suffix=".xml",
123                dir="/tmp")
124        pf, projfile = tempfile.mkstemp(prefix="proj", suffix=".xml",
125                dir="/tmp")
126
127        if req.has_key('AllocateProjectRequestBody') and \
128                req['AllocateProjectRequestBody'].has_key('project'):
129            proj = req['AllocateProjectRequestBody']['project']
130        else:
131            raise service_error(service_error.req, 
132                    "Badly formed allocation request")
133        # Take the first user and ssh key
134        name = proj.get('name', None) or self.random_string("proj",4)
135        user = proj.get('user', None)
136        if user != None:
137            user = user[0]      # User is a list, take the first entry
138            if not user.has_key("userID"):
139                uname = self.random_string("user", 3)
140            else:
141                uid = proj['userID']
142                # XXX: fedid
143                uname = uid.get('localname', None) or \
144                        uid.get('kerberosUsername', None) or \
145                        uid.get('uri', None)
146                if uname == None:
147                    raise fedd_proj.service_error(fedd_proj.service_error.req, 
148                            "No ID for user");
149
150            access = user.get('access', None)
151            if access != None:
152                ssh = access[0].get('sshPubkey', None)
153                if ssh == None:
154                    raise fedd_proj.service_error(fedd_proj.service_error.req, 
155                            "No ssh key for user");
156        else:
157            raise fedd_proj.service_error(fedd_proj.service_error.req, 
158                    "No access information for project");
159
160        # uname, name and ssh are set
161        user_fields = [
162                ("name", "Federation User %s" % uname),
163                ("email", "%s-fed@isi.deterlab.net" % uname),
164                ("password", self.random_string("", 8)),
165                ("login", uname),
166                ("address", "4676 Admiralty"),
167                ("city", "Marina del Rey"),
168                ("state", "CA"),
169                ("zip", "90292"),
170                ("country", "USA"),
171                ("phone", "310-448-9190"),
172                ("title", "None"),
173                ("affiliation", "USC/ISI"),
174                ("pubkey", ssh)
175        ]
176
177        proj_fields = [
178                ("name", name),
179                ("short description", "dynamic federated project"),
180                ("URL", "http://www.isi.edu/~faber"),
181                ("funders", "USC/USU"),
182                ("long description", "Federation access control"),
183                ("public", "1"),
184                ("num_pcs", "100"),
185                ("linkedtous", "1"),
186                ("newuser_xml", userfile)
187        ]
188       
189
190        # Write out the files
191        self.write_attr_xml(uf, "user", user_fields)
192        self.write_attr_xml(pf, "project", proj_fields)
193
194        # Generate the commands (only grantnodetype's are dynamic)
195        cmds = [
196                (self.wap, self.newproj, projfile),
197                (self.wap, self.mkproj, name)
198                ]
199
200        # Add commands to grant access to any resources in the request.
201        for nt in [ h for r in req.get('resources', []) \
202                if r.has_key('node') and r['node'].has_key('hardware')\
203                    for h in r['node']['hardware'] ] :
204            cmds.append((self.wap, self.grantnodetype, '-p', name, nt))
205
206        # Create the projects
207        rc = 0
208        for cmd in cmds:
209            self.log.debug("[dynamic_project]: %s" % ' '.join(cmd))
210            if not self.debug:
211                try:
212                    rc = subprocess.call(cmd)
213                except OSerror, e:
214                    raise service_error(service_error.internal,
215                            "Dynamic project subprocess creation error "+ \
216                                    "[%s] (%s)" %  (cmd[1], e.strerror))
217
218            if rc != 0: 
219                raise service_error(service_error.internal,
220                        "Dynamic project subprocess error " +\
221                                "[%s] (%d)" % (cmd[1], rc))
222        # Clean up tempfiles
223        os.unlink(userfile)
224        os.unlink(projfile)
225        rv = {\
226            'project': {\
227                'name': { 'localname': name }, 
228                'user' : [ {\
229                    'userID': { 'localname' : uname },
230                    'access': [ { 'sshPubkey' : ssh } ],
231                } ]\
232            }\
233        }
234        return rv
235
236    def static_project(self, req, fedid=None):
237        """
238        Be certain that the local project in the request has access to the
239        proper resources and users have correct keys.  Add them if necessary.
240        """
241
242        cmds =  []
243
244        # While we should be more careful about this, for the short term, add
245        # the keys to the specified users.
246
247        try:
248            users = req['StaticProjectRequestBody']['project']['user']
249            pname = req['StaticProjectRequestBody']['project']\
250                    ['name']['localname']
251            resources = req['StaticProjectRequestBody'].get('resources', [])
252        except KeyError:
253            raise service_error(service_error.req, "Badly formed request")
254
255
256        for u in users:
257            try:
258                name = u['userID']['localname']
259            except KeyError:
260                raise service_error(service_error.req, "Badly formed user")
261            for sk in [ k['sshPubkey'] for k in u.get('access', []) \
262                    if k.has_key('sshPubkey')]:
263                cmds.append((self.wap, self.addpubkey, '-w', \
264                        '-u', name, '-k', sk))
265       
266
267        # Add commands to grant access to any resources in the request.  The
268        # list comprehension pulls out the hardware types in the node entries
269        # in the resources list.
270        for nt in [ h for r in resources \
271                if r.has_key('node') and r['node'].has_key('hardware')\
272                    for h in r['node']['hardware'] ] :
273            cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))
274
275        # Run the commands
276        rc = 0
277        for cmd in cmds:
278            self.log.debug("[static_project]: %s" % ' '.join(cmd))
279            if not self.debug:
280                try:
281                    rc = subprocess.call(cmd)
282                except OSError, e:
283                    raise service_error(service_error.internal,
284                            "Static project subprocess creation error "+ \
285                                    "[%s] (%s)" %  (cmd[0], e.strerror))
286
287            if rc != 0: 
288                raise service_error(service_error.internal,
289                        "Static project subprocess error " +\
290                                "[%s] (%d)" % (cmd[0], rc))
291
292        return { 'project': req['StaticProjectRequestBody']['project']}
293
294    def release_project(self, req, fedid=None):
295        """
296        Remove user keys from users and delete dynamic projects.
297
298        Only keys not in the set of fixed keys are deleted. and there are
299        similar protections for projects.
300        """
301
302        cmds = []
303        pname = None
304        users = []
305
306        try:
307            if req['ReleaseProjectRequestBody']['project'].has_key('name'):
308                pname = req['ReleaseProjectRequestBody']['project']\
309                        ['name']['localname']
310            if req['ReleaseProjectRequestBody']['project'].has_key('user'):
311                users = req['ReleaseProjectRequestBody']['project']['user']
312        except KeyError:
313            raise service_error(service_error.req, "Badly formed request")
314
315        for u in users:
316            try:
317                name = u['userID']['localname']
318            except KeyError:
319                raise service_error(service_error.req, "Badly formed user")
320            for sk in [ k['sshPubkey'] for k in u.get('access', []) \
321                    if k.has_key('sshPubkey')]:
322                if (name.rstrip(), sk.rstrip()) not in self.fixed_keys:
323                    cmds.append((self.wap, self.addpubkey, '-R', '-w', \
324                            '-u', name, '-k', sk))
325        if pname and pname not in self.fixed_projects:
326            cmds.append((self.wap, self.rmproj, pname))
327
328        # Run the commands
329        rc = 0
330        for cmd in cmds:
331            self.log.debug("[release_project]: %s" % ' '.join(cmd))
332            if not self.debug:
333                try:
334                    rc = subprocess.call(cmd)
335                except OSError, e:
336                    raise service_error(service_error.internal,
337                            "Release project subprocess creation error "+ \
338                                    "[%s] (%s)" %  (cmd[0], e.strerror))
339
340            if rc != 0: 
341                raise service_error(service_error.internal,
342                        "Release project subprocess error " +\
343                                "[%s] (%d)" % (cmd[0], rc))
344
345        return { 'project': req['ReleaseProjectRequestBody']['project']}
346
347def make_proxy(method, req_name, req_alloc, resp_name):
348    """
349    Construct the proxy calling function from the given parameters.
350    """
351
352    # Define the proxy, NB, the parameters to make_proxy are visible to the
353    # definition of proxy.
354    def proxy(self, req, fedid=None):
355        """
356        Send req on to a remote project instantiator.
357
358        Req is just the message to be sent.  This function re-wraps it.
359        It also rethrows any faults.
360        """
361
362        # No retry loop here.  Proxy servers must correctly authenticate
363        # themselves without help
364        try:
365            ctx = fedd_ssl_context(self.cert_file, self.trusted_certs,
366                    password=self.cert_pwd)
367        except SSL.SSLError:
368            raise service_error(service_error.server_config, 
369                    "Server certificates misconfigured")
370
371        loc = feddInternalServiceLocator();
372        port = loc.getfeddInternalPortType(self.url,
373                transport=M2Crypto.httpslib.HTTPSConnection, 
374                transdict={ 'ssl_context' : ctx })
375
376        if req.has_key(req_name):
377            req = req[req_name]
378        else:
379            raise service_error(service_error.req, "Bad formated request");
380
381        # Reconstruct the full request message
382        msg = req_alloc()
383        set_elem = getattr(msg, "set_element_%s" % req_name)
384        set_elem(pack_soap(msg, req_name, req))
385        try:
386            mattr = getattr(port, method)
387            resp = mattr(msg)
388        except ZSI.ParseException, e:
389            raise service_error(service_error.proxy,
390                    "Bad format message (XMLRPC??): %s" % str(e))
391        except ZSI.FaultException, e:
392            resp = e.fault.detail[0]
393
394        r = unpack_soap(resp)
395
396        if r.has_key(resp_name):
397            return r[resp_name]
398        else:
399            raise service_error(service_error.proxy, "Bad proxy response")
400    # NB: end of proxy function definition     
401    return proxy
402
403class fedd_allocate_project_remote:
404    """
405    Allocate projects on a remote machine using the internal SOAP interface
406    """
407    dynamic_project = make_proxy("AllocateProject",
408            "AllocateProjectRequestBody", AllocateProjectRequestMessage,
409            "AllocateProjectResponseBody")
410    static_project = make_proxy("StaticProject",
411            "StaticProjectRequestBody", StaticProjectRequestMessage,
412            "StaticProjectResponseBody")
413    release_project = make_proxy("ReleaseProject",
414            "ReleaseProjectRequestBody", ReleaseProjectRequestMessage,
415            "ReleaseProjectResponseBody")
416
417    def __init__(self, config):
418        """
419        Initializer.  Parses a configuration if one is given.
420        """
421
422        self.debug = config.get("access", "debug_project", False)
423        self.url = config.get("access", "dynamic_projects_url", "")
424
425        self.cert_file = config.get("access", "cert_file", None)
426        self.cert_pwd = config.get("access", "cert_pwd", None)
427        self.trusted_certs = config.get("access", "trusted_certs", None)
428
429        # Certs are promoted from the generic to the specific, so without a if
430        # no dynamic project certificates, then proxy certs are used, and if
431        # none of those the main certs.
432
433        if config.has_option("globals", "proxy_cert_file"):
434            if not self.cert_file:
435                self.cert_file = config.get("globals", "proxy_cert_file")
436                if config.has_option("globals", "porxy_cert_pwd"):
437                    self.cert_pwd = config.get("globals", "proxy_cert_pwd")
438
439        if config.has_option("globals", "proxy_trusted_certs") and \
440                not self.trusted_certs:
441                self.trusted_certs = \
442                        config.get("globals", "proxy_trusted_certs")
443
444        if config.has_option("globals", "cert_file"):
445            has_pwd = config.has_option("globals", "cert_pwd")
446            if not self.cert_file:
447                self.cert_file = config.get("globals", "cert_file")
448                if has_pwd: 
449                    self.cert_pwd = config.get("globals", "cert_pwd")
450
451        if config.get("globals", "trusted_certs") and not self.trusted_certs:
452                self.trusted_certs = \
453                        config.get("globals", "trusted_certs")
454
455        self.soap_services = { }
456        self.xmlrpc_services = { }
457        self.log = logging.getLogger("fedd.allocate.remote")
458        set_log_level(config, "access", self.log)
Note: See TracBrowser for help on using the repository browser.