Changeset c3dcf48


Ignore:
Timestamp:
Dec 4, 2008 9:49:39 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
416292f
Parents:
2ac63f7d
Message:

Snapshot of new state-based allocation

Location:
fedd/federation
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    r2ac63f7d rc3dcf48  
    8080        self.projects = { }
    8181        self.keys = { }
     82        self.types = { }
    8283        self.allocation = { }
    8384        self.state = {
    8485            'projects': self.projects,
    8586            'allocation' : self.allocation,
    86             'keys' : self.keys
     87            'keys' : self.keys,
     88            'types': self.types
    8789        }
    8890        self.log = logging.getLogger("fedd.access")
     
    331333                self.projects = self.state['projects']
    332334                self.keys = self.state['keys']
     335                self.types = self.state['types']
    333336
    334337                self.log.debug("[read_state]: Read state from %s" % \
     
    653656            self.state_lock.acquire()
    654657            self.allocation[aid] = { }
     658            try:
     659                pname = ap['project']['name']['localname']
     660            except KeyError:
     661                pname = None
     662
    655663            if dyn[1]:
    656                 try:
    657                     pname = ap['project']['name']['localname']
    658                 except KeyError:
     664                if not pname:
    659665                    self.state_lock.release()
    660666                    raise service_error(service_error.internal,
     
    663669                else: self.projects[pname] = 1
    664670                self.allocation[aid]['project'] = pname
     671
     672            if ap.has_key('resources'):
     673                if not pname:
     674                    self.state_lock.release()
     675                    raise service_error(service_error.internal,
     676                            "Misformed allocation response?")
     677                self.allocation[aid]['types'] = set()
     678                nodes = ap['resources'].get('node', [])
     679                for n in nodes:
     680                    for h in n.get('hardware', []):
     681                        if self.types.has_key((pname, h)):
     682                            self.types[(pname, h)] += 1
     683                        else:
     684                            self.types[(pname, h)] = 1
     685                        self.allocation[aid]['types'].add((pname,h))
     686
    665687
    666688            self.allocation[aid]['keys'] = [ ]
     
    680702                        "Misformed allocation response?")
    681703
     704
    682705            self.allocation[aid]['owners'] = owners
    683706            self.write_state()
     
    741764            del_users = { }
    742765            del_project = None
     766            del_types = set()
     767
    743768            if self.allocation.has_key(aid):
    744769                self.log.debug("Found allocation for %s" %aid)
     
    759784                        del_project = pname
    760785                        del self.projects[pname]
     786
     787                if self.allocation[aid].has_key('types'):
     788                    for t in self.allocation[aid]['types']:
     789                        self.types[t] -= 1
     790                        if self.types[t] == 0:
     791                            if not del_project: del_project = t[0]
     792                            del_types.add(t[1])
     793                            del self.types[t]
    761794
    762795                del self.allocation[aid]
     
    776809                    if users:
    777810                        msg['project']['user'] = users
     811                    if len(del_types) > 0:
     812                        msg['resources'] = { 'node': \
     813                                [ {'hardware': [ h ] } for h in del_types ]\
     814                            }
    778815                    if self.allocate_project.release_project:
    779816                        msg = { 'ReleaseProjectRequestBody' : msg}
  • fedd/federation/allocate_project.py

    r2ac63f7d rc3dcf48  
    66import string
    77import subprocess
     8import threading
     9import pickle
    810import tempfile
    911
     
    8789            self.allocation_level = self.none
    8890
     91        self.state = {
     92                'keys': set(),
     93                'types': set(),
     94                'projects': set(),
     95                'users': set(),
     96                }
     97        self.state_filename = config.get('allocate', 'allocation_state')
     98        self.state_lock = threading.Lock()
     99        self.read_state()
     100
    89101        access_db = config.get("allocate", "accessdb")
    90102        if access_db:
     
    127139        self.xmlrpc_services = { }
    128140
     141    def read_state(self):
     142        """
     143        Read a new copy of access state.  Old state is overwritten.
     144
     145        State format is a simple pickling of the state dictionary.
     146        """
     147        if self.state_filename:
     148            try:
     149                f = open(self.state_filename, "r")
     150                self.state = pickle.load(f)
     151                self.log.debug("[allocation]: Read state from %s" % \
     152                        self.state_filename)
     153            except IOError, e:
     154                self.log.warning(("[allocation]: No saved state: " +\
     155                        "Can't open %s: %s") % (self.state_filename, e))
     156            except EOFError, e:
     157                self.log.warning(("[allocation]: " +\
     158                        "Empty or damaged state file: %s:") % \
     159                        self.state_filename)
     160            except pickle.UnpicklingError, e:
     161                self.log.warning(("[allocation]: No saved state: " + \
     162                        "Unpickling failed: %s") % e)
     163            # These should all be in the picked representation, but make sure
     164            if not self.state.has_key('keys'): self.state['keys'] = set()
     165            if not self.state.has_key('types'): self.state['types'] = set()
     166            if not self.state.has_key('projects'):
     167                self.state['projects'] = set()
     168            if not self.state.has_key('users'): self.state['users'] = set()
     169
     170    def write_state(self):
     171        if self.state_filename:
     172            try:
     173                f = open(self.state_filename, 'w')
     174                pickle.dump(self.state, f)
     175            except IOError, e:
     176                self.log.error("Can't write file %s: %s" % \
     177                        (self.state_filename, e))
     178            except pickle.PicklingError, e:
     179                self.log.error("Pickling problem: %s" % e)
     180            except TypeError, e:
     181                self.log.error("Pickling problem (TypeError): %s" % e)
     182
     183
    129184    def random_string(self, s, n=3):
    130185        """Append n random ASCII characters to s and return the string"""
     
    149204        f.write("</%s>\n" % root)
    150205        f.close()
     206
     207    def run_cmd(self, cmd, log_prefix='allocate'):
     208        """
     209        Run the command passed in.  Cmd is a list containing the words of the
     210        command.  Return the exit value from the subprocess - that is 0 on
     211        success.  On an error running the command -  python or OS error, raise
     212        a service exception.
     213        """
     214        self.log.debug("[%s]: %s" % (log_prefix, ' '.join(cmd)))
     215        if not self.debug:
     216            try:
     217                return subprocess.call(cmd)
     218            except OSError, e:
     219                raise service_error(service_error.internal,
     220                        "Static project subprocess creation error "+ \
     221                                "[%s] (%s)" %  (cmd[0], e.strerror))
     222        else:
     223            return 0
     224
     225    def confirm_key(self, user, key):
     226        """
     227        Call run_cmd to comfirm the key.  Return a boolean rather
     228        than the subprocess code.
     229        """
     230        return self.run_cmd((self.wap, self.confirmkey, '-C', 
     231            '-u', user, '-k', key)) ==0
     232
     233    def add_key(self, user, key):
     234        """
     235        Call run_cmd to add the key.  Return a boolean rather
     236        than the subprocess code.
     237        """
     238        return self.run_cmd((self.wap, self.addpubkey, '-u', user,
     239            '-k', key)) == 0
     240
     241    def remove_key(self, user, key):
     242        """
     243        Call run_cmd to remove the key.  Return a boolean rather
     244        than the subprocess code.
     245        """
     246        return self.run_cmd((self.wap, self.addpubkey, '-R', '-u', user,
     247            '-k', key)) == 0
     248
     249    def confirm_access(self, project, type):
     250        """
     251        Call run_cmd to comfirm the key.  Return a boolean rather
     252        than the subprocess code.
     253        """
     254        return self.run_cmd((self.wap, self.grantnodetype, '-C', 
     255            '-p', project, type)) ==0
     256
     257    def add_access(self, project, type):
     258        """
     259        Call run_cmd to add the key.  Return a boolean rather
     260        than the subprocess code.
     261        """
     262        return self.run_cmd((self.wap, self.grantnodetype,
     263            '-p', project, type)) == 0
     264
     265    def remove_access(self, project, type):
     266        """
     267        Call run_cmd to remove the key.  Return a boolean rather
     268        than the subprocess code.
     269        """
     270
     271        return self.run_cmd((self.wap, self.grantnodetype, '-R',
     272            '-p', project, type)) == 0
     273
     274    def add_project(self, project, projfile):
     275        """
     276        Create a project using run_cmd.  This is two steps, and assumes that
     277        the relevant XML files are in place and correct.  Make the return value
     278        boolean.  Note that if a new user is specified in the XML, that user is
     279        created on success.
     280        """
     281
     282        if self.run_cmd((self.wap, self.newproj, projfile)) == 0:
     283            return self.run_cmd((self.wap, self.mkproj, project)) ==0
     284        else:
     285            return False
     286
     287    def remove_project(self, project):
     288        """
     289        Call run_cmd to remove the project.  Make the return value boolean.
     290        """
     291
     292        return self.run_cmd(self.wap, self.rmproj, project) == 0
     293
     294   
     295    def add_user(self, name, param_file, project):
     296        """
     297        Create a user and link them to the given project.  Similar to
     298        add_project, this requires a two step approach.  Returns True on success
     299        False on failure.
     300        """
     301
     302        if self.run_cmd((self.wap, self.newuser, param_file)) == 0:
     303           return self.run_cmd((self.wap, self.user_to_project,
     304               user, project)) == 0
     305        else:
     306           return False
     307
     308    def remove_user(self, user):
     309        """
     310        Call run_cmd to remove the user.  Make the return value boolean.
     311        """
     312
     313        return self.run_cmd(self.wap, self.rmuser, user) == 0
     314
    151315
    152316
     
    269433       
    270434
    271         # Write out the files
    272         self.write_attr_xml(cuf, "user", create_user_fields)
    273         self.write_attr_xml(suf, "user", service_user_fields)
    274         self.write_attr_xml(pf, "project", proj_fields)
    275 
    276         # Generate the commands (only grantnodetype's are dynamic)
    277         cmds = [
    278                 (self.wap, self.newproj, projfile),
    279                 (self.wap, self.mkproj, name),
    280                 (self.wap, self.newuser, service_userfile),
    281                 (self.wap, self.user_to_project, uname['serviceAccess'], name),
    282                 ]
    283 
    284         # Add commands to grant access to any resources in the request.  The
    285         # list comprehension pulls out the hardware types in the node entries
    286         # in the resources list.
    287         if resources.has_key('node'):
    288             for nt in [ h for n in resources['node']\
    289                     if n.has_key('hardware') for h in n['hardware'] ] :
    290                 if self.allocation_level >= self.confirm_keys:
    291                     cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))
    292 
    293 
    294         # Create the projects
    295         rc = 0
    296         for cmd in cmds:
    297             self.log.debug("[dynamic_project]: %s" % ' '.join(cmd))
    298             if not self.debug:
    299                 try:
    300                     rc = subprocess.call(cmd)
    301                 except OSerror, e:
    302                     raise service_error(service_error.internal,
    303                             "Dynamic project subprocess creation error "+ \
    304                                     "[%s] (%s)" %  (cmd[1], e.strerror))
    305 
    306             if rc != 0:
    307                 raise service_error(service_error.internal,
    308                         "Dynamic project subprocess error " +\
    309                                 "[%s] (%d)" % (cmd[1], rc))
    310         # Clean up tempfiles
    311         #os.unlink(create_userfile)
    312         #os.unlink(service_userfile)
    313         #os.unlink(projfile)
     435
     436        added_projects = [ ]
     437        added_users = [ ]
     438        added_types = [ ]
     439
     440        self.state_lock.acquire()
     441        try:
     442            # Write out the files
     443            self.write_attr_xml(cuf, "user", create_user_fields)
     444            self.write_attr_xml(suf, "user", service_user_fields)
     445            self.write_attr_xml(pf, "project", proj_fields)
     446            try:
     447                if self.add_project(name, projfile):
     448                    # add_project adds a user as well in this case
     449                    added_projects.append(name)
     450                    added_users.append(uname['createExperiment'])
     451                    self.state['projects'].add(name)
     452                    self.state['users'].add(uname['createExperiment'])
     453
     454                    if self.add_user(uname['serviceAccess'],
     455                            service_userfile, name):
     456                        added_users.append(uname['serviceAccess'])
     457                        self.state['users'].add(uname['serviceAccess'])
     458                    else:
     459                        raise service_error("Unable to create user %s" % \
     460                                uname['serviceAccess'])
     461                else:
     462                    raise service_error("Unable to create project/user %s/%s" % \
     463                            (name, uname['experimentCreation']))
     464
     465                nodes = resources.get('node', [])
     466                # Grant access to restricted resources.  This is simpler than
     467                # the corresponding loop from static_project because this is a
     468                # clean slate.
     469                for nt in [ h for n in nodes\
     470                        if n.has_key('hardware')\
     471                            for h in n['hardware'] ] :
     472                    if self.add_access(name, nt):
     473                        self.state['types'].add((name, nt))
     474                        added_types.append((name, nt))
     475                    else:
     476                        raise service_error(service_error.internal,
     477                                "Failed to add access for %s to %s"\
     478                                        % (name, nt))
     479            except service_error, e:
     480                # Something failed.  Back out the partial allocation as
     481                # completely as possible and re-raise the error.
     482                for p, t in added_types:
     483                    self.state['types'].discard((p, t))
     484                    try:
     485                        self.remove_access(p, t)
     486                    except service_error:
     487                        pass
     488                for u in added_users:
     489                    self.state['users'].discard(u)
     490                    try:
     491                        self.remove_user(u)
     492                    except service_error:
     493                        pass
     494
     495                for p in added_projects:
     496                    self.state['projects'].discard(p)
     497                    try:
     498                        self.remove_project(p)
     499                    except service_error:
     500                        pass
     501                self.state_lock.release()
     502                raise e
     503        finally:
     504            # Clean up tempfiles
     505            os.unlink(create_userfile)
     506            os.unlink(service_userfile)
     507            os.unlink(projfile)
     508
    314509        rv = {\
    315510            'project': {\
     
    336531        proper resources and users have correct keys.  Add them if necessary.
    337532        """
    338 
    339         cmds =  []
    340 
    341533        # Internal calls do not have a fedid parameter (i.e., local calls on
    342534        # behalf of already vetted fedids)
     
    355547            raise service_error(service_error.req, "Badly formed request")
    356548
    357 
    358         for u in users:
    359             try:
    360                 name = u['userID']['localname']
    361             except KeyError:
    362                 raise service_error(service_error.req, "Badly formed user")
    363             for sk in [ k['sshPubkey'] for k in u.get('access', []) \
    364                     if k.has_key('sshPubkey')]:
    365                 if self.allocation_level >= self.dynamic_keys:
    366                     cmds.append((self.wap, self.addpubkey, '-r', \
    367                             '-u', name, '-k', sk))
    368                 elif self.allocation_level >= self.confirm_keys:
    369                     cmds.append((self.wap, self.confirmkey, '-C', \
    370                             '-u', name, '-k', sk))
     549        added_keys = [ ]
     550        added_types = [ ]
     551        # Keep track of changes made to the system
     552        self.state_lock.acquire()
     553
     554        try:
     555            for u in users:
     556                try:
     557                    name = u['userID']['localname']
     558                except KeyError:
     559                    raise service_error(service_error.req, "Badly formed user")
     560                for sk in [ k['sshPubkey'] for k in u.get('access', []) \
     561                        if k.has_key('sshPubkey')]:
     562                    if self.allocation_level >=self.confirm_keys:
     563                        key_ok = self.confirm_key(name, sk)
     564                        if not key_ok:
     565                            if self.allocation_level >= self.dynamic_keys:
     566                                if self.add_key(name, sk):
     567                                    self.state['keys'].add((name, sk))
     568                                    added_keys.append((name, sk))
     569                                else:
     570                                    raise service_error(service_error.internal,
     571                                            "Failed to add key for %s" % name)
     572                            else:
     573                                raise service_error(service_error.internal,
     574                                        "Failed to confirm key for %s" % name)
     575                    else:
     576                        self.log.warning("[static_project] no checking of " + \
     577                            "static keys")
     578
     579            # Grant access to any resources in the request.  The
     580            # list comprehension pulls out the hardware types in the node
     581            # entries in the resources list.  The access module knows to
     582            # only send resources that are restricted and needed by the
     583            # project.
     584            nodes = resources.get('node', [])
     585            for nt in [ h for n in nodes\
     586                    if n.has_key('hardware')\
     587                        for h in n['hardware'] ] :
     588                if self.allocation_level >= self.confirm_keys:
     589                    access_ok = self.confirm_access(pname, nt)
     590                    if not access_ok:
     591                        if self.allocation_level >= self.dynamic_keys:
     592                            if self.add_access(pname, nt):
     593                                self.state['types'].add((pname, nt))
     594                                added_types.append((pname, nt))
     595                            else:
     596                                raise service_error(service_error.internal,
     597                                        "Failed to add access for %s to %s"\
     598                                                % (pname, nt))
     599                        else:
     600                            raise service_error(service_error.internal,
     601                                    "Failed to confirm access for %s to %s"\
     602                                            % (pname, nt))
    371603                else:
    372604                    self.log.warning("[static_project] no checking of " + \
    373                             "static keys")
    374        
    375 
    376         # Add commands to grant access to any resources in the request.  The
    377         # list comprehension pulls out the hardware types in the node entries
    378         # in the resources list.
    379         if resources.has_key('node'):
    380             for nt in [ h for n in resources['node']\
    381                     if n.has_key('hardware') for h in n['hardware'] ] :
    382                 if self.allocation_level >= self.confirm_keys:
    383                     cmds.append((self.wap, self.grantnodetype, '-p', pname, nt))
    384 
    385         # Run the commands
    386         rc = 0
    387         for cmd in cmds:
    388             self.log.debug("[static_project]: %s" % ' '.join(cmd))
    389             if not self.debug:
     605                        "node access")
     606        except service_error, e:
     607            # Do our best to clean up partial allocation and reraise the
     608            # error.  Do our best to make sure that both allocation state and
     609            # testbed state is restored.
     610            for u, k in added_keys:
     611                self.state['keys'].discard((u, k))
    390612                try:
    391                     rc = subprocess.call(cmd)
    392                 except OSError, e:
    393                     raise service_error(service_error.internal,
    394                             "Static project subprocess creation error "+ \
    395                                     "[%s] (%s)" %  (cmd[0], e.strerror))
    396 
    397             if rc != 0:
    398                 raise service_error(service_error.internal,
    399                         "Static project subprocess error " +\
    400                                 "[%s] (%d)" % (cmd[0], rc))
    401 
    402         return { 'project': req['StaticProjectRequestBody']['project']}
     613                    self.remove_key(u, k)
     614                except service_error:
     615                    pass
     616            for p, t in added_types:
     617                self.state['types'].discard((p, t))
     618                try:
     619                    self.remove_access(p, t)
     620                except service_error:
     621                    pass
     622            self.state_lock.release()
     623            raise e
     624        # All is well, save state and release the lock
     625        self.write_state()
     626        self.state_lock.release()
     627        # return { 'project': req['StaticProjectRequestBody']['project']}
     628        return req['StaticProjectRequestBody']
    403629
    404630    def release_project(self, req, fedid=None):
     
    415641            raise service_error(service_error.access, "Access Denied")
    416642
    417         cmds = []
    418643        pname = None
    419644        users = []
     645        nodes = [ ]
     646
     647        print req
    420648
    421649        try:
     
    425653            if req['ReleaseProjectRequestBody']['project'].has_key('user'):
    426654                users = req['ReleaseProjectRequestBody']['project']['user']
     655            if req['ReleaseProjectRequestBody'].has_key('resources'):
     656                nodes = req['ReleaseProjectRequestBody']\
     657                        ['resources'].get('node', [])
    427658        except KeyError:
    428659            raise service_error(service_error.req, "Badly formed request")
    429660
    430         if pname and pname not in self.fixed_projects and \
    431                 self.allocation_level >= self.dynamic_projects:
    432             cmds.append((self.wap, self.rmproj, pname))
    433 
    434         for u in users:
    435             try:
    436                 name = u['userID']['localname']
    437             except KeyError:
    438                 raise service_error(service_error.req, "Badly formed user")
    439             if self.allocation_level >= self.dynamic_projects and \
    440                     name not in self.fixed_users:
    441                 cmds.append((self.wap, self.rmuser, name))
    442             else:
    443                 for sk in [ k['sshPubkey'] for k in u.get('access', []) \
    444                         if k.has_key('sshPubkey')]:
    445                     if (name.rstrip(), sk.rstrip()) not in self.fixed_keys:
    446                         if self.allocation_level >= self.dynamic_keys:
    447                             cmds.append((self.wap, self.addpubkey, '-R', '-r', \
    448                                     '-u', name, '-k', sk))
    449 
    450         # Run the commands
    451         rc = 0
    452         for cmd in cmds:
    453             self.log.debug("[release_project]: %s" % ' '.join(cmd))
    454             if not self.debug:
     661        if nodes and not pname:
     662            raise service_error(service_error.req,
     663                    "Badly formed request (nodes without project)")
     664
     665        self.state_lock.acquire()
     666        try:
     667            for nt in [ h for n in nodes if n.has_key('hardware')\
     668                    for h in n['hardware'] ] :
     669                if (pname, nt ) in self.state['types']:
     670                    self.remove_access(pname, nt)
     671                    self.state['types'].discard((pname, nt))
     672
     673            for u in users:
    455674                try:
    456                     rc = subprocess.call(cmd)
    457                 except OSError, e:
    458                     raise service_error(service_error.internal,
    459                             "Release project subprocess creation error "+ \
    460                                     "[%s] (%s)" %  (cmd[0], e.strerror))
    461 
    462             if rc != 0:
    463                 raise service_error(service_error.internal,
    464                         "Release project subprocess error " +\
    465                                 "[%s] (%d)" % (cmd[0], rc))
     675                    name = u['userID']['localname']
     676                except KeyError:
     677                    raise service_error(service_error.req, "Badly formed user")
     678                if name in self.state['users']:
     679                    # If we created this user, discard the user, keys and all
     680                    self.remove_user(name)
     681                    self.state['users'].discard(name)
     682                else:
     683                    # If not, just strip any keys we added
     684                    for sk in [ k['sshPubkey'] for k in u.get('access', []) \
     685                            if k.has_key('sshPubkey')]:
     686                        if (name, sk) in self.state['keys']:
     687                            self.remove_key(name, sk)
     688                            self.state['keys'].discard((name, sk))
     689            if pname in self.state['projects']:
     690                self.remove_project(pname)
     691                self.state['projects'].discard(pname)
     692
     693        except service_error, e:
     694            self.write_state()
     695            self.state_lock.release()
     696            raise e
     697        self.write_state()
     698        self.state_lock.release()
    466699
    467700        return { 'project': req['ReleaseProjectRequestBody']['project']}
Note: See TracChangeset for help on using the changeset viewer.