- Timestamp:
- May 28, 2010 6:41:56 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- a20a20f
- Parents:
- c200d36
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/dragon_access.py
rc200d36 r6abed7b 11 11 from threading import Thread, Lock 12 12 from subprocess import Popen, call, PIPE, STDOUT 13 from access import access_base 13 14 14 15 from util import * … … 37 38 fl.addHandler(nullHandler()) 38 39 39 class access :40 class access(access_base): 40 41 """ 41 42 The implementation of access control based on mapping users to projects. … … 44 45 dynamically. This implements both direct requests and proxies. 45 46 """ 46 47 class parse_error(RuntimeError): pass48 49 50 proxy_RequestAccess= service_caller('RequestAccess')51 proxy_ReleaseAccess= service_caller('ReleaseAccess')52 47 53 48 def __init__(self, config=None, auth=None): … … 55 50 Initializer. Pulls parameters out of the ConfigParser's access section. 56 51 """ 57 58 # Make sure that the configuration is in place 59 if not config: 60 raise RunTimeError("No config to dragon_access.access") 61 62 self.project_priority = config.getboolean("access", "project_priority") 63 self.allow_proxy = False 64 self.certdir = config.get("access","certdir") 65 self.create_debug = config.getboolean("access", "create_debug") 52 access_base.__init__(self, config, auth) 53 66 54 self.cli_dir = config.get("access", "cli_dir") 67 55 self.axis2_home = config.get("access", "axis2_home") … … 70 58 self.duration = config.getint("access", "duration", 120) 71 59 72 self.attrs = { }73 60 self.access = { } 74 # State is a dict of dicts indexed by segment fedid that includes the75 # owners of the segment as fedids (who can manipulate it, key: owners),76 # the repo dir/user for the allocation (key: user), Current allocation77 # log (key: log), and GRI of the reservation once made (key: gri)78 self.state = { }79 self.log = logging.getLogger("fedd.access")80 set_log_level(config, "access", self.log)81 self.state_lock = Lock()82 61 if not (self.cli_dir and self.axis2_home and self.idc_url): 83 62 self.log.error("Must specify all of cli_dir, axis2_home, " +\ 84 63 "idc in the [access] section of the configuration") 85 64 86 if auth: self.auth = auth87 else:88 self.log.error(\89 "[access]: No authorizer initialized, creating local one.")90 auth = authorizer()91 92 tb = config.get('access', 'testbed')93 if tb: self.testbed = [ t.strip() for t in tb.split(',') ]94 else: self.testbed = [ ]95 96 65 if config.has_option("access", "accessdb"): 97 self.read_access(config.get("access", "accessdb")) 98 99 self.state_filename = config.get("access", "access_state") 100 self.read_state() 101 102 # Keep cert_file and cert_pwd coming from the same place 103 self.cert_file = config.get("access", "cert_file") 104 if self.cert_file: 105 self.sert_pwd = config.get("access", "cert_pw") 106 else: 107 self.cert_file = config.get("globals", "cert_file") 108 self.sert_pwd = config.get("globals", "cert_pw") 109 110 self.trusted_certs = config.get("access", "trusted_certs") or \ 111 config.get("globals", "trusted_certs") 66 self.read_access(config.get("access", "accessdb"), self.make_repo) 67 68 # Add the ownership attributes to the authorizer. Note that the 69 # indices of the allocation dict are strings, but the attributes are 70 # fedids, so there is a conversion. 71 self.state_lock.acquire() 72 for k in self.state.keys(): 73 for o in self.state[k].get('owners', []): 74 self.auth.set_attribute(o, fedid(hexstr=k)) 75 self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k)) 76 self.state_lock.release() 77 78 self.lookup_access = self.lookup_access_base 112 79 113 80 self.call_GetValue= service_caller('GetValue') … … 131 98 } 132 99 133 def read_access(self, config): 134 """ 135 Read a configuration file and set internal parameters. 136 137 There are access lines of the 138 form (tb, proj, user) -> user that map the first tuple of 139 names to the user for for access purposes. Names in the key (left side) 140 can include "<NONE> or <ANY>" to act as wildcards or to require the 141 fields to be empty. Similarly aproj or auser can be <SAME> or 142 <DYNAMIC> indicating that either the matching key is to be used or a 143 dynamic user or project will be created. These names can also be 144 federated IDs (fedid's) if prefixed with fedid:. The user is the repo 145 directory that contains the DRAGON user credentials. 146 Testbed attributes outside the forms above can be given using the 147 format attribute: name value: value. The name is a single word and the 148 value continues to the end of the line. Empty lines and lines startin 149 with a # are ignored. 150 151 Parsing errors result in a self.parse_error exception being raised. 152 """ 153 lineno=0 154 name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+" 155 fedid_expr = "fedid:[" + string.hexdigits + "]+" 156 key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")" 157 158 attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)', 159 re.IGNORECASE) 160 access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ 161 key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE) 162 163 def parse_name(n): 164 if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) 165 else: return n 166 167 def auth_name(n): 168 if isinstance(n, basestring): 169 if n =='<any>' or n =='<none>': return None 170 else: return unicode(n) 171 else: 172 return n 173 174 f = open(config, "r"); 175 for line in f: 176 lineno += 1 177 line = line.strip(); 178 if len(line) == 0 or line.startswith('#'): 179 continue 180 181 # Extended (attribute: x value: y) attribute line 182 m = attr_re.match(line) 183 if m != None: 184 attr, val = m.group(1,2) 185 self.attrs[attr] = val 186 continue 187 188 # Access line (t, p, u) -> (a) line 189 # XXX: you are here 190 m = access_re.match(line) 191 if m != None: 192 access_key = tuple([ parse_name(x) for x in m.group(1,2,3)]) 193 auth_key = tuple([ auth_name(x) for x in access_key]) 194 user_name = auth_name(parse_name(m.group(4))) 195 196 self.access[access_key] = user_name 197 self.auth.set_attribute(auth_key, "access") 198 continue 199 200 # Nothing matched to here: unknown line - raise exception 201 f.close() 202 raise self.parse_error("Unknown statement at line %d of %s" % \ 203 (lineno, config)) 204 f.close() 205 206 def get_users(self, obj): 207 """ 208 Return a list of the IDs of the users in dict 209 """ 210 if obj.has_key('user'): 211 return [ unpack_id(u['userID']) \ 212 for u in obj['user'] if u.has_key('userID') ] 213 else: 214 return None 215 216 def write_state(self): 217 if self.state_filename: 218 try: 219 f = open(self.state_filename, 'w') 220 pickle.dump(self.state, f) 221 except EnvironmentError, e: 222 self.log.error("Can't write file %s: %s" % \ 223 (self.state_filename, e)) 224 except pickle.PicklingError, e: 225 self.log.error("Pickling problem: %s" % e) 226 except TypeError, e: 227 self.log.error("Pickling problem (TypeError): %s" % e) 228 229 230 def read_state(self): 231 """ 232 Read a new copy of access state. Old state is overwritten. 233 234 State format is a simple pickling of the state dictionary. 235 """ 236 if self.state_filename: 237 try: 238 f = open(self.state_filename, "r") 239 self.state = pickle.load(f) 240 self.log.debug("[read_state]: Read state from %s" % \ 241 self.state_filename) 242 except EnvironmentError, e: 243 self.log.warning(("[read_state]: No saved state: " +\ 244 "Can't open %s: %s") % (self.state_filename, e)) 245 except EOFError, e: 246 self.log.warning(("[read_state]: " +\ 247 "Empty or damaged state file: %s:") % \ 248 self.state_filename) 249 except pickle.UnpicklingError, e: 250 self.log.warning(("[read_state]: No saved state: " + \ 251 "Unpickling failed: %s") % e) 252 253 # Add the ownership attributes to the authorizer. Note that the 254 # indices of the allocation dict are strings, but the attributes are 255 # fedids, so there is a conversion. 256 for k in self.state.keys(): 257 for o in self.state[k].get('owners', []): 258 self.auth.set_attribute(o, fedid(hexstr=k)) 259 self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k)) 260 261 262 def permute_wildcards(self, a, p): 263 """Return a copy of a with various fields wildcarded. 264 265 The bits of p control the wildcards. A set bit is a wildcard 266 replacement with the lowest bit being user then project then testbed. 267 """ 268 if p & 1: user = ["<any>"] 269 else: user = a[2] 270 if p & 2: proj = "<any>" 271 else: proj = a[1] 272 if p & 4: tb = "<any>" 273 else: tb = a[0] 274 275 return (tb, proj, user) 276 277 def find_access(self, search): 278 """ 279 Search the access DB for a match on this tuple. Return the matching 280 user (repo dir). 281 282 NB, if the initial tuple fails to match we start inserting wildcards in 283 an order determined by self.project_priority. Try the list of users in 284 order (when wildcarded, there's only one user in the list). 285 """ 286 if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7) 287 else: perm = (0, 2, 1, 3, 4, 6, 5, 7) 288 289 for p in perm: 290 s = self.permute_wildcards(search, p) 291 # s[2] is None on an anonymous, unwildcarded request 292 if s[2] != None: 293 for u in s[2]: 294 if self.access.has_key((s[0], s[1], u)): 295 return self.access[(s[0], s[1], u)] 296 else: 297 if self.access.has_key(s): 298 return self.access[s] 299 return None 300 301 def lookup_access(self, req, fid): 302 """ 303 Determine the allowed access for this request. Return the access and 304 which fields are dynamic. 305 306 The fedid is needed to construct the request 307 """ 308 # Search keys 309 tb = None 310 project = None 311 user = None 312 # Return values 313 rp = access_project(None, ()) 314 ru = None 315 user_re = re.compile("user:\s(.*)") 316 project_re = re.compile("project:\s(.*)") 317 318 user = [ user_re.findall(x)[0] for x in req.get('credential', []) \ 319 if user_re.match(x)] 320 project = [ project_re.findall(x)[0] \ 321 for x in req.get('credential', []) \ 322 if project_re.match(x)] 323 324 if len(project) == 1: project = project[0] 325 elif len(project) == 0: project = None 326 else: 327 raise service_error(service_error.req, 328 "More than one project credential") 329 330 331 user_fedids = [ u for u in user if isinstance(u, fedid)] 332 333 # Determine how the caller is representing itself. If its fedid shows 334 # up as a project or a singleton user, let that stand. If neither the 335 # usernames nor the project name is a fedid, the caller is a testbed. 336 if project and isinstance(project, fedid): 337 if project == fid: 338 # The caller is the project (which is already in the tuple 339 # passed in to the authorizer) 340 owners = user_fedids 341 owners.append(project) 342 else: 343 raise service_error(service_error.req, 344 "Project asserting different fedid") 345 else: 346 if fid not in user_fedids: 347 tb = fid 348 owners = user_fedids 349 owners.append(fid) 350 else: 351 if len(fedids) > 1: 352 raise service_error(service_error.req, 353 "User asserting different fedid") 354 else: 355 # Which is a singleton 356 owners = user_fedids 357 # Confirm authorization 358 359 for u in user: 360 self.log.debug("[lookup_access] Checking access for %s" % \ 361 ((tb, project, u),)) 362 if self.auth.check_attribute((tb, project, u), 'access'): 363 self.log.debug("[lookup_access] Access granted") 364 break 365 else: 366 self.log.debug("[lookup_access] Access Denied") 367 else: 368 raise service_error(service_error.access, "Access denied") 369 370 # This maps a valid user to the Emulab projects and users to use 371 found = self.find_access((tb, project, user)) 372 373 if found == None: 374 raise service_error(service_error.access, 375 "Access denied - cannot map access") 376 return found, owners 100 @staticmethod 101 def make_repo(s): 102 """ 103 Get the repo directory from an access line. This is removing the () 104 from the string. 105 """ 106 rv = s.strip() 107 if rv.startswith('(') and rv.endswith(')'): return rv[1:-1] 108 else: raise self.parse_error("Repo should be in parens"); 377 109 378 110 def RequestAccess(self, req, fid): 379 111 """ 380 Handle the access request. Proxy if not for us.112 Handle the access request. 381 113 382 114 Parse out the fields and make the allocations or rejections if for us, … … 393 125 dt = unpack_id(req['destinationTestbed']) 394 126 395 if dt == None or dt in self.testbed: 396 # Request for this fedd 397 found, owners = self.lookup_access(req, fid) 398 # keep track of what's been added 399 allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) 400 aid = unicode(allocID) 401 402 self.state_lock.acquire() 403 self.state[aid] = { } 404 self.state[aid]['user'] = found 405 self.state[aid]['owners'] = owners 406 self.write_state() 407 self.state_lock.release() 408 for o in owners: 409 self.auth.set_attribute(o, allocID) 410 self.auth.set_attribute(allocID, allocID) 411 412 try: 413 f = open("%s/%s.pem" % (self.certdir, aid), "w") 414 print >>f, alloc_cert 415 f.close() 416 except EnvironmentError, e: 417 raise service_error(service_error.internal, 418 "Can't open %s/%s : %s" % (self.certdir, aid, e)) 419 return { 'allocID': { 'fedid': allocID } } 420 else: 421 if self.allow_proxy: 422 resp = self.proxy_RequestAccess.call_service(dt, req, 423 self.cert_file, self.cert_pwd, 424 self.trusted_certs) 425 if resp.has_key('RequestAccessResponseBody'): 426 return resp['RequestAccessResponseBody'] 427 else: 428 return None 429 else: 430 raise service_error(service_error.access, 431 "Access proxying denied") 127 # Request for this fedd 128 found, match = self.lookup_access(req, fid) 129 # keep track of what's been added 130 allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) 131 aid = unicode(allocID) 132 133 self.state_lock.acquire() 134 self.state[aid] = { } 135 self.state[aid]['user'] = found 136 self.state[aid]['owners'] = [ fid ] 137 self.write_state() 138 self.state_lock.release() 139 self.auth.set_attribute(fid, allocID) 140 self.auth.set_attribute(allocID, allocID) 141 142 try: 143 f = open("%s/%s.pem" % (self.certdir, aid), "w") 144 print >>f, alloc_cert 145 f.close() 146 except EnvironmentError, e: 147 raise service_error(service_error.internal, 148 "Can't open %s/%s : %s" % (self.certdir, aid, e)) 149 return { 'allocID': { 'fedid': allocID } } 432 150 433 151 def ReleaseAccess(self, req, fid): … … 438 156 raise service_error(service_error.req, "No request!?") 439 157 440 if req.has_key('destinationTestbed'): 441 dt = unpack_id(req['destinationTestbed']) 442 else: 443 dt = None 444 445 if dt == None or dt in self.testbed: 446 # Local request 447 try: 448 if req['allocID'].has_key('localname'): 449 auth_attr = aid = req['allocID']['localname'] 450 elif req['allocID'].has_key('fedid'): 451 aid = unicode(req['allocID']['fedid']) 452 auth_attr = req['allocID']['fedid'] 453 else: 454 raise service_error(service_error.req, 455 "Only localnames and fedids are understood") 456 except KeyError: 457 raise service_error(service_error.req, "Badly formed request") 458 459 self.log.debug("[access] deallocation requested for %s", aid) 460 if not self.auth.check_attribute(fid, auth_attr): 461 self.log.debug("[access] deallocation denied for %s", aid) 462 raise service_error(service_error.access, "Access Denied") 463 464 self.state_lock.acquire() 465 if self.state.has_key(aid): 466 self.log.debug("Found allocation for %s" %aid) 467 del self.state[aid] 468 self.write_state() 469 self.state_lock.release() 470 # And remove the access cert 471 cf = "%s/%s.pem" % (self.certdir, aid) 472 self.log.debug("Removing %s" % cf) 473 os.remove(cf) 474 return { 'allocID': req['allocID'] } 475 else: 476 self.state_lock.release() 477 raise service_error(service_error.req, "No such allocation") 478 479 else: 480 if self.allow_proxy: 481 resp = self.proxy_ReleaseAccess.call_service(dt, req, 482 self.cert_file, self.cert_pwd, 483 self.trusted_certs) 484 if resp.has_key('ReleaseAccessResponseBody'): 485 return resp['ReleaseAccessResponseBody'] 486 else: 487 return None 488 else: 489 raise service_error(service_error.access, 490 "Access proxying denied") 158 try: 159 if req['allocID'].has_key('localname'): 160 auth_attr = aid = req['allocID']['localname'] 161 elif req['allocID'].has_key('fedid'): 162 aid = unicode(req['allocID']['fedid']) 163 auth_attr = req['allocID']['fedid'] 164 else: 165 raise service_error(service_error.req, 166 "Only localnames and fedids are understood") 167 except KeyError: 168 raise service_error(service_error.req, "Badly formed request") 169 170 self.log.debug("[access] deallocation requested for %s", aid) 171 if not self.auth.check_attribute(fid, auth_attr): 172 self.log.debug("[access] deallocation denied for %s", aid) 173 raise service_error(service_error.access, "Access Denied") 174 175 self.state_lock.acquire() 176 if self.state.has_key(aid): 177 self.log.debug("Found allocation for %s" %aid) 178 del self.state[aid] 179 self.write_state() 180 self.state_lock.release() 181 # And remove the access cert 182 cf = "%s/%s.pem" % (self.certdir, aid) 183 self.log.debug("Removing %s" % cf) 184 os.remove(cf) 185 return { 'allocID': req['allocID'] } 186 else: 187 self.state_lock.release() 188 raise service_error(service_error.req, "No such allocation") 491 189 492 190 def extract_parameters(self, top): … … 566 264 return cap, ends[0], ends[1], vlans 567 265 568 569 def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None, 570 log=None): 571 """ 572 Do the actual work of creating the dragon connecton. 573 """ 574 if not log: log = self.log 266 def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log): 267 575 268 gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE) 269 status_re = re.compile("Status:\s*(.*)", re.IGNORECASE) 270 271 cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 272 'createReservation', '-repo', repo , '-url', self.idc_url, 273 '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap, 274 '-vlan', "%s" % v, '-desc', 'fedd created connection', 275 '-pathsetup', 'timer-automatic', '-start', "%d" % int(start), 276 '-end', "%d" % int(end)] 277 log.debug("[start_segment]: %s" % " ".join(cmd)) 278 if not self.create_debug: 279 p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT, 280 close_fds=True) 281 for line in p.stdout: 282 m = status_re.match(line) 283 if m: 284 status = m.group(1) 285 continue 286 m = gri_re.match(line) 287 if m: 288 gri = m.group(1) 289 continue 290 rv = p.wait() 291 else: 292 rv = 0 293 status = 'ACCEPTED' 294 gri = 'debug_gri' 295 296 return (rv, status, gri) 297 298 def oscars_query_vpn(self, repo, gri, v, log): 299 """ 300 Call the oscars query command from the command line and parse out the 301 data to see if the current request succeeded. This is a lot of fiddly 302 code to do a pretty simple thing. 303 """ 576 304 status_re = re.compile("Status:\s*(.*)", re.IGNORECASE) 577 305 source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE) 578 306 dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE) 579 307 path_re = re.compile("Path:") 308 309 cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 310 'query', '-repo', repo , '-url', self.idc_url, '-gri', gri] 311 log.debug("[start_segment]: %s" % " ".join(cmd)) 312 if not self.create_debug: 313 # Really do the query 314 p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT, 315 close_fds=True) 316 in_path = False 317 vpn1 = None 318 vpn2 = None 319 src = None 320 dest = None 321 for line in p.stdout: 322 if not in_path: 323 m = status_re.match(line) 324 if m: 325 status = m.group(1) 326 continue 327 m = source_re.match(line) 328 if m: 329 src = m.group(1) 330 continue 331 m = dest_re.match(line) 332 if m: 333 dest = m.group(1) 334 continue 335 m = path_re.match(line) 336 if m: 337 in_path = True 338 if src and dest: 339 vpn1_re = re.compile( 340 "\s*%s,\s*\w+\s*,\s*(\d+)" % \ 341 src.replace("*", "\*")) 342 vpn2_re = re.compile( 343 "\s*%s,\s*\w+\s*,\s*(\d+)" % \ 344 dest.replace("*", "\*")) 345 else: 346 raise service_error(service_error.internal, 347 "Strange output from query") 348 else: 349 m = vpn1_re.match(line) 350 if m: 351 vpn1 = m.group(1) 352 continue 353 m = vpn2_re.match(line) 354 if m: 355 vpn2 = m.group(1) 356 continue 357 rv = p.wait() 358 # Make sure that OSCARS did what we expected. 359 if vpn1 == vpn2: 360 if v is not None: 361 if int(vpn1) == v: 362 vlan_no = int(v) 363 else: 364 raise service_error(service_error.federant, 365 "Unexpected vlan assignment") 366 else: 367 vlan_no = int(v or 0) 368 else: 369 raise service_error(service_error.internal, 370 "Different VPNs on DRAGON ends") 371 log.debug("Status: %s" % status or "none") 372 else: 373 rv = 0 374 status = 'ACTIVE' 375 vlan_no = int(v or 1) 376 377 return (rv, status, vlan_no) 378 379 380 381 def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None, 382 log=None): 383 """ 384 Do the actual work of creating the dragon connecton. 385 """ 386 waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING') 387 if not log: log = self.log 580 388 581 389 if not vpns: … … 593 401 vlan_no = None 594 402 for v in vpns: 595 cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 596 'createReservation', '-repo', repo , '-url', self.idc_url, 597 '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap, 598 '-vlan', "%s" % v, '-desc', 'fedd created connection', 599 '-pathsetup', 'timer-automatic', '-start', "%d" % int(start), 600 '-end', "%d" % int(end)] 601 log.debug("[start_segment]: %s" % " ".join(cmd)) 602 if not self.create_debug: 603 p = Popen(cmd, cwd=self.cli_dir, 604 stdout=PIPE, stderr=STDOUT, 605 close_fds=True) 606 for line in p.stdout: 607 m = status_re.match(line) 608 if m: 609 status = m.group(1) 610 continue 611 m = gri_re.match(line) 612 if m: 613 gri = m.group(1) 614 rv = p.wait() 615 else: 616 rv = 0 617 status = 'ACCEPTED' 618 gri = 'debug_gri' 619 403 rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 404 start, end, log) 620 405 # Reservation in progress. Poll the IDC until we know the outcome 621 cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 622 'query', '-repo', repo , '-url', self.idc_url, '-gri', gri] 623 624 while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'): 625 log.debug("[start_segment]: %s" % " ".join(cmd)) 626 if not self.create_debug: 627 p = Popen(cmd, cwd=self.cli_dir, 628 stdout=PIPE, stderr=STDOUT, 629 close_fds=True) 630 in_path = False 631 vpn1 = None 632 vpn2 = None 633 src = None 634 dest = None 635 for line in p.stdout: 636 if not in_path: 637 m = status_re.match(line) 638 if m: 639 status = m.group(1) 640 continue 641 m = source_re.match(line) 642 if m: 643 src = m.group(1) 644 continue 645 m = dest_re.match(line) 646 if m: 647 dest = m.group(1) 648 continue 649 m = path_re.match(line) 650 if m: 651 in_path = True 652 if src and dest: 653 vpn1_re = re.compile( 654 "\s*%s,\s*\w+\s*,\s*(\d+)" % \ 655 src.replace("*", "\*")) 656 vpn2_re = re.compile( 657 "\s*%s,\s*\w+\s*,\s*(\d+)" % \ 658 dest.replace("*", "\*")) 659 else: 660 raise service_error(service_error.internal, 661 "Strange output from query") 662 else: 663 m = vpn1_re.match(line) 664 if m: 665 vpn1 = m.group(1) 666 continue 667 m = vpn2_re.match(line) 668 if m: 669 vpn2 = m.group(1) 670 continue 671 rv = p.wait() 672 if vpn1 == vpn2: 673 if v is not None: 674 if int(vpn1) == v: 675 vlan_no = int(v) 676 else: 677 raise service_error(service_error.federant, 678 "Unexpected vlan assignment") 679 else: 680 vlan_no = int(v) 681 else: 682 raise service_error(service_error.internal, 683 "Different VPNs on DRAGON ends") 684 log.debug("Status: %s" % status or "none") 685 else: 686 status = 'ACTIVE' 687 vlan_no = int(v) or 1 688 if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'): 406 while status in waiting_states: 407 rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log) 408 if status in waiting_states: 689 409 time.sleep(45) 690 410 if status in ('ACTIVE', 'FINISHED', 'CANCELLED'): 691 411 break 692 412 693 self.log.debug("made reservation %s %s" % (gri, vlan_no))694 695 413 if (rv == 0 and gri and vlan_no and status == 'ACTIVE'): 414 self.log.debug("made reservation %s %s" % (gri, vlan_no)) 696 415 return gri, vlan_no 697 416 else: … … 700 419 701 420 def stop_segment(self, repo, gri, log=None): 421 """ 422 Terminate the reservation. 423 """ 702 424 if not log: log = self.log 703 425 cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', … … 740 462 self.log.error("Bad export request: %s" % p) 741 463 742 def import_store_info(self, cf, connInfo): 743 """ 744 Pull any import parameters in connInfo in. We translate them either 745 into known member names or fedAddrs. 746 """ 747 748 for c in connInfo: 749 for p in [ p for p in c.get('parameter', []) \ 750 if p.get('type', '') == 'input']: 751 name = p.get('name', None) 752 key = p.get('key', None) 753 store = p.get('store', None) 754 755 if name and key and store : 756 req = { 'name': key, 'wait': True } 757 r = self.call_GetValue(store, req, cf) 758 r = r.get('GetValueResponseBody', None) 759 if r : 760 if r.get('name', '') == key: 761 v = r.get('value', None) 762 if v is not None: 763 if name == 'peer': 764 c['peer'] = v 765 else: 766 if c.has_key('fedAttr'): 767 c['fedAttr'].append({ 768 'attribute': name, 'value': value}) 769 else: 770 c['fedAttr']= [{ 771 'attribute': name, 'value': value}] 772 else: 773 raise service_error(service_error.internal, 774 'None value exported for %s' % key) 775 else: 776 raise service_error(service_error.internal, 777 'Different name returned for %s: %s' \ 778 % (key, r.get('name',''))) 779 else: 780 raise service_error(service_error.internal, 781 'Badly formatted response: no GetValueResponseBody') 782 else: 783 raise service_error(service_error.internal, 784 'Bad Services missing info for import %s' % c) 785 464 def initialize_experiment_info(self, aid, ename): 465 repo = None 466 self.state_lock.acquire() 467 if aid in self.state: 468 repo = self.state[aid].get('user', None) 469 self.state[aid]['log'] = [ ] 470 # Create a logger that logs to the experiment's state object as 471 # well as to the main log file. 472 alloc_log = logging.getLogger('fedd.access.%s' % ename) 473 h = logging.StreamHandler( 474 list_log.list_log(self.state[aid]['log'])) 475 # XXX: there should be a global one of these rather than 476 # repeating the code. 477 h.setFormatter(logging.Formatter( 478 "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) 479 alloc_log.addHandler(h) 480 self.write_state() 481 self.state_lock.release() 482 return (repo, alloc_log) 483 484 def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id): 485 """ 486 Place the relevant information in the global state block, and prepare 487 the response. 488 """ 489 rtopo = topo.clone() 490 for s in rtopo.substrates: 491 s.set_attribute('vlan', vlan_no) 492 s.set_attribute('gri', gri) 493 494 # Grab the log (this is some anal locking, but better safe than 495 # sorry) 496 self.state_lock.acquire() 497 self.state[aid]['gri'] = gri 498 logv = "".join(self.state[aid]['log']) 499 # It's possible that the StartSegment call gets retried (!). 500 # if the 'started' key is in the allocation, we'll return it rather 501 # than redo the setup. 502 self.state[aid]['started'] = { 503 'allocID': alloc_id, 504 'allocationLog': logv, 505 'segmentdescription': { 506 'topdldescription': rtopo.to_dict() 507 }, 508 } 509 retval = copy.deepcopy(self.state[aid]['started']) 510 self.write_state() 511 self.state_lock.release() 512 513 return retval 786 514 787 515 def StartSegment(self, req, fid): … … 791 519 try: 792 520 req = req['StartSegmentRequestBody'] 521 topref = req['segmentdescription']['topdldescription'] 793 522 except KeyError: 794 523 raise service_error(server_error.req, "Badly formed request") … … 813 542 certfile = "%s/%s.pem" % (self.certdir, aid) 814 543 815 if req.has_key('segmentdescription') and \ 816 req['segmentdescription'].has_key('topdldescription'): 817 topo = \ 818 topdl.Topology(**req['segmentdescription']['topdldescription']) 544 if topref: 545 topo = topdl.Topology(**topref) 819 546 else: 820 547 raise service_error(service_error.req, … … 824 551 825 552 cap, src, dest, vlans = self.extract_parameters(topo) 826 ename = aid827 553 828 554 for a in attrs: 829 555 if a['attribute'] == 'experiment_name': 830 556 ename = a['value'] 831 832 repo = None 833 self.state_lock.acquire() 834 if self.state.has_key(aid): 835 repo = self.state[aid]['user'] 836 self.state[aid]['log'] = [ ] 837 # Create a logger that logs to the experiment's state object as 838 # well as to the main log file. 839 alloc_log = logging.getLogger('fedd.access.%s' % ename) 840 h = logging.StreamHandler( 841 list_log.list_log(self.state[aid]['log'])) 842 # XXX: there should be a global one of these rather than 843 # repeating the code. 844 h.setFormatter(logging.Formatter( 845 "%(asctime)s %(name)s %(message)s", 846 '%d %b %y %H:%M:%S')) 847 alloc_log.addHandler(h) 848 self.write_state() 849 self.state_lock.release() 557 break 558 else: ename = aid 559 560 repo, alloc_log = self.initialize_experiment_info(aid, ename) 850 561 851 562 if not repo: 852 563 raise service_error(service_error.internal, 853 "Can't find creation user for %s" % aid)564 "Can't find creation user for %s" % aid) 854 565 855 566 gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans, … … 860 571 861 572 if gri: 862 rtopo = topo.clone() 863 for s in rtopo.substrates: 864 s.set_attribute('vlan', vlan_no) 865 s.set_attribute('gri', gri) 866 867 # Grab the log (this is some anal locking, but better safe than 868 # sorry) 869 self.state_lock.acquire() 870 self.state[aid]['gri'] = gri 871 logv = "".join(self.state[aid]['log']) 872 # It's possible that the StartSegment call gets retried (!). 873 # if the 'started' key is in the allocation, we'll return it rather 874 # than redo the setup. 875 self.state[aid]['started'] = { 876 'allocID': req['allocID'], 877 'allocationLog': logv, 878 'segmentdescription': { 879 'topdldescription': rtopo.to_dict() 880 }, 881 } 882 retval = self.state[aid]['started'] 883 self.write_state() 884 self.state_lock.release() 885 886 return retval 573 return self.finalize_experiment(topo, vlan_no, gri, aid, 574 req['allocID']) 887 575 elif err: 888 576 raise service_error(service_error.federant,
Note: See TracChangeset
for help on using the changeset viewer.