Changeset 06cc65b
- Timestamp:
- May 28, 2010 2:23:12 AM (14 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- 5bf359d
- Parents:
- d6990a4
- Location:
- fedd/federation
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
rd6990a4 r06cc65b 205 205 f = open(self.state_filename, 'w') 206 206 pickle.dump(self.state, f) 207 self.log.debug("Wrote state to %s" % self.state_filename) 207 208 except EnvironmentError, e: 208 209 self.log.error("Can't write file %s: %s" % \ … … 224 225 f = open(self.state_filename, "r") 225 226 self.state = pickle.load(f) 226 227 self.log.debug("[read_state]: Read state from %s" % \ 228 self.state_filename) 227 229 except EnvironmentError, e: 228 230 self.log.warning(("[read_state]: No saved state: " +\ … … 236 238 "Unpickling failed: %s") % e) 237 239 238 self.log.debug("[read_state]: Read state from %s" % \239 self.state_filename)240 241 self.allocation = self.state['allocation']242 self.projects = self.state['projects']243 self.keys = self.state['keys']244 self.types = self.state['types']245 246 # Add the ownership attributes to the authorizer. Note that the247 # indices of the allocation dict are strings, but the attributes are248 # fedids, so there is a conversion.249 for k in self.allocation.keys():250 for o in self.allocation[k].get('owners', []):251 self.auth.set_attribute(o, fedid(hexstr=k))252 if self.allocation[k].has_key('userconfig'):253 sfid = self.allocation[k]['userconfig']254 fid = fedid(hexstr=sfid)255 self.auth.set_attribute(fid, "/%s" % sfid)256 240 257 241 -
fedd/federation/emulab_access.py
rd6990a4 r06cc65b 10 10 import logging 11 11 import subprocess 12 import traceback 12 13 13 14 from threading import * … … 48 49 dynamically. This implements both direct requests and proxies. 49 50 """ 50 51 proxy_RequestAccess= service_caller('RequestAccess')52 proxy_ReleaseAccess= service_caller('ReleaseAccess')53 51 54 52 def __init__(self, config=None, auth=None): … … 106 104 107 105 self.restricted = [ ] 108 self.projects = { }109 self.keys = { }110 self.types = { }111 self.allocation = { }112 self.state = {113 'projects': self.projects,114 'allocation' : self.allocation,115 'keys' : self.keys,116 'types': self.types117 }118 106 self.access = { } 119 107 if config.has_option("access", "accessdb"): … … 138 126 self.make_access_project) 139 127 140 # read 128 # read_state in the base_class 141 129 self.state_lock.acquire() 130 for a in ('allocation', 'projects', 'keys', 'types'): 131 if a not in self.state: 132 self.state[a] = { } 142 133 self.allocation = self.state['allocation'] 143 134 self.projects = self.state['projects'] … … 192 183 @staticmethod 193 184 def make_access_project(str): 185 """ 186 Convert a string of the form (id[:resources:resouces], id, id) into an 187 access_project. This is called by read_access to convert to local 188 attributes. It returns a tuple of the form (project, user, user) where 189 users may be names or fedids. 190 """ 194 191 def parse_name(n): 195 192 if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) … … 216 213 217 214 215 # RequestAccess support routines 216 218 217 def lookup_access(self, req, fid): 219 218 """ 220 Determine the allowed access for this request. Return the access and 221 which fields are dynamic. 222 223 The fedid is needed to construct the request 219 Look up the local access control information mapped to this fedid and 220 credentials. In this case it is a (project, create_user, access_user) 221 triple, and a triple of three booleans indicating which, if any will 222 need to be dynamically created. Finally a list of owners for that 223 allocation is returned. 224 225 lookup_access_base pulls the first triple out, and it is parsed by this 226 routine into the boolean map. Owners is always the controlling fedid. 224 227 """ 225 228 # Return values … … 380 383 return (pname, uname) 381 384 385 # End of RequestAccess support routines 386 382 387 def RequestAccess(self, req, fid): 383 388 """ … … 564 569 raise service_error(service_error.req, "No such allocation") 565 570 566 571 # These are subroutines for StartSegment 567 572 def generate_ns2(self, topo, expfn, softdir, connInfo): 573 """ 574 Convert topo into an ns2 file, decorated with appropriate commands for 575 the particular testbed setup. Convert all requests for software, etc 576 to point at the staged copies on this testbed and add the federation 577 startcommands. 578 """ 568 579 class dragon_commands: 569 580 """ … … 593 604 # XXX: do netmask right 594 605 if type =='link': 595 s = ("tb-allow-external ${%s} dragonportal " + \ 596 "ip %s vlan %s netmask 255.255.255.0\n") % \ 606 s = ("tb-allow-external ${%s} " + \ 607 "dragonportal ip %s vlan %s " + \ 608 "netmask 255.255.255.0\n") % \ 597 609 (e.name, addr, vlan) 598 610 elif type =='lan': 599 s = ("tb-allow-external ${%s} dragonportal " + \ 611 s = ("tb-allow-external ${%s} " + \ 612 "dragonportal " + \ 600 613 "ip %s vlan %s usurp %s\n") % \ 601 614 (e.name, addr, vlan, subs) … … 606 619 607 620 class not_dragon: 621 """ 622 Return true if a node is in the given map of dragon nodes. 623 """ 608 624 def __init__(self, map): 609 625 self.nodes = set(map.keys()) … … 612 628 return e.name not in self.nodes 613 629 614 630 # Main line of generate_ns2 615 631 t = topo.clone() 616 632 633 # Create the map of nodes that need direct connections (dragon 634 # connections) from the connInfo 617 635 dragon_map = { } 618 636 for i in [ i for i in connInfo if i['type'] == 'transit']: … … 752 770 """ 753 771 Add a seer node to the given topology, with the startup command passed 754 in. 772 in. Used by configure seer_services. 755 773 """ 756 774 c_node = topdl.Computer( … … 810 828 self.add_seer_node(topo, 'seer-master', self.seer_master_start) 811 829 812 830 def retrieve_software(self, topo, certfile, softdir): 831 """ 832 Collect the software that nodes in the topology need loaded and stage 833 it locally. This implies retrieving it from the experiment_controller 834 and placing it into softdir. Certfile is used to prove that this node 835 has access to that data (it's the allocation/segment fedid). Finally 836 local portal and federation software is also copied to the same staging 837 directory for simplicity - all software needed for experiment creation 838 is in softdir. 839 """ 840 sw = set() 841 for e in topo.elements: 842 for s in getattr(e, 'software', []): 843 sw.add(s.location) 844 for s in sw: 845 self.log.debug("Retrieving %s" % s) 846 try: 847 get_url(s, certfile, softdir) 848 except: 849 t, v, st = sys.exc_info() 850 raise service_error(service_error.internal, 851 "Error retrieving %s: %s" % (s, v)) 852 853 # Copy local federation and portal node software to the tempdir 854 for s in (self.federation_software, self.portal_software): 855 for l, f in s: 856 base = os.path.basename(f) 857 copy_file(f, "%s/%s" % (softdir, base)) 858 859 860 def initialize_experiment_info(self, attrs, aid, certfile, tmpdir): 861 """ 862 Gather common configuration files, retrieve or create an experiment 863 name and project name, and return the ssh_key filenames. Create an 864 allocation log bound to the state log variable as well. 865 """ 866 configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey')) 867 ename = None 868 pubkey_base = None 869 secretkey_base = None 870 proj = None 871 user = None 872 alloc_log = None 873 874 for a in attrs: 875 if a['attribute'] in configs: 876 try: 877 self.log.debug("Retrieving %s from %s" % \ 878 (a['attribute'], a['value'])) 879 get_url(a['value'], certfile, tmpdir) 880 except: 881 t, v, st = sys.exc_info() 882 raise service_error(service_error.internal, 883 "Error retrieving %s: %s" % (a.get('value', ""), v)) 884 if a['attribute'] == 'ssh_pubkey': 885 pubkey_base = a['value'].rpartition('/')[2] 886 if a['attribute'] == 'ssh_secretkey': 887 secretkey_base = a['value'].rpartition('/')[2] 888 if a['attribute'] == 'experiment_name': 889 ename = a['value'] 890 891 if not ename: 892 ename = "" 893 for i in range(0,5): 894 ename += random.choice(string.ascii_letters) 895 self.log.warn("No experiment name: picked one randomly: %s" \ 896 % ename) 897 898 if not pubkey_base: 899 raise service_error(service_error.req, 900 "No public key attribute") 901 902 if not secretkey_base: 903 raise service_error(service_error.req, 904 "No secret key attribute") 905 906 self.state_lock.acquire() 907 if aid in self.allocation: 908 proj = self.allocation[aid].get('project', None) 909 if not proj: 910 proj = self.allocation[aid].get('sproject', None) 911 user = self.allocation[aid].get('user', None) 912 self.allocation[aid]['experiment'] = ename 913 self.allocation[aid]['log'] = [ ] 914 # Create a logger that logs to the experiment's state object as 915 # well as to the main log file. 916 alloc_log = logging.getLogger('fedd.access.%s' % ename) 917 h = logging.StreamHandler( 918 list_log.list_log(self.allocation[aid]['log'])) 919 # XXX: there should be a global one of these rather than 920 # repeating the code. 921 h.setFormatter(logging.Formatter( 922 "%(asctime)s %(name)s %(message)s", 923 '%d %b %y %H:%M:%S')) 924 alloc_log.addHandler(h) 925 self.write_state() 926 self.state_lock.release() 927 928 if not proj: 929 raise service_error(service_error.internal, 930 "Can't find project for %s" %aid) 931 932 if not user: 933 raise service_error(service_error.internal, 934 "Can't find creation user for %s" %aid) 935 936 return (ename, proj, user, pubkey_base, secretkey_base, alloc_log) 937 938 def finalize_experiment(self, starter, topo, aid, alloc_id): 939 """ 940 Store key bits of experiment state in the global repository, including 941 the response that may need to be replayed, and return the response. 942 """ 943 # Copy the assigned names into the return topology 944 embedding = [ ] 945 for n in starter.node: 946 embedding.append({ 947 'toponame': n, 948 'physname': ["%s%s" % (starter.node[n], self.domain)], 949 }) 950 # Grab the log (this is some anal locking, but better safe than 951 # sorry) 952 self.state_lock.acquire() 953 logv = "".join(self.allocation[aid]['log']) 954 # It's possible that the StartSegment call gets retried (!). 955 # if the 'started' key is in the allocation, we'll return it rather 956 # than redo the setup. 957 self.allocation[aid]['started'] = { 958 'allocID': alloc_id, 959 'allocationLog': logv, 960 'segmentdescription': { 961 'topdldescription': topo.clone().to_dict() 962 }, 963 'embedding': embedding 964 } 965 retval = copy.copy(self.allocation[aid]['started']) 966 self.write_state() 967 self.state_lock.release() 968 return retval 969 970 def remove_dirs(self, dir): 971 """ 972 Remove the directory tree and all files rooted at dir. Log any errors, 973 but continue. 974 """ 975 self.log.debug("[removedirs]: removing %s" % dir) 976 try: 977 for path, dirs, files in os.walk(dir, topdown=False): 978 for f in files: 979 os.remove(os.path.join(path, f)) 980 for d in dirs: 981 os.rmdir(os.path.join(path, d)) 982 os.rmdir(dir) 983 except EnvironmentError, e: 984 self.log.error("Error deleting directory tree in %s" % e); 985 986 # End of StartSegment support routines 813 987 814 988 def StartSegment(self, req, fid): 815 816 configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))817 818 989 err = None # Any service_error generated after tmpdir is created 819 990 rv = None # Return value from segment creation … … 821 992 try: 822 993 req = req['StartSegmentRequestBody'] 994 auth_attr = req['allocID']['fedid'] 995 topref = req['segmentdescription']['topdldescription'] 823 996 except KeyError: 824 997 raise service_error(server_error.req, "Badly formed request") … … 826 999 connInfo = req.get('connection', []) 827 1000 services = req.get('service', []) 828 auth_attr = req['allocID']['fedid']829 1001 aid = "%s" % auth_attr 830 1002 attrs = req.get('fedAttr', []) … … 845 1017 # A new request. Do it. 846 1018 847 if req.has_key('segmentdescription') and \ 848 req['segmentdescription'].has_key('topdldescription'): 849 topo = \ 850 topdl.Topology(**req['segmentdescription']['topdldescription']) 1019 if topref: topo = topdl.Topology(**topref) 851 1020 else: 852 1021 raise service_error(service_error.req, 853 1022 "Request missing segmentdescription'") 854 1023 855 master = req.get('master', False)856 857 1024 certfile = "%s/%s.pem" % (self.certdir, auth_attr) 858 1025 try: … … 865 1032 # Try block alllows us to clean up temporary files. 866 1033 try: 867 sw = set() 868 for e in topo.elements: 869 for s in getattr(e, 'software', []): 870 sw.add(s.location) 871 for s in sw: 872 self.log.debug("Retrieving %s" % s) 873 try: 874 get_url(s, certfile, softdir) 875 except: 876 t, v, st = sys.exc_info() 877 raise service_error(service_error.internal, 878 "Error retrieving %s: %s" % (s, v)) 879 880 # Copy local federation and portal node software to the tempdir 881 for s in (self.federation_software, self.portal_software): 882 for l, f in s: 883 base = os.path.basename(f) 884 copy_file(f, "%s/%s" % (softdir, base)) 885 886 ename = None 887 pubkey_base = None 888 secretkey_base = None 889 for a in attrs: 890 if a['attribute'] in configs: 891 try: 892 self.log.debug("Retrieving %s from %s" % \ 893 (a['attribute'], a['value'])) 894 get_url(a['value'], certfile, tmpdir) 895 except: 896 t, v, st = sys.exc_info() 897 raise service_error(service_error.internal, 898 "Error retrieving %s: %s" % (s, v)) 899 if a['attribute'] == 'ssh_pubkey': 900 pubkey_base = a['value'].rpartition('/')[2] 901 if a['attribute'] == 'ssh_secretkey': 902 secretkey_base = a['value'].rpartition('/')[2] 903 if a['attribute'] == 'experiment_name': 904 ename = a['value'] 905 906 if not ename: 907 ename = "" 908 for i in range(0,5): 909 ename += random.choice(string.ascii_letters) 910 self.log.warn("No experiment name: picked one randomly: %s" \ 911 % ename) 912 913 if not pubkey_base: 914 raise service_error(service_error.req, 915 "No public key attribute") 916 917 if not secretkey_base: 918 raise service_error(service_error.req, 919 "No secret key attribute") 920 921 # If the userconf service was imported, collect the configuration 922 # data. 1034 self.retrieve_software(topo, certfile, softdir) 1035 ename, proj, user, pubkey_base, secretkey_base, alloc_log = \ 1036 self.initialize_experiment_info(attrs, aid, 1037 certfile, tmpdir) 1038 1039 # Set up userconf and seer if needed 923 1040 self.configure_userconf(services) 924 1041 self.configure_seer_services(services, topo, softdir) 925 926 proj = None 927 user = None 928 self.state_lock.acquire() 929 if self.allocation.has_key(aid): 930 proj = self.allocation[aid].get('project', None) 931 if not proj: 932 proj = self.allocation[aid].get('sproject', None) 933 user = self.allocation[aid].get('user', None) 934 self.allocation[aid]['experiment'] = ename 935 self.allocation[aid]['log'] = [ ] 936 # Create a logger that logs to the experiment's state object as 937 # well as to the main log file. 938 alloc_log = logging.getLogger('fedd.access.%s' % ename) 939 h = logging.StreamHandler( 940 list_log.list_log(self.allocation[aid]['log'])) 941 # XXX: there should be a global one of these rather than 942 # repeating the code. 943 h.setFormatter(logging.Formatter( 944 "%(asctime)s %(name)s %(message)s", 945 '%d %b %y %H:%M:%S')) 946 alloc_log.addHandler(h) 947 self.write_state() 948 self.state_lock.release() 949 950 if not proj: 951 raise service_error(service_error.internal, 952 "Can't find project for %s" %aid) 953 954 if not user: 955 raise service_error(service_error.internal, 956 "Can't find creation user for %s" %aid) 957 1042 # Get and send synch store variables 958 1043 self.export_store_info(certfile, proj, ename, connInfo) 959 1044 self.import_store_info(certfile, connInfo) … … 962 1047 963 1048 self.generate_portal_configs(topo, pubkey_base, 964 secretkey_base, tmpdir, master, proj, ename, connInfo, 965 services) 1049 secretkey_base, tmpdir, proj, ename, connInfo, services) 966 1050 self.generate_ns2(topo, expfile, 967 1051 "/proj/%s/software/%s/" % (proj, ename), connInfo) … … 970 1054 debug=self.create_debug, log=alloc_log) 971 1055 rv = starter(self, ename, proj, user, expfile, tmpdir) 972 rvtopo = topo.clone()973 974 # Copy the assigned names into the return topology975 embedding = [ ]976 for n in starter.node:977 embedding.append({978 'toponame': n,979 'physname': ["%s%s" % (starter.node[n], self.domain)],980 })981 982 983 1056 except service_error, e: 984 1057 err = e 985 except e: 986 err = service_error(service_error.internal, str(e)) 1058 except: 1059 t, v, st = sys.exc_info() 1060 err = service_error(service_error.internal, "%s: %s" % \ 1061 (v, traceback.extract_tb(st))) 987 1062 988 1063 # Walk up tmpdir, deleting as we go 989 if self.cleanup: 990 self.log.debug("[StartSegment]: removing %s" % tmpdir) 991 for path, dirs, files in os.walk(tmpdir, topdown=False): 992 for f in files: 993 os.remove(os.path.join(path, f)) 994 for d in dirs: 995 os.rmdir(os.path.join(path, d)) 996 os.rmdir(tmpdir) 997 else: 998 self.log.debug("[StartSegment]: not removing %s" % tmpdir) 1064 if self.cleanup: self.remove_dirs(tmpdir) 1065 else: self.log.debug("[StartSegment]: not removing %s" % tmpdir) 999 1066 1000 1067 if rv: 1001 # Grab the log (this is some anal locking, but better safe than 1002 # sorry) 1003 self.state_lock.acquire() 1004 logv = "".join(self.allocation[aid]['log']) 1005 # It's possible that the StartSegment call gets retried (!). 1006 # if the 'started' key is in the allocation, we'll return it rather 1007 # than redo the setup. 1008 self.allocation[aid]['started'] = { 1009 'allocID': req['allocID'], 1010 'allocationLog': logv, 1011 'segmentdescription': { 1012 'topdldescription': rvtopo.to_dict() 1013 }, 1014 'embedding': embedding 1015 } 1016 retval = copy.copy(self.allocation[aid]['started']) 1017 self.write_state() 1018 self.state_lock.release() 1019 return retval 1068 return self.finalize_experiment(starter, topo, aid, req['allocID']) 1020 1069 elif err: 1021 1070 raise service_error(service_error.federant, … … 1037 1086 1038 1087 self.state_lock.acquire() 1039 if self.allocation.has_key(aid):1088 if aid in self.allocation: 1040 1089 proj = self.allocation[aid].get('project', None) 1041 1090 if not proj:
Note: See TracChangeset
for help on using the changeset viewer.