source: fedd/federation/allocate_project.py @ 2ac63f7d

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 2ac63f7d was 1b376ca, checked in by Ted Faber <faber@…>, 16 years ago

Correct resource communication to allocation and wildcarding of gateway requests

  • Property mode set to 100644
File size: 18.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import random
6import string
7import subprocess
8import tempfile
9
10from util import *
11from fedid import fedid
12from fixed_resource import read_key_db, read_project_db, read_user_db
13from remote_service import xmlrpc_handler, soap_handler, service_caller
14from service_error import service_error
15import logging
16
17
18# Configure loggers to dump to /dev/null which avoids errors if calling classes
19# don't configure them.
20class nullHandler(logging.Handler):
21    def emit(self, record): pass
22
23fl = logging.getLogger("fedd.allocate.local")
24fl.addHandler(nullHandler())
25fl = logging.getLogger("fedd.allocate.remote")
26fl.addHandler(nullHandler())
27
28
29class allocate_project_local:
30    """
31    Allocate projects on this machine in response to an access request.
32    """
33    dynamic_projects = 4
34    dynamic_keys= 2
35    confirm_keys = 1
36    none = 0
37
38    levels = {
39            'dynamic_projects': dynamic_projects,
40            'dynamic_keys': dynamic_keys,
41            'confirm_keys': confirm_keys,
42            'none': none,
43    }
44
45    def __init__(self, config, auth=None):
46        """
47        Initializer.  Parses a configuration if one is given.
48        """
49
50        self.debug = config.getboolean("allocate", "debug", False)
51        self.wap = config.get('allocate', 'wap', '/usr/testbed/sbin/wap')
52        self.newproj = config.get('allocate', 'newproj',
53                '/usr/testbed/sbin/newproj')
54        self.mkproj = config.get('allocate', 'mkproj', 
55                '/usr/testbed/sbin/mkproj')
56        self.rmproj = config.get('allocate', 'rmproj',
57                '/usr/testbed/sbin/rmproj')
58        self.rmuser = config.get('allocate', 'rmuser',
59                '/usr/testbed/sbin/rmuser')
60        self.newuser = config.get('allocate', 'newuser',
61                '/usr/testbed/sbin/newuser')
62        self.addpubkey = config.get('allocate', 'addpubkey', 
63                '/usr/testbed/sbin/addpubkey')
64        self.grantnodetype = config.get('allocate', 'grantnodetype', 
65                '/usr/testbed/sbin/grantnodetype')
66        self.confirmkey = config.get('allocate', 'confirmkey', 
67                '/usr/testbed/sbin/taddpubkey')
68        self.user_to_project=config.get("allocate", 'user_to_project',
69                '/usr/local/bin/user_to_project.py')
70        self.allocation_level = config.get("allocate", "allocation_level", 
71                "none")
72        self.log = logging.getLogger("fedd.allocate.local")
73        set_log_level(config, "allocate", self.log)
74
75        if auth:
76            self.auth = auth
77        else:
78            auth = authorizer()
79            log.warn("[allocate] No authorizer passed in, using local one")
80
81        try:
82            self.allocation_level = \
83                    self.levels[self.allocation_level.strip().lower()]
84        except KeyError:
85            self.log.error("Bad allocation_level %s.  Defaulting to none" % \
86                    self.allocation_error)
87            self.allocation_level = self.none
88
89        access_db = config.get("allocate", "accessdb")
90        if access_db:
91            try:
92                read_simple_accessdb(access_db, self.auth, 'allocate')
93            except IOError, e:
94                raise service_error(service_error.internal,
95                        "Error reading accessDB %s: %s" % (access_db, e))
96            except ValueError:
97                raise service_error(service_error.internal, "%s" % e)
98
99
100        fixed_key_db = config.get("allocate", "fixed_keys", None)
101        fixed_project_db = config.get("allocate", "fixed_projects", None)
102        fixed_user_db = config.get("allocate", "fixed_users", None)
103        self.fixed_keys = set()
104        self.fixed_projects = set()
105        self.fixed_users = set()
106
107        # initialize the fixed resource sets
108        for db, rset, fcn in (\
109                (fixed_key_db, self.fixed_keys, read_key_db), \
110                (fixed_project_db, self.fixed_projects, read_project_db),
111                (fixed_user_db, self.fixed_users, read_user_db)):
112            if db:
113                try:
114                    rset.update(fcn(db))
115                except:
116                    self.log.debug("Can't read resources from %s" % db)
117       
118        # Internal services are SOAP only
119        self.soap_services = {\
120                "AllocateProject": soap_handler("AllocateProject", 
121                    self.dynamic_project),
122                "StaticProject": soap_handler("StaticProject", 
123                    self.static_project),
124                "ReleaseProject": soap_handler("ReleaseProject", 
125                    self.release_project),
126                }
127        self.xmlrpc_services = { }
128
129    def random_string(self, s, n=3):
130        """Append n random ASCII characters to s and return the string"""
131        rv = s
132        for i in range(0,n):
133            rv += random.choice(string.ascii_letters)
134        return rv
135
136    def write_attr_xml(self, file, root, lines):
137        """
138        Write an emulab config file for a dynamic project.
139
140        Format is <root><attribute name=lines[0]>lines[1]</attribute></root>
141        """
142        # Convert a pair to an attribute line
143        out_attr = lambda a,v : \
144                '<attribute name="%s"><value>%s</value></attribute>' % (a, v)
145
146        f = os.fdopen(file, "w")
147        f.write("<%s>\n" % root)
148        f.write("\n".join([out_attr(*l) for l in lines]))
149        f.write("</%s>\n" % root)
150        f.close()
151
152
153    def dynamic_project(self, req, fedid=None):
154        """
155        Create a dynamic project with ssh access
156
157        Req includes the project and resources as a dictionary
158        """
159
160        # Internal calls do not have a fedid parameter (i.e., local calls on
161        # behalf of already vetted fedids)
162        if fedid and not self.auth.check_attribute(fedid, "allocate"):
163            self.log.debug("[allocate] Access denied (%s)" % fedid)
164            raise service_error(service_error.access, "Access Denied")
165
166        if self.allocation_level < self.dynamic_projects:
167            raise service_error(service_error.access, 
168                    "[dynamic_project] dynamic project allocation not " + \
169                            "permitted: check allocation level")
170        # tempfiles for the parameter files
171        cuf, create_userfile = tempfile.mkstemp(prefix="usr", suffix=".xml",
172                dir="/tmp")
173        suf, service_userfile = tempfile.mkstemp(prefix="usr", suffix=".xml",
174                dir="/tmp")
175        pf, projfile = tempfile.mkstemp(prefix="proj", suffix=".xml",
176                dir="/tmp")
177
178        if req.has_key('AllocateProjectRequestBody'):
179            proj = req['AllocateProjectRequestBody'].get('project', None)
180            if not proj:
181                raise service_error(service_error.req, 
182                        "Badly formed allocation request")
183            resources = req['AllocateProjectRequestBody'].get('resources', { })
184        else:
185            raise service_error(service_error.req, 
186                    "Badly formed allocation request")
187        # Take the first user and ssh key
188        name = proj.get('name', None) or self.random_string("proj",4)
189        user = proj.get('user', [])
190
191        uname = { }
192        ssh = { }
193        for u in user:
194            role = u.get('role', None)
195            if not role: continue
196            if u.has_key('userID'):
197                uid = u['userID']
198                uname[role] = uid.get('localname', None) or \
199                        uid.get('kerberosUsername', None) or \
200                        uid.get('uri', None)
201                if uname[role] == None:
202                    raise service_error(service_error.req, "No ID for user")
203            else:
204                uname[role] = self.random_string("user", 3)
205
206            access = u.get('access', None)
207            if access:
208                # XXX collect and call addpubkey later, for now use first one.
209                for a in access:
210                    ssh[role] = a.get('sshPubkey', None)
211                    if ssh: break
212                else:
213                    raise service_error(service_error.req,
214                            "No SSH key for user %s" % uname[role])
215            else:
216                raise service_error(service_error.req,
217                        "No access mechanisms for for user %s" % uname[role])
218
219        if not (uname.has_key('experimentCreation') and \
220                uname.has_key('serviceAccess')):
221            raise service_error(service_error.req,
222                    "Must specify both user roles")
223       
224
225        create_user_fields = [
226                ("name", "Federation User %s" % uname['experimentCreation']),
227                ("email", "%s-fed@isi.deterlab.net" % \
228                        uname['experimentCreation']),
229                ("password", self.random_string("", 8)),
230                ("login", uname['experimentCreation']),
231                ("address", "4676 Admiralty"),
232                ("city", "Marina del Rey"),
233                ("state", "CA"),
234                ("zip", "90292"),
235                ("country", "USA"),
236                ("phone", "310-448-9190"),
237                ("title", "None"),
238                ("affiliation", "USC/ISI"),
239                ("pubkey", ssh['experimentCreation'])
240        ]
241
242        service_user_fields = [
243                ("name", "Federation User %s" % uname['serviceAccess']),
244                ("email", "%s-fed@isi.deterlab.net" % uname['serviceAccess']),
245                ("password", self.random_string("", 8)),
246                ("login", uname['serviceAccess']),
247                ("address", "4676 Admiralty"),
248                ("city", "Marina del Rey"),
249                ("state", "CA"),
250                ("zip", "90292"),
251                ("country", "USA"),
252                ("phone", "310-448-9190"),
253                ("title", "None"),
254                ("affiliation", "USC/ISI"),
255                ("pubkey", ssh['serviceAccess'])
256        ]
257
258        proj_fields = [
259                ("name", name),
260                ("short description", "dynamic federated project"),
261                ("URL", "http://www.isi.edu/~faber"),
262                ("funders", "USC/USU"),
263                ("long description", "Federation access control"),
264                ("public", "1"),
265                ("num_pcs", "100"),
266                ("linkedtous", "1"),
267                ("newuser_xml", create_userfile)
268        ]
269       
270
271        # Write out the files
272        self.write_attr_xml(cuf, "user", create_user_fields)
273        self.write_attr_xml(suf, "user", service_user_fields)
274        self.write_attr_xml(pf, "project", proj_fields)
275
276        # Generate the commands (only grantnodetype's are dynamic)
277        cmds = [
278                (self.wap, self.newproj, projfile),
279                (self.wap, self.mkproj, name),
280                (self.wap, self.newuser, service_userfile),
281                (self.wap, self.user_to_project, uname['serviceAccess'], name),
282                ]
283
284        # Add commands to grant access to any resources in the request.  The
285        # list comprehension pulls out the hardware types in the node entries
286        # in the resources list.
287        if resources.has_key('node'):
288            for nt in [ h for n in resources['node']\
289                    if n.has_key('hardware') for h in n['hardware'] ] :
290                if self.allocation_level >= self.confirm_keys:
291                    cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))
292
293
294        # Create the projects
295        rc = 0
296        for cmd in cmds:
297            self.log.debug("[dynamic_project]: %s" % ' '.join(cmd))
298            if not self.debug:
299                try:
300                    rc = subprocess.call(cmd)
301                except OSerror, e:
302                    raise service_error(service_error.internal,
303                            "Dynamic project subprocess creation error "+ \
304                                    "[%s] (%s)" %  (cmd[1], e.strerror))
305
306            if rc != 0: 
307                raise service_error(service_error.internal,
308                        "Dynamic project subprocess error " +\
309                                "[%s] (%d)" % (cmd[1], rc))
310        # Clean up tempfiles
311        #os.unlink(create_userfile)
312        #os.unlink(service_userfile)
313        #os.unlink(projfile)
314        rv = {\
315            'project': {\
316                'name': { 'localname': name }, 
317                'user' : [\
318                    {\
319                        'userID': { 'localname' : uname['experimentCreation'] },
320                        'access': [ {'sshPubkey': ssh['experimentCreation'] } ],
321                        'role': 'experimentCreation',
322                    }, \
323                    {\
324                        'userID': { 'localname' : uname['serviceAccess'] },
325                        'access': [ { 'sshPubkey' : ssh['serviceAccess'] } ], 
326                        'role': 'serviceAccess',
327                    } \
328                ]\
329            }\
330        }
331        return rv
332
333    def static_project(self, req, fedid=None):
334        """
335        Be certain that the local project in the request has access to the
336        proper resources and users have correct keys.  Add them if necessary.
337        """
338
339        cmds =  []
340
341        # Internal calls do not have a fedid parameter (i.e., local calls on
342        # behalf of already vetted fedids)
343        if fedid and not self.auth.check_attribute(fedid, "allocate"):
344            self.log.debug("[allocate] Access denied (%s)" % fedid)
345            raise service_error(service_error.access, "Access Denied")
346        # While we should be more careful about this, for the short term, add
347        # the keys to the specified users.
348
349        try:
350            users = req['StaticProjectRequestBody']['project']['user']
351            pname = req['StaticProjectRequestBody']['project']\
352                    ['name']['localname']
353            resources = req['StaticProjectRequestBody'].get('resources', { })
354        except KeyError:
355            raise service_error(service_error.req, "Badly formed request")
356
357
358        for u in users:
359            try:
360                name = u['userID']['localname']
361            except KeyError:
362                raise service_error(service_error.req, "Badly formed user")
363            for sk in [ k['sshPubkey'] for k in u.get('access', []) \
364                    if k.has_key('sshPubkey')]:
365                if self.allocation_level >= self.dynamic_keys:
366                    cmds.append((self.wap, self.addpubkey, '-r', \
367                            '-u', name, '-k', sk))
368                elif self.allocation_level >= self.confirm_keys:
369                    cmds.append((self.wap, self.confirmkey, '-C', \
370                            '-u', name, '-k', sk))
371                else:
372                    self.log.warning("[static_project] no checking of " + \
373                            "static keys")
374       
375
376        # Add commands to grant access to any resources in the request.  The
377        # list comprehension pulls out the hardware types in the node entries
378        # in the resources list.
379        if resources.has_key('node'):
380            for nt in [ h for n in resources['node']\
381                    if n.has_key('hardware') for h in n['hardware'] ] :
382                if self.allocation_level >= self.confirm_keys:
383                    cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))
384
385        # Run the commands
386        rc = 0
387        for cmd in cmds:
388            self.log.debug("[static_project]: %s" % ' '.join(cmd))
389            if not self.debug:
390                try:
391                    rc = subprocess.call(cmd)
392                except OSError, e:
393                    raise service_error(service_error.internal,
394                            "Static project subprocess creation error "+ \
395                                    "[%s] (%s)" %  (cmd[0], e.strerror))
396
397            if rc != 0: 
398                raise service_error(service_error.internal,
399                        "Static project subprocess error " +\
400                                "[%s] (%d)" % (cmd[0], rc))
401
402        return { 'project': req['StaticProjectRequestBody']['project']}
403
404    def release_project(self, req, fedid=None):
405        """
406        Remove user keys from users and delete dynamic projects.
407
408        Only keys not in the set of fixed keys are deleted. and there are
409        similar protections for projects.
410        """
411        # Internal calls do not have a fedid parameter (i.e., local calls on
412        # behalf of already vetted fedids)
413        if fedid and not self.auth.check_attribute(fedid, "allocate"):
414            self.log.debug("[allocate] Access denied (%s)" % fedid)
415            raise service_error(service_error.access, "Access Denied")
416
417        cmds = []
418        pname = None
419        users = []
420
421        try:
422            if req['ReleaseProjectRequestBody']['project'].has_key('name'):
423                pname = req['ReleaseProjectRequestBody']['project']\
424                        ['name']['localname']
425            if req['ReleaseProjectRequestBody']['project'].has_key('user'):
426                users = req['ReleaseProjectRequestBody']['project']['user']
427        except KeyError:
428            raise service_error(service_error.req, "Badly formed request")
429
430        if pname and pname not in self.fixed_projects and \
431                self.allocation_level >= self.dynamic_projects:
432            cmds.append((self.wap, self.rmproj, pname))
433
434        for u in users:
435            try:
436                name = u['userID']['localname']
437            except KeyError:
438                raise service_error(service_error.req, "Badly formed user")
439            if self.allocation_level >= self.dynamic_projects and \
440                    name not in self.fixed_users:
441                cmds.append((self.wap, self.rmuser, name))
442            else:
443                for sk in [ k['sshPubkey'] for k in u.get('access', []) \
444                        if k.has_key('sshPubkey')]:
445                    if (name.rstrip(), sk.rstrip()) not in self.fixed_keys:
446                        if self.allocation_level >= self.dynamic_keys:
447                            cmds.append((self.wap, self.addpubkey, '-R', '-r', \
448                                    '-u', name, '-k', sk))
449
450        # Run the commands
451        rc = 0
452        for cmd in cmds:
453            self.log.debug("[release_project]: %s" % ' '.join(cmd))
454            if not self.debug:
455                try:
456                    rc = subprocess.call(cmd)
457                except OSError, e:
458                    raise service_error(service_error.internal,
459                            "Release project subprocess creation error "+ \
460                                    "[%s] (%s)" %  (cmd[0], e.strerror))
461
462            if rc != 0: 
463                raise service_error(service_error.internal,
464                        "Release project subprocess error " +\
465                                "[%s] (%d)" % (cmd[0], rc))
466
467        return { 'project': req['ReleaseProjectRequestBody']['project']}
468
469class allocate_project_remote:
470    """
471    Allocate projects on a remote machine using the internal SOAP interface
472    """
473    class proxy(service_caller):
474        """
475        This class is a proxy functor (callable) that has the same signature as
476        a function called by soap_handler or xmlrpc_handler, but that used the
477        service_caller class to call the function remotely.
478        """
479
480        def __init__(self, url, cert_file, cert_pwd, trusted_certs, auth, 
481                method):
482            service_caller.__init__(self, method)
483            self.url = url
484            self.cert_file = cert_file
485            self.cert_pwd = cert_pwd
486            self.trusted_certs = trusted_certs
487            self.request_body__name = "%sRequestBody" % method
488            self.resp_name = "%sResponseBody" % method
489            self.auth = auth
490            # Calling the proxy object directly invokes the proxy_call method,
491            # not the service_call method.
492            self.__call__ = self.proxy_call
493           
494
495        # Define the proxy, NB, the parameters to make_proxy are visible to the
496        # definition of proxy.
497        def proxy_call(self, req, fid=None):
498            """
499            Send req on to a remote project instantiator.
500
501            Req is just the message to be sent.  This function re-wraps it.
502            It also rethrows any faults.
503            """
504
505            if req.has_key(self.request_body_name):
506                req = req[self.request_body_name]
507            else:
508                raise service_error(service_error.req, "Bad formated request");
509
510            try:
511                r = self.call_service(self.url, req, self.cert_file,
512                        self.cert_pwd, self.trusted_certs)
513            except service_error, e:
514                if e.code == service_error.connect:
515                    raise service_error(service_error.internal, 
516                            "Cannot connect to internal service: (%d) %s" % \
517                                    (e.code, e.desc))
518                else: raise
519            if r.has_key(self.resp_name):
520                return r[self.resp_name]
521            else:
522                raise service_error(service_error.protocol, 
523                        "Bad proxy response")
524
525    # back to defining the allocate_project_remote class
526    def __init__(self, config, auth=None):
527        """
528        Initializer.  Parses a configuration if one is given.
529        """
530
531        self.debug = config.get("allocate", "debug", False)
532        self.url = config.get("allocate", "uri", "")
533
534        self.cert_file = config.get("allocate", "cert_file", None)
535        self.cert_pwd = config.get("allocate", "cert_pwd", None)
536        self.trusted_certs = config.get("allocate", "trusted_certs", None)
537
538        # Certs are promoted from the generic to the specific, so without a if
539        # no dynamic project certificates, then proxy certs are used, and if
540        # none of those the main certs.
541
542        if config.has_option("globals", "proxy_cert_file"):
543            if not self.cert_file:
544                self.cert_file = config.get("globals", "proxy_cert_file")
545                if config.has_option("globals", "proxy_cert_pwd"):
546                    self.cert_pwd = config.get("globals", "proxy_cert_pwd")
547
548        if config.has_option("globals", "proxy_trusted_certs") and \
549                not self.trusted_certs:
550                self.trusted_certs = \
551                        config.get("globals", "proxy_trusted_certs")
552
553        if config.has_option("globals", "cert_file"):
554            has_pwd = config.has_option("globals", "cert_pwd")
555            if not self.cert_file:
556                self.cert_file = config.get("globals", "cert_file")
557                if has_pwd: 
558                    self.cert_pwd = config.get("globals", "cert_pwd")
559
560        if config.get("globals", "trusted_certs") and not self.trusted_certs:
561                self.trusted_certs = \
562                        config.get("globals", "trusted_certs")
563
564        self.soap_services = { }
565        self.xmlrpc_services = { }
566        self.log = logging.getLogger("fedd.allocate.remote")
567        set_log_level(config, "allocate", self.log)
568
569        if auth:
570            self.auth = auth
571        else:
572            auth = authorizer()
573            log.warn("[allocate] No authorizer passed in, using local one")
574
575        # The specializations of the proxy functions
576        self.dynamic_project = self.proxy(self.url, self.cert_file, 
577                self.cert_pwd, self.trusted_certs, self.auth, 
578                "AllocateProject")
579        self.static_project = self.proxy(self.url, self.cert_file, 
580                self.cert_pwd, self.trusted_certs, self.auth, 
581                "StaticProject")
582        self.release_project = self.proxy(self.url, self.cert_file,
583                self.cert_pwd, self.trusted_certs, self.auth, 
584                "ReleaseProject")
585
Note: See TracBrowser for help on using the repository browser.