source: fedd/federation/allocate_project.py @ 266e866

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 266e866 was b312431b, checked in by Ted Faber <faber@…>, 16 years ago

A few small bugs

  • Property mode set to 100644
File size: 18.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import random
6import string
7import subprocess
8import tempfile
9
10from util import *
11from fedid import fedid
12from fixed_resource import read_key_db, read_project_db, read_user_db
13from remote_service import xmlrpc_handler, soap_handler, service_caller
14from service_error import service_error
15import logging
16
17
18# Configure loggers to dump to /dev/null which avoids errors if calling classes
19# don't configure them.
20class nullHandler(logging.Handler):
21    def emit(self, record): pass
22
23fl = logging.getLogger("fedd.allocate.local")
24fl.addHandler(nullHandler())
25fl = logging.getLogger("fedd.allocate.remote")
26fl.addHandler(nullHandler())
27
28
29class allocate_project_local:
30    """
31    Allocate projects on this machine in response to an access request.
32    """
33    dynamic_projects = 4
34    dynamic_keys= 2
35    confirm_keys = 1
36    none = 0
37
38    levels = {
39            'dynamic_projects': dynamic_projects,
40            'dynamic_keys': dynamic_keys,
41            'confirm_keys': confirm_keys,
42            'none': none,
43    }
44
45    def __init__(self, config, auth=None):
46        """
47        Initializer.  Parses a configuration if one is given.
48        """
49
50        self.debug = config.getboolean("allocate", "debug", False)
51        self.wap = config.get('allocate', 'wap', '/usr/testbed/sbin/wap')
52        self.newproj = config.get('allocate', 'newproj',
53                '/usr/testbed/sbin/newproj')
54        self.mkproj = config.get('allocate', 'mkproj', 
55                '/usr/testbed/sbin/mkproj')
56        self.rmproj = config.get('allocate', 'rmproj',
57                '/usr/testbed/sbin/rmproj')
58        self.rmuser = config.get('allocate', 'rmuser',
59                '/usr/testbed/sbin/rmuser')
60        self.newuser = config.get('allocate', 'newuser',
61                '/usr/testbed/sbin/newuser')
62        self.addpubkey = config.get('allocate', 'addpubkey', 
63                '/usr/testbed/sbin/addpubkey')
64        self.grantnodetype = config.get('allocate', 'grantnodetype', 
65                '/usr/testbed/sbin/grantnodetype')
66        self.confirmkey = config.get('allocate', 'confirmkey', 
67                '/usr/testbed/sbin/taddpubkey')
68        self.user_to_project=config.get("allocate", 'user_to_project',
69                '/usr/local/bin/user_to_project.py')
70        self.allocation_level = config.get("allocate", "allocation_level", 
71                "none")
72        self.log = logging.getLogger("fedd.allocate.local")
73        set_log_level(config, "allocate", self.log)
74
75        if auth:
76            self.auth = auth
77        else:
78            auth = authorizer()
79            log.warn("[allocate] No authorizer passed in, using local one")
80
81        try:
82            self.allocation_level = \
83                    self.levels[self.allocation_level.strip().lower()]
84        except KeyError:
85            self.log.error("Bad allocation_level %s.  Defaulting to none" % \
86                    self.allocation_error)
87            self.allocation_level = self.none
88
89        access_db = config.get("allocate", "accessdb")
90        if access_db:
91            try:
92                read_simple_accessdb(access_db, self.auth, 'allocate')
93            except IOError, e:
94                raise service_error(service_error.internal,
95                        "Error reading accessDB %s: %s" % (access_db, e))
96            except ValueError:
97                raise service_error(service_error.internal, "%s" % e)
98
99
100        fixed_key_db = config.get("allocate", "fixed_keys", None)
101        fixed_project_db = config.get("allocate", "fixed_projects", None)
102        fixed_user_db = config.get("allocate", "fixed_users", None)
103        self.fixed_keys = set()
104        self.fixed_projects = set()
105        self.fixed_users = set()
106
107        # initialize the fixed resource sets
108        for db, rset, fcn in (\
109                (fixed_key_db, self.fixed_keys, read_key_db), \
110                (fixed_project_db, self.fixed_projects, read_project_db),
111                (fixed_user_db, self.fixed_users, read_user_db)):
112            if db:
113                try:
114                    rset.update(fcn(db))
115                except:
116                    self.log.debug("Can't read resources from %s" % db)
117       
118        # Internal services are SOAP only
119        self.soap_services = {\
120                "AllocateProject": soap_handler("AllocateProject", 
121                    self.dynamic_project),
122                "StaticProject": soap_handler("StaticProject", 
123                    self.static_project),
124                "ReleaseProject": soap_handler("ReleaseProject", 
125                    self.release_project),
126                }
127        self.xmlrpc_services = { }
128
129    def random_string(self, s, n=3):
130        """Append n random ASCII characters to s and return the string"""
131        rv = s
132        for i in range(0,n):
133            rv += random.choice(string.ascii_letters)
134        return rv
135
136    def write_attr_xml(self, file, root, lines):
137        """
138        Write an emulab config file for a dynamic project.
139
140        Format is <root><attribute name=lines[0]>lines[1]</attribute></root>
141        """
142        # Convert a pair to an attribute line
143        out_attr = lambda a,v : \
144                '<attribute name="%s"><value>%s</value></attribute>' % (a, v)
145
146        f = os.fdopen(file, "w")
147        f.write("<%s>\n" % root)
148        f.write("\n".join([out_attr(*l) for l in lines]))
149        f.write("</%s>\n" % root)
150        f.close()
151
152
153    def dynamic_project(self, req, fedid=None):
154        """
155        Create a dynamic project with ssh access
156
157        Req includes the project and resources as a dictionary
158        """
159
160        # Internal calls do not have a fedid parameter (i.e., local calls on
161        # behalf of already vetted fedids)
162        if fedid and not self.auth.check_attribute(fedid, "allocate"):
163            self.log.debug("[allocate] Access denied (%s)" % fedid)
164            raise service_error(service_error.access, "Access Denied")
165
166        if self.allocation_level < self.dynamic_projects:
167            raise service_error(service_error.access, 
168                    "[dynamic_project] dynamic project allocation not " + \
169                            "permitted: check allocation level")
170        # tempfiles for the parameter files
171        cuf, create_userfile = tempfile.mkstemp(prefix="usr", suffix=".xml",
172                dir="/tmp")
173        suf, service_userfile = tempfile.mkstemp(prefix="usr", suffix=".xml",
174                dir="/tmp")
175        pf, projfile = tempfile.mkstemp(prefix="proj", suffix=".xml",
176                dir="/tmp")
177
178        if req.has_key('AllocateProjectRequestBody') and \
179                req['AllocateProjectRequestBody'].has_key('project'):
180            proj = req['AllocateProjectRequestBody']['project']
181        else:
182            raise service_error(service_error.req, 
183                    "Badly formed allocation request")
184        # Take the first user and ssh key
185        name = proj.get('name', None) or self.random_string("proj",4)
186        user = proj.get('user', [])
187
188        uname = { }
189        ssh = { }
190        for u in user:
191            role = u.get('role', None)
192            if not role: continue
193            if u.has_key('userID'):
194                uid = u['userID']
195                uname[role] = uid.get('localname', None) or \
196                        uid.get('kerberosUsername', None) or \
197                        uid.get('uri', None)
198                if uname[role] == None:
199                    raise service_error(service_error.req, "No ID for user")
200            else:
201                uname[role] = self.random_string("user", 3)
202
203            access = u.get('access', None)
204            if access:
205                # XXX collect and call addpubkey later, for now use first one.
206                for a in access:
207                    ssh[role] = a.get('sshPubkey', None)
208                    if ssh: break
209                else:
210                    raise service_error(service_error.req,
211                            "No SSH key for user %s" % uname[role])
212            else:
213                raise service_error(service_error.req,
214                        "No access mechanisms for for user %s" % uname[role])
215
216        if not (uname.has_key('experimentCreation') and \
217                uname.has_key('serviceAccess')):
218            raise service_error(service_error.req,
219                    "Must specify both user roles")
220       
221
222        create_user_fields = [
223                ("name", "Federation User %s" % uname['experimentCreation']),
224                ("email", "%s-fed@isi.deterlab.net" % \
225                        uname['experimentCreation']),
226                ("password", self.random_string("", 8)),
227                ("login", uname['experimentCreation']),
228                ("address", "4676 Admiralty"),
229                ("city", "Marina del Rey"),
230                ("state", "CA"),
231                ("zip", "90292"),
232                ("country", "USA"),
233                ("phone", "310-448-9190"),
234                ("title", "None"),
235                ("affiliation", "USC/ISI"),
236                ("pubkey", ssh['experimentCreation'])
237        ]
238
239        service_user_fields = [
240                ("name", "Federation User %s" % uname['serviceAccess']),
241                ("email", "%s-fed@isi.deterlab.net" % uname['serviceAccess']),
242                ("password", self.random_string("", 8)),
243                ("login", uname['serviceAccess']),
244                ("address", "4676 Admiralty"),
245                ("city", "Marina del Rey"),
246                ("state", "CA"),
247                ("zip", "90292"),
248                ("country", "USA"),
249                ("phone", "310-448-9190"),
250                ("title", "None"),
251                ("affiliation", "USC/ISI"),
252                ("pubkey", ssh['serviceAccess'])
253        ]
254
255        proj_fields = [
256                ("name", name),
257                ("short description", "dynamic federated project"),
258                ("URL", "http://www.isi.edu/~faber"),
259                ("funders", "USC/USU"),
260                ("long description", "Federation access control"),
261                ("public", "1"),
262                ("num_pcs", "100"),
263                ("linkedtous", "1"),
264                ("newuser_xml", create_userfile)
265        ]
266       
267
268        # Write out the files
269        self.write_attr_xml(cuf, "user", create_user_fields)
270        self.write_attr_xml(suf, "user", service_user_fields)
271        self.write_attr_xml(pf, "project", proj_fields)
272
273        # Generate the commands (only grantnodetype's are dynamic)
274        cmds = [
275                (self.wap, self.newproj, projfile),
276                (self.wap, self.mkproj, name),
277                (self.wap, self.newuser, service_userfile),
278                (self.wap, self.user_to_project, uname['serviceAccess'], name),
279                ]
280
281        # Add commands to grant access to any resources in the request.
282        for nt in [ h for r in req.get('resources', []) \
283                if r.has_key('node') and r['node'].has_key('hardware')\
284                    for h in r['node']['hardware'] ] :
285            cmds.append((self.wap, self.grantnodetype, '-p', name, nt))
286
287        # Create the projects
288        rc = 0
289        for cmd in cmds:
290            self.log.debug("[dynamic_project]: %s" % ' '.join(cmd))
291            if not self.debug:
292                try:
293                    rc = subprocess.call(cmd)
294                except OSerror, e:
295                    raise service_error(service_error.internal,
296                            "Dynamic project subprocess creation error "+ \
297                                    "[%s] (%s)" %  (cmd[1], e.strerror))
298
299            if rc != 0: 
300                raise service_error(service_error.internal,
301                        "Dynamic project subprocess error " +\
302                                "[%s] (%d)" % (cmd[1], rc))
303        # Clean up tempfiles
304        #os.unlink(create_userfile)
305        #os.unlink(service_userfile)
306        #os.unlink(projfile)
307        rv = {\
308            'project': {\
309                'name': { 'localname': name }, 
310                'user' : [\
311                    {\
312                        'userID': { 'localname' : uname['experimentCreation'] },
313                        'access': [ {'sshPubkey': ssh['experimentCreation'] } ],
314                        'role': 'experimentCreation',
315                    }, \
316                    {\
317                        'userID': { 'localname' : uname['serviceAccess'] },
318                        'access': [ { 'sshPubkey' : ssh['serviceAccess'] } ], 
319                        'role': 'serviceAccess',
320                    } \
321                ]\
322            }\
323        }
324        return rv
325
326    def static_project(self, req, fedid=None):
327        """
328        Be certain that the local project in the request has access to the
329        proper resources and users have correct keys.  Add them if necessary.
330        """
331
332        cmds =  []
333
334        # Internal calls do not have a fedid parameter (i.e., local calls on
335        # behalf of already vetted fedids)
336        if fedid and not self.auth.check_attribute(fedid, "allocate"):
337            self.log.debug("[allocate] Access denied (%s)" % fedid)
338            raise service_error(service_error.access, "Access Denied")
339        # While we should be more careful about this, for the short term, add
340        # the keys to the specified users.
341
342        try:
343            users = req['StaticProjectRequestBody']['project']['user']
344            pname = req['StaticProjectRequestBody']['project']\
345                    ['name']['localname']
346            resources = req['StaticProjectRequestBody'].get('resources', [])
347        except KeyError:
348            raise service_error(service_error.req, "Badly formed request")
349
350
351        for u in users:
352            try:
353                name = u['userID']['localname']
354            except KeyError:
355                raise service_error(service_error.req, "Badly formed user")
356            for sk in [ k['sshPubkey'] for k in u.get('access', []) \
357                    if k.has_key('sshPubkey')]:
358                if self.allocation_level >= self.dynamic_keys:
359                    cmds.append((self.wap, self.addpubkey, '-r', \
360                            '-u', name, '-k', sk))
361                elif self.allocation_level >= self.confirm_keys:
362                    cmds.append((self.wap, self.confirmkey, '-C', \
363                            '-u', name, '-k', sk))
364                else:
365                    self.log.warning("[static_project] no checking of " + \
366                            "static keys")
367       
368
369        # Add commands to grant access to any resources in the request.  The
370        # list comprehension pulls out the hardware types in the node entries
371        # in the resources list.
372        for nt in [ h for r in resources \
373                if r.has_key('node') and r['node'].has_key('hardware')\
374                    for h in r['node']['hardware'] ] :
375            if self.allocation_level >= self.confirm_keys:
376                cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))
377
378        # Run the commands
379        rc = 0
380        for cmd in cmds:
381            self.log.debug("[static_project]: %s" % ' '.join(cmd))
382            if not self.debug:
383                try:
384                    rc = subprocess.call(cmd)
385                except OSError, e:
386                    raise service_error(service_error.internal,
387                            "Static project subprocess creation error "+ \
388                                    "[%s] (%s)" %  (cmd[0], e.strerror))
389
390            if rc != 0: 
391                raise service_error(service_error.internal,
392                        "Static project subprocess error " +\
393                                "[%s] (%d)" % (cmd[0], rc))
394
395        return { 'project': req['StaticProjectRequestBody']['project']}
396
397    def release_project(self, req, fedid=None):
398        """
399        Remove user keys from users and delete dynamic projects.
400
401        Only keys not in the set of fixed keys are deleted. and there are
402        similar protections for projects.
403        """
404        # Internal calls do not have a fedid parameter (i.e., local calls on
405        # behalf of already vetted fedids)
406        if fedid and not self.auth.check_attribute(fedid, "allocate"):
407            self.log.debug("[allocate] Access denied (%s)" % fedid)
408            raise service_error(service_error.access, "Access Denied")
409
410        cmds = []
411        pname = None
412        users = []
413
414        try:
415            if req['ReleaseProjectRequestBody']['project'].has_key('name'):
416                pname = req['ReleaseProjectRequestBody']['project']\
417                        ['name']['localname']
418            if req['ReleaseProjectRequestBody']['project'].has_key('user'):
419                users = req['ReleaseProjectRequestBody']['project']['user']
420        except KeyError:
421            raise service_error(service_error.req, "Badly formed request")
422
423        if pname and pname not in self.fixed_projects and \
424                self.allocation_level >= self.dynamic_projects:
425            cmds.append((self.wap, self.rmproj, pname))
426
427        for u in users:
428            try:
429                name = u['userID']['localname']
430            except KeyError:
431                raise service_error(service_error.req, "Badly formed user")
432            if self.allocation_level >= self.dynamic_projects and \
433                    name not in self.fixed_users:
434                cmds.append((self.wap, self.rmuser, name))
435            else:
436                for sk in [ k['sshPubkey'] for k in u.get('access', []) \
437                        if k.has_key('sshPubkey')]:
438                    if (name.rstrip(), sk.rstrip()) not in self.fixed_keys:
439                        if self.allocation_level >= self.dynamic_keys:
440                            cmds.append((self.wap, self.addpubkey, '-R', '-r', \
441                                    '-u', name, '-k', sk))
442
443        # Run the commands
444        rc = 0
445        for cmd in cmds:
446            self.log.debug("[release_project]: %s" % ' '.join(cmd))
447            if not self.debug:
448                try:
449                    rc = subprocess.call(cmd)
450                except OSError, e:
451                    raise service_error(service_error.internal,
452                            "Release project subprocess creation error "+ \
453                                    "[%s] (%s)" %  (cmd[0], e.strerror))
454
455            if rc != 0: 
456                raise service_error(service_error.internal,
457                        "Release project subprocess error " +\
458                                "[%s] (%d)" % (cmd[0], rc))
459
460        return { 'project': req['ReleaseProjectRequestBody']['project']}
461
462class allocate_project_remote:
463    """
464    Allocate projects on a remote machine using the internal SOAP interface
465    """
466    class proxy(service_caller):
467        """
468        This class is a proxy functor (callable) that has the same signature as
469        a function called by soap_handler or xmlrpc_handler, but that used the
470        service_caller class to call the function remotely.
471        """
472
473        def __init__(self, url, cert_file, cert_pwd, trusted_certs, auth, 
474                method):
475            service_caller.__init__(self, method)
476            self.url = url
477            self.cert_file = cert_file
478            self.cert_pwd = cert_pwd
479            self.trusted_certs = trusted_certs
480            self.request_body__name = "%sRequestBody" % method
481            self.resp_name = "%sResponseBody" % method
482            self.auth = auth
483            # Calling the proxy object directly invokes the proxy_call method,
484            # not the service_call method.
485            self.__call__ = self.proxy_call
486           
487
488        # Define the proxy, NB, the parameters to make_proxy are visible to the
489        # definition of proxy.
490        def proxy_call(self, req, fid=None):
491            """
492            Send req on to a remote project instantiator.
493
494            Req is just the message to be sent.  This function re-wraps it.
495            It also rethrows any faults.
496            """
497
498            if req.has_key(self.request_body_name):
499                req = req[self.request_body_name]
500            else:
501                raise service_error(service_error.req, "Bad formated request");
502
503            try:
504                r = self.call_service(self.url, req, self.cert_file,
505                        self.cert_pwd, self.trusted_certs)
506            except service_error, e:
507                if e.code == service_error.connect:
508                    raise service_error(service_error.internal, 
509                            "Cannot connect to internal service: (%d) %s" % \
510                                    (e.code, e.desc))
511                else: raise
512            if r.has_key(self.resp_name):
513                return r[self.resp_name]
514            else:
515                raise service_error(service_error.protocol, 
516                        "Bad proxy response")
517
518    # back to defining the allocate_project_remote class
519    def __init__(self, config, auth=None):
520        """
521        Initializer.  Parses a configuration if one is given.
522        """
523
524        self.debug = config.get("allocate", "debug", False)
525        self.url = config.get("allocate", "uri", "")
526
527        self.cert_file = config.get("allocate", "cert_file", None)
528        self.cert_pwd = config.get("allocate", "cert_pwd", None)
529        self.trusted_certs = config.get("allocate", "trusted_certs", None)
530
531        # Certs are promoted from the generic to the specific, so without a if
532        # no dynamic project certificates, then proxy certs are used, and if
533        # none of those the main certs.
534
535        if config.has_option("globals", "proxy_cert_file"):
536            if not self.cert_file:
537                self.cert_file = config.get("globals", "proxy_cert_file")
538                if config.has_option("globals", "proxy_cert_pwd"):
539                    self.cert_pwd = config.get("globals", "proxy_cert_pwd")
540
541        if config.has_option("globals", "proxy_trusted_certs") and \
542                not self.trusted_certs:
543                self.trusted_certs = \
544                        config.get("globals", "proxy_trusted_certs")
545
546        if config.has_option("globals", "cert_file"):
547            has_pwd = config.has_option("globals", "cert_pwd")
548            if not self.cert_file:
549                self.cert_file = config.get("globals", "cert_file")
550                if has_pwd: 
551                    self.cert_pwd = config.get("globals", "cert_pwd")
552
553        if config.get("globals", "trusted_certs") and not self.trusted_certs:
554                self.trusted_certs = \
555                        config.get("globals", "trusted_certs")
556
557        self.soap_services = { }
558        self.xmlrpc_services = { }
559        self.log = logging.getLogger("fedd.allocate.remote")
560        set_log_level(config, "allocate", self.log)
561
562        if auth:
563            self.auth = auth
564        else:
565            auth = authorizer()
566            log.warn("[allocate] No authorizer passed in, using local one")
567
568        # The specializations of the proxy functions
569        self.dynamic_project = self.proxy(self.url, self.cert_file, 
570                self.cert_pwd, self.trusted_certs, self.auth, 
571                "AllocateProject")
572        self.static_project = self.proxy(self.url, self.cert_file, 
573                self.cert_pwd, self.trusted_certs, self.auth, 
574                "StaticProject")
575        self.release_project = self.proxy(self.url, self.cert_file,
576                self.cert_pwd, self.trusted_certs, self.auth, 
577                "ReleaseProject")
578
Note: See TracBrowser for help on using the repository browser.