Changeset 6abed7b


Ignore:
Timestamp:
May 28, 2010 6:41:56 AM (14 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
a20a20f
Parents:
c200d36
Message:

derive from access_base and refactor a little.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/dragon_access.py

    rc200d36 r6abed7b  
    1111from threading import Thread, Lock
    1212from subprocess import Popen, call, PIPE, STDOUT
     13from access import access_base
    1314
    1415from util import *
     
    3738fl.addHandler(nullHandler())
    3839
    39 class access:
     40class access(access_base):
    4041    """
    4142    The implementation of access control based on mapping users to projects.
     
    4445    dynamically.  This implements both direct requests and proxies.
    4546    """
    46 
    47     class parse_error(RuntimeError): pass
    48 
    49 
    50     proxy_RequestAccess= service_caller('RequestAccess')
    51     proxy_ReleaseAccess= service_caller('ReleaseAccess')
    5247
    5348    def __init__(self, config=None, auth=None):
     
    5550        Initializer.  Pulls parameters out of the ConfigParser's access section.
    5651        """
    57 
    58         # Make sure that the configuration is in place
    59         if not config:
    60             raise RunTimeError("No config to dragon_access.access")
    61 
    62         self.project_priority = config.getboolean("access", "project_priority")
    63         self.allow_proxy = False
    64         self.certdir = config.get("access","certdir")
    65         self.create_debug = config.getboolean("access", "create_debug")
     52        access_base.__init__(self, config, auth)
     53
    6654        self.cli_dir = config.get("access", "cli_dir")
    6755        self.axis2_home = config.get("access", "axis2_home")
     
    7058        self.duration = config.getint("access", "duration", 120)
    7159
    72         self.attrs = { }
    7360        self.access = { }
    74         # State is a dict of dicts indexed by segment fedid that includes the
    75         # owners of the segment as fedids (who can manipulate it, key: owners),
    76         # the repo dir/user for the allocation (key: user),  Current allocation
    77         # log (key: log), and GRI of the reservation once made (key: gri)
    78         self.state = { }
    79         self.log = logging.getLogger("fedd.access")
    80         set_log_level(config, "access", self.log)
    81         self.state_lock = Lock()
    8261        if not (self.cli_dir and self.axis2_home and self.idc_url):
    8362            self.log.error("Must specify all of cli_dir, axis2_home, " +\
    8463                    "idc in the [access] section of the configuration")
    8564
    86         if auth: self.auth = auth
    87         else:
    88             self.log.error(\
    89                     "[access]: No authorizer initialized, creating local one.")
    90             auth = authorizer()
    91 
    92         tb = config.get('access', 'testbed')
    93         if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
    94         else: self.testbed = [ ]
    95 
    9665        if config.has_option("access", "accessdb"):
    97             self.read_access(config.get("access", "accessdb"))
    98 
    99         self.state_filename = config.get("access", "access_state")
    100         self.read_state()
    101 
    102         # Keep cert_file and cert_pwd coming from the same place
    103         self.cert_file = config.get("access", "cert_file")
    104         if self.cert_file:
    105             self.sert_pwd = config.get("access", "cert_pw")
    106         else:
    107             self.cert_file = config.get("globals", "cert_file")
    108             self.sert_pwd = config.get("globals", "cert_pw")
    109 
    110         self.trusted_certs = config.get("access", "trusted_certs") or \
    111                 config.get("globals", "trusted_certs")
     66            self.read_access(config.get("access", "accessdb"), self.make_repo)
     67
     68        # Add the ownership attributes to the authorizer.  Note that the
     69        # indices of the allocation dict are strings, but the attributes are
     70        # fedids, so there is a conversion.
     71        self.state_lock.acquire()
     72        for k in self.state.keys():
     73            for o in self.state[k].get('owners', []):
     74                self.auth.set_attribute(o, fedid(hexstr=k))
     75            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
     76        self.state_lock.release()
     77
     78        self.lookup_access = self.lookup_access_base
    11279
    11380        self.call_GetValue= service_caller('GetValue')
     
    13198            }
    13299
    133     def read_access(self, config):
    134         """
    135         Read a configuration file and set internal parameters.
    136 
    137         There are access lines of the
    138         form (tb, proj, user) -> user that map the first tuple of
    139         names to the user for for access purposes.  Names in the key (left side)
    140         can include "<NONE> or <ANY>" to act as wildcards or to require the
    141         fields to be empty.  Similarly aproj or auser can be <SAME> or
    142         <DYNAMIC> indicating that either the matching key is to be used or a
    143         dynamic user or project will be created.  These names can also be
    144         federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
    145         directory that contains the DRAGON user credentials.
    146         Testbed attributes outside the forms above can be given using the
    147         format attribute: name value: value.  The name is a single word and the
    148         value continues to the end of the line.  Empty lines and lines startin
    149         with a # are ignored.
    150 
    151         Parsing errors result in a self.parse_error exception being raised.
    152         """
    153         lineno=0
    154         name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
    155         fedid_expr = "fedid:[" + string.hexdigits + "]+"
    156         key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
    157 
    158         attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
    159                 re.IGNORECASE)
    160         access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
    161                 key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
    162 
    163         def parse_name(n):
    164             if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
    165             else: return n
    166        
    167         def auth_name(n):
    168             if isinstance(n, basestring):
    169                 if n =='<any>' or n =='<none>': return None
    170                 else: return unicode(n)
    171             else:
    172                 return n
    173 
    174         f = open(config, "r");
    175         for line in f:
    176             lineno += 1
    177             line = line.strip();
    178             if len(line) == 0 or line.startswith('#'):
    179                 continue
    180 
    181             # Extended (attribute: x value: y) attribute line
    182             m = attr_re.match(line)
    183             if m != None:
    184                 attr, val = m.group(1,2)
    185                 self.attrs[attr] = val
    186                 continue
    187 
    188             # Access line (t, p, u) -> (a) line
    189             # XXX: you are here
    190             m = access_re.match(line)
    191             if m != None:
    192                 access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
    193                 auth_key = tuple([ auth_name(x) for x in access_key])
    194                 user_name = auth_name(parse_name(m.group(4)))
    195 
    196                 self.access[access_key] = user_name
    197                 self.auth.set_attribute(auth_key, "access")
    198                 continue
    199 
    200             # Nothing matched to here: unknown line - raise exception
    201             f.close()
    202             raise self.parse_error("Unknown statement at line %d of %s" % \
    203                     (lineno, config))
    204         f.close()
    205 
    206     def get_users(self, obj):
    207         """
    208         Return a list of the IDs of the users in dict
    209         """
    210         if obj.has_key('user'):
    211             return [ unpack_id(u['userID']) \
    212                     for u in obj['user'] if u.has_key('userID') ]
    213         else:
    214             return None
    215 
    216     def write_state(self):
    217         if self.state_filename:
    218             try:
    219                 f = open(self.state_filename, 'w')
    220                 pickle.dump(self.state, f)
    221             except EnvironmentError, e:
    222                 self.log.error("Can't write file %s: %s" % \
    223                         (self.state_filename, e))
    224             except pickle.PicklingError, e:
    225                 self.log.error("Pickling problem: %s" % e)
    226             except TypeError, e:
    227                 self.log.error("Pickling problem (TypeError): %s" % e)
    228 
    229 
    230     def read_state(self):
    231         """
    232         Read a new copy of access state.  Old state is overwritten.
    233 
    234         State format is a simple pickling of the state dictionary.
    235         """
    236         if self.state_filename:
    237             try:
    238                 f = open(self.state_filename, "r")
    239                 self.state = pickle.load(f)
    240                 self.log.debug("[read_state]: Read state from %s" % \
    241                         self.state_filename)
    242             except EnvironmentError, e:
    243                 self.log.warning(("[read_state]: No saved state: " +\
    244                         "Can't open %s: %s") % (self.state_filename, e))
    245             except EOFError, e:
    246                 self.log.warning(("[read_state]: " +\
    247                         "Empty or damaged state file: %s:") % \
    248                         self.state_filename)
    249             except pickle.UnpicklingError, e:
    250                 self.log.warning(("[read_state]: No saved state: " + \
    251                         "Unpickling failed: %s") % e)
    252 
    253             # Add the ownership attributes to the authorizer.  Note that the
    254             # indices of the allocation dict are strings, but the attributes are
    255             # fedids, so there is a conversion.
    256             for k in self.state.keys():
    257                 for o in self.state[k].get('owners', []):
    258                     self.auth.set_attribute(o, fedid(hexstr=k))
    259                 self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
    260 
    261 
    262     def permute_wildcards(self, a, p):
    263         """Return a copy of a with various fields wildcarded.
    264 
    265         The bits of p control the wildcards.  A set bit is a wildcard
    266         replacement with the lowest bit being user then project then testbed.
    267         """
    268         if p & 1: user = ["<any>"]
    269         else: user = a[2]
    270         if p & 2: proj = "<any>"
    271         else: proj = a[1]
    272         if p & 4: tb = "<any>"
    273         else: tb = a[0]
    274 
    275         return (tb, proj, user)
    276 
    277     def find_access(self, search):
    278         """
    279         Search the access DB for a match on this tuple.  Return the matching
    280         user (repo dir).
    281        
    282         NB, if the initial tuple fails to match we start inserting wildcards in
    283         an order determined by self.project_priority.  Try the list of users in
    284         order (when wildcarded, there's only one user in the list).
    285         """
    286         if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
    287         else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
    288 
    289         for p in perm:
    290             s = self.permute_wildcards(search, p)
    291             # s[2] is None on an anonymous, unwildcarded request
    292             if s[2] != None:
    293                 for u in s[2]:
    294                     if self.access.has_key((s[0], s[1], u)):
    295                         return self.access[(s[0], s[1], u)]
    296             else:
    297                 if self.access.has_key(s):
    298                     return self.access[s]
    299         return None
    300 
    301     def lookup_access(self, req, fid):
    302         """
    303         Determine the allowed access for this request.  Return the access and
    304         which fields are dynamic.
    305 
    306         The fedid is needed to construct the request
    307         """
    308         # Search keys
    309         tb = None
    310         project = None
    311         user = None
    312         # Return values
    313         rp = access_project(None, ())
    314         ru = None
    315         user_re = re.compile("user:\s(.*)")
    316         project_re = re.compile("project:\s(.*)")
    317 
    318         user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
    319                 if user_re.match(x)]
    320         project = [ project_re.findall(x)[0] \
    321                 for x in req.get('credential', []) \
    322                     if project_re.match(x)]
    323 
    324         if len(project) == 1: project = project[0]
    325         elif len(project) == 0: project = None
    326         else:
    327             raise service_error(service_error.req,
    328                     "More than one project credential")
    329 
    330 
    331         user_fedids = [ u for u in user if isinstance(u, fedid)]
    332 
    333         # Determine how the caller is representing itself.  If its fedid shows
    334         # up as a project or a singleton user, let that stand.  If neither the
    335         # usernames nor the project name is a fedid, the caller is a testbed.
    336         if project and isinstance(project, fedid):
    337             if project == fid:
    338                 # The caller is the project (which is already in the tuple
    339                 # passed in to the authorizer)
    340                 owners = user_fedids
    341                 owners.append(project)
    342             else:
    343                 raise service_error(service_error.req,
    344                         "Project asserting different fedid")
    345         else:
    346             if fid not in user_fedids:
    347                 tb = fid
    348                 owners = user_fedids
    349                 owners.append(fid)
    350             else:
    351                 if len(fedids) > 1:
    352                     raise service_error(service_error.req,
    353                             "User asserting different fedid")
    354                 else:
    355                     # Which is a singleton
    356                     owners = user_fedids
    357         # Confirm authorization
    358 
    359         for u in user:
    360             self.log.debug("[lookup_access] Checking access for %s" % \
    361                     ((tb, project, u),))
    362             if self.auth.check_attribute((tb, project, u), 'access'):
    363                 self.log.debug("[lookup_access] Access granted")
    364                 break
    365             else:
    366                 self.log.debug("[lookup_access] Access Denied")
    367         else:
    368             raise service_error(service_error.access, "Access denied")
    369 
    370         # This maps a valid user to the Emulab projects and users to use
    371         found = self.find_access((tb, project, user))
    372        
    373         if found == None:
    374             raise service_error(service_error.access,
    375                     "Access denied - cannot map access")
    376         return found, owners
     100    @staticmethod
     101    def make_repo(s):
     102        """
     103        Get the repo directory from an access line.  This is removing the ()
     104        from the string.
     105        """
     106        rv = s.strip()
     107        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
     108        else: raise self.parse_error("Repo should be in parens");
    377109
    378110    def RequestAccess(self, req, fid):
    379111        """
    380         Handle the access request.  Proxy if not for us.
     112        Handle the access request.
    381113
    382114        Parse out the fields and make the allocations or rejections if for us,
     
    393125            dt = unpack_id(req['destinationTestbed'])
    394126
    395         if dt == None or dt in self.testbed:
    396             # Request for this fedd
    397             found, owners = self.lookup_access(req, fid)
    398             # keep track of what's been added
    399             allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
    400             aid = unicode(allocID)
    401 
    402             self.state_lock.acquire()
    403             self.state[aid] = { }
    404             self.state[aid]['user'] = found
    405             self.state[aid]['owners'] = owners
    406             self.write_state()
    407             self.state_lock.release()
    408             for o in owners:
    409                 self.auth.set_attribute(o, allocID)
    410             self.auth.set_attribute(allocID, allocID)
    411 
    412             try:
    413                 f = open("%s/%s.pem" % (self.certdir, aid), "w")
    414                 print >>f, alloc_cert
    415                 f.close()
    416             except EnvironmentError, e:
    417                 raise service_error(service_error.internal,
    418                         "Can't open %s/%s : %s" % (self.certdir, aid, e))
    419             return { 'allocID': { 'fedid': allocID } }
    420         else:
    421             if self.allow_proxy:
    422                 resp = self.proxy_RequestAccess.call_service(dt, req,
    423                             self.cert_file, self.cert_pwd,
    424                             self.trusted_certs)
    425                 if resp.has_key('RequestAccessResponseBody'):
    426                     return resp['RequestAccessResponseBody']
    427                 else:
    428                     return None
    429             else:
    430                 raise service_error(service_error.access,
    431                         "Access proxying denied")
     127        # Request for this fedd
     128        found, match = self.lookup_access(req, fid)
     129        # keep track of what's been added
     130        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
     131        aid = unicode(allocID)
     132
     133        self.state_lock.acquire()
     134        self.state[aid] = { }
     135        self.state[aid]['user'] = found
     136        self.state[aid]['owners'] = [ fid ]
     137        self.write_state()
     138        self.state_lock.release()
     139        self.auth.set_attribute(fid, allocID)
     140        self.auth.set_attribute(allocID, allocID)
     141
     142        try:
     143            f = open("%s/%s.pem" % (self.certdir, aid), "w")
     144            print >>f, alloc_cert
     145            f.close()
     146        except EnvironmentError, e:
     147            raise service_error(service_error.internal,
     148                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
     149        return { 'allocID': { 'fedid': allocID } }
    432150
    433151    def ReleaseAccess(self, req, fid):
     
    438156            raise service_error(service_error.req, "No request!?")
    439157
    440         if req.has_key('destinationTestbed'):
    441             dt = unpack_id(req['destinationTestbed'])
    442         else:
    443             dt = None
    444 
    445         if dt == None or dt in self.testbed:
    446             # Local request
    447             try:
    448                 if req['allocID'].has_key('localname'):
    449                     auth_attr = aid = req['allocID']['localname']
    450                 elif req['allocID'].has_key('fedid'):
    451                     aid = unicode(req['allocID']['fedid'])
    452                     auth_attr = req['allocID']['fedid']
    453                 else:
    454                     raise service_error(service_error.req,
    455                             "Only localnames and fedids are understood")
    456             except KeyError:
    457                 raise service_error(service_error.req, "Badly formed request")
    458 
    459             self.log.debug("[access] deallocation requested for %s", aid)
    460             if not self.auth.check_attribute(fid, auth_attr):
    461                 self.log.debug("[access] deallocation denied for %s", aid)
    462                 raise service_error(service_error.access, "Access Denied")
    463 
    464             self.state_lock.acquire()
    465             if self.state.has_key(aid):
    466                 self.log.debug("Found allocation for %s" %aid)
    467                 del self.state[aid]
    468                 self.write_state()
    469                 self.state_lock.release()
    470                 # And remove the access cert
    471                 cf = "%s/%s.pem" % (self.certdir, aid)
    472                 self.log.debug("Removing %s" % cf)
    473                 os.remove(cf)
    474                 return { 'allocID': req['allocID'] }
    475             else:
    476                 self.state_lock.release()
    477                 raise service_error(service_error.req, "No such allocation")
    478 
    479         else:
    480             if self.allow_proxy:
    481                 resp = self.proxy_ReleaseAccess.call_service(dt, req,
    482                             self.cert_file, self.cert_pwd,
    483                             self.trusted_certs)
    484                 if resp.has_key('ReleaseAccessResponseBody'):
    485                     return resp['ReleaseAccessResponseBody']
    486                 else:
    487                     return None
    488             else:
    489                 raise service_error(service_error.access,
    490                         "Access proxying denied")
     158        try:
     159            if req['allocID'].has_key('localname'):
     160                auth_attr = aid = req['allocID']['localname']
     161            elif req['allocID'].has_key('fedid'):
     162                aid = unicode(req['allocID']['fedid'])
     163                auth_attr = req['allocID']['fedid']
     164            else:
     165                raise service_error(service_error.req,
     166                        "Only localnames and fedids are understood")
     167        except KeyError:
     168            raise service_error(service_error.req, "Badly formed request")
     169
     170        self.log.debug("[access] deallocation requested for %s", aid)
     171        if not self.auth.check_attribute(fid, auth_attr):
     172            self.log.debug("[access] deallocation denied for %s", aid)
     173            raise service_error(service_error.access, "Access Denied")
     174
     175        self.state_lock.acquire()
     176        if self.state.has_key(aid):
     177            self.log.debug("Found allocation for %s" %aid)
     178            del self.state[aid]
     179            self.write_state()
     180            self.state_lock.release()
     181            # And remove the access cert
     182            cf = "%s/%s.pem" % (self.certdir, aid)
     183            self.log.debug("Removing %s" % cf)
     184            os.remove(cf)
     185            return { 'allocID': req['allocID'] }
     186        else:
     187            self.state_lock.release()
     188            raise service_error(service_error.req, "No such allocation")
    491189
    492190    def extract_parameters(self, top):
     
    566264        return cap, ends[0], ends[1], vlans
    567265
    568 
    569     def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
    570             log=None):
    571         """
    572         Do the actual work of creating the dragon connecton.
    573         """
    574         if not log: log = self.log
     266    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
     267
    575268        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
     269        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
     270
     271        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh',
     272            'createReservation', '-repo',  repo , '-url', self.idc_url,
     273            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
     274            '-vlan', "%s" % v, '-desc', 'fedd created connection',
     275            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
     276            '-end', "%d" % int(end)]
     277        log.debug("[start_segment]: %s" % " ".join(cmd))
     278        if not self.create_debug:
     279            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
     280                    close_fds=True)
     281            for line in p.stdout:
     282                m = status_re.match(line)
     283                if m:
     284                    status = m.group(1)
     285                    continue
     286                m = gri_re.match(line)
     287                if m:
     288                    gri = m.group(1)
     289                    continue
     290            rv = p.wait()
     291        else:
     292            rv = 0
     293            status = 'ACCEPTED'
     294            gri = 'debug_gri'
     295
     296        return (rv, status, gri)
     297
     298    def oscars_query_vpn(self, repo, gri, v, log):
     299        """
     300        Call the oscars query command from the command line and parse out the
     301        data to see if the current request succeeded.  This is a lot of fiddly
     302        code to do  a pretty simple thing.
     303        """
    576304        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
    577305        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
    578306        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
    579307        path_re = re.compile("Path:")
     308
     309        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh',
     310            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
     311        log.debug("[start_segment]: %s" % " ".join(cmd))
     312        if not self.create_debug:
     313            # Really do the query
     314            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
     315                    close_fds=True)
     316            in_path = False
     317            vpn1 = None
     318            vpn2 = None
     319            src = None
     320            dest = None
     321            for line in p.stdout:
     322                if not in_path:
     323                    m = status_re.match(line)
     324                    if m:
     325                        status = m.group(1)
     326                        continue
     327                    m = source_re.match(line)
     328                    if m:
     329                        src = m.group(1)
     330                        continue
     331                    m = dest_re.match(line)
     332                    if m:
     333                        dest = m.group(1)
     334                        continue
     335                    m = path_re.match(line)
     336                    if m:
     337                        in_path = True
     338                        if src and dest:
     339                            vpn1_re = re.compile(
     340                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
     341                                            src.replace("*", "\*"))
     342                            vpn2_re = re.compile(
     343                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
     344                                            dest.replace("*", "\*"))
     345                        else:
     346                            raise service_error(service_error.internal,
     347                                    "Strange output from query")
     348                else:
     349                    m = vpn1_re.match(line)
     350                    if m:
     351                        vpn1 = m.group(1)
     352                        continue
     353                    m = vpn2_re.match(line)
     354                    if m:
     355                        vpn2 = m.group(1)
     356                        continue
     357            rv = p.wait()
     358            # Make sure that OSCARS did what we expected.
     359            if vpn1 == vpn2:
     360                if v is not None:
     361                    if int(vpn1) == v:
     362                        vlan_no = int(v)
     363                    else:
     364                        raise service_error(service_error.federant,
     365                                "Unexpected vlan assignment")
     366                else:
     367                    vlan_no = int(v or 0)
     368            else:
     369                raise service_error(service_error.internal,
     370                        "Different VPNs on DRAGON ends")
     371            log.debug("Status: %s" % status or "none")
     372        else:
     373            rv = 0
     374            status = 'ACTIVE'
     375            vlan_no = int(v or 1)
     376
     377        return (rv, status, vlan_no)
     378
     379
     380
     381    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
     382            log=None):
     383        """
     384        Do the actual work of creating the dragon connecton.
     385        """
     386        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
     387        if not log: log = self.log
    580388
    581389        if not vpns:
     
    593401        vlan_no = None
    594402        for v in vpns:
    595             cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh',
    596                 'createReservation', '-repo',  repo , '-url', self.idc_url,
    597                 '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
    598                 '-vlan', "%s" % v, '-desc', 'fedd created connection',
    599                 '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
    600                 '-end', "%d" % int(end)]
    601             log.debug("[start_segment]: %s" % " ".join(cmd))
    602             if not self.create_debug:
    603                 p = Popen(cmd, cwd=self.cli_dir,
    604                         stdout=PIPE, stderr=STDOUT,
    605                         close_fds=True)
    606                 for line in p.stdout:
    607                     m = status_re.match(line)
    608                     if m:
    609                         status = m.group(1)
    610                         continue
    611                     m = gri_re.match(line)
    612                     if m:
    613                         gri = m.group(1)
    614                 rv = p.wait()
    615             else:
    616                 rv = 0
    617                 status = 'ACCEPTED'
    618                 gri = 'debug_gri'
    619 
     403            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v,
     404                    start, end, log)
    620405            # Reservation in progress.  Poll the IDC until we know the outcome
    621             cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh',
    622                 'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
    623 
    624             while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
    625                 log.debug("[start_segment]: %s" % " ".join(cmd))
    626                 if not self.create_debug:
    627                     p = Popen(cmd, cwd=self.cli_dir,
    628                             stdout=PIPE, stderr=STDOUT,
    629                             close_fds=True)
    630                     in_path = False
    631                     vpn1 = None
    632                     vpn2 = None
    633                     src = None
    634                     dest = None
    635                     for line in p.stdout:
    636                         if not in_path:
    637                             m = status_re.match(line)
    638                             if m:
    639                                 status = m.group(1)
    640                                 continue
    641                             m = source_re.match(line)
    642                             if m:
    643                                 src = m.group(1)
    644                                 continue
    645                             m = dest_re.match(line)
    646                             if m:
    647                                 dest = m.group(1)
    648                                 continue
    649                             m = path_re.match(line)
    650                             if m:
    651                                 in_path = True
    652                                 if src and dest:
    653                                     vpn1_re = re.compile(
    654                                             "\s*%s,\s*\w+\s*,\s*(\d+)" % \
    655                                                     src.replace("*", "\*"))
    656                                     vpn2_re = re.compile(
    657                                             "\s*%s,\s*\w+\s*,\s*(\d+)" % \
    658                                                     dest.replace("*", "\*"))
    659                                 else:
    660                                     raise service_error(service_error.internal,
    661                                             "Strange output from query")
    662                         else:
    663                             m = vpn1_re.match(line)
    664                             if m:
    665                                 vpn1 = m.group(1)
    666                                 continue
    667                             m = vpn2_re.match(line)
    668                             if m:
    669                                 vpn2 = m.group(1)
    670                                 continue
    671                     rv = p.wait()
    672                     if vpn1 == vpn2:
    673                         if v is not None:
    674                             if int(vpn1) == v:
    675                                 vlan_no = int(v)
    676                             else:
    677                                 raise service_error(service_error.federant,
    678                                         "Unexpected vlan assignment")
    679                         else:
    680                             vlan_no = int(v)
    681                     else:
    682                         raise service_error(service_error.internal,
    683                                 "Different VPNs on DRAGON ends")
    684                     log.debug("Status: %s" % status or "none")
    685                 else:
    686                     status = 'ACTIVE'
    687                     vlan_no = int(v) or 1
    688                 if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
     406            while status in waiting_states:
     407                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
     408                if status in waiting_states:
    689409                    time.sleep(45)
    690410            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
    691411                break
    692412
    693         self.log.debug("made reservation %s %s" % (gri, vlan_no))
    694 
    695413        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
     414            self.log.debug("made reservation %s %s" % (gri, vlan_no))
    696415            return gri, vlan_no
    697416        else:
     
    700419
    701420    def stop_segment(self, repo, gri, log=None):
     421        """
     422        Terminate the reservation.
     423        """
    702424        if not log: log = self.log
    703425        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh',
     
    740462                    self.log.error("Bad export request: %s" % p)
    741463
    742     def import_store_info(self, cf, connInfo):
    743         """
    744         Pull any import parameters in connInfo in.  We translate them either
    745         into known member names or fedAddrs.
    746         """
    747 
    748         for c in connInfo:
    749             for p in [ p for p in c.get('parameter', []) \
    750                     if p.get('type', '') == 'input']:
    751                 name = p.get('name', None)
    752                 key = p.get('key', None)
    753                 store = p.get('store', None)
    754 
    755                 if name and key and store :
    756                     req = { 'name': key, 'wait': True }
    757                     r = self.call_GetValue(store, req, cf)
    758                     r = r.get('GetValueResponseBody', None)
    759                     if r :
    760                         if r.get('name', '') == key:
    761                             v = r.get('value', None)
    762                             if v is not None:
    763                                 if name == 'peer':
    764                                     c['peer'] = v
    765                                 else:
    766                                     if c.has_key('fedAttr'):
    767                                         c['fedAttr'].append({
    768                                             'attribute': name, 'value': value})
    769                                     else:
    770                                         c['fedAttr']= [{
    771                                             'attribute': name, 'value': value}]
    772                             else:
    773                                 raise service_error(service_error.internal,
    774                                         'None value exported for %s'  % key)
    775                         else:
    776                             raise service_error(service_error.internal,
    777                                     'Different name returned for %s: %s' \
    778                                             % (key, r.get('name','')))
    779                     else:
    780                         raise service_error(service_error.internal,
    781                             'Badly formatted response: no GetValueResponseBody')
    782                 else:
    783                     raise service_error(service_error.internal,
    784                         'Bad Services missing info for import %s' % c)
    785 
     464    def initialize_experiment_info(self, aid, ename):
     465        repo = None
     466        self.state_lock.acquire()
     467        if aid in self.state:
     468            repo = self.state[aid].get('user', None)
     469            self.state[aid]['log'] = [ ]
     470            # Create a logger that logs to the experiment's state object as
     471            # well as to the main log file.
     472            alloc_log = logging.getLogger('fedd.access.%s' % ename)
     473            h = logging.StreamHandler(
     474                    list_log.list_log(self.state[aid]['log']))
     475            # XXX: there should be a global one of these rather than
     476            # repeating the code.
     477            h.setFormatter(logging.Formatter(
     478                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
     479            alloc_log.addHandler(h)
     480            self.write_state()
     481        self.state_lock.release()
     482        return (repo, alloc_log)
     483
     484    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id):
     485        """
     486        Place the relevant information in the global state block, and prepare
     487        the response.
     488        """
     489        rtopo = topo.clone()
     490        for s in rtopo.substrates:
     491            s.set_attribute('vlan', vlan_no)
     492            s.set_attribute('gri', gri)
     493
     494        # Grab the log (this is some anal locking, but better safe than
     495        # sorry)
     496        self.state_lock.acquire()
     497        self.state[aid]['gri'] = gri
     498        logv = "".join(self.state[aid]['log'])
     499        # It's possible that the StartSegment call gets retried (!).
     500        # if the 'started' key is in the allocation, we'll return it rather
     501        # than redo the setup.
     502        self.state[aid]['started'] = {
     503                'allocID': alloc_id,
     504                'allocationLog': logv,
     505                'segmentdescription': {
     506                    'topdldescription': rtopo.to_dict()
     507                    },
     508                }
     509        retval = copy.deepcopy(self.state[aid]['started'])
     510        self.write_state()
     511        self.state_lock.release()
     512
     513        return retval
    786514
    787515    def StartSegment(self, req, fid):
     
    791519        try:
    792520            req = req['StartSegmentRequestBody']
     521            topref = req['segmentdescription']['topdldescription']
    793522        except KeyError:
    794523            raise service_error(server_error.req, "Badly formed request")
     
    813542        certfile = "%s/%s.pem" % (self.certdir, aid)
    814543
    815         if req.has_key('segmentdescription') and \
    816                 req['segmentdescription'].has_key('topdldescription'):
    817             topo = \
    818                 topdl.Topology(**req['segmentdescription']['topdldescription'])
     544        if topref:
     545            topo = topdl.Topology(**topref)
    819546        else:
    820547            raise service_error(service_error.req,
     
    824551
    825552        cap, src, dest, vlans = self.extract_parameters(topo)
    826         ename = aid
    827553
    828554        for a in attrs:
    829555            if a['attribute'] == 'experiment_name':
    830556                ename = a['value']
    831 
    832         repo = None
    833         self.state_lock.acquire()
    834         if self.state.has_key(aid):
    835             repo = self.state[aid]['user']
    836             self.state[aid]['log'] = [ ]
    837             # Create a logger that logs to the experiment's state object as
    838             # well as to the main log file.
    839             alloc_log = logging.getLogger('fedd.access.%s' % ename)
    840             h = logging.StreamHandler(
    841                     list_log.list_log(self.state[aid]['log']))
    842             # XXX: there should be a global one of these rather than
    843             # repeating the code.
    844             h.setFormatter(logging.Formatter(
    845                 "%(asctime)s %(name)s %(message)s",
    846                         '%d %b %y %H:%M:%S'))
    847             alloc_log.addHandler(h)
    848             self.write_state()
    849         self.state_lock.release()
     557                break
     558        else: ename = aid
     559       
     560        repo, alloc_log = self.initialize_experiment_info(aid, ename)
    850561
    851562        if not repo:
    852563            raise service_error(service_error.internal,
    853                     "Can't find creation user for %s" %aid)
     564                    "Can't find creation user for %s" % aid)
    854565
    855566        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
     
    860571
    861572        if gri:
    862             rtopo = topo.clone()
    863             for s in rtopo.substrates:
    864                 s.set_attribute('vlan', vlan_no)
    865                 s.set_attribute('gri', gri)
    866 
    867             # Grab the log (this is some anal locking, but better safe than
    868             # sorry)
    869             self.state_lock.acquire()
    870             self.state[aid]['gri'] = gri
    871             logv = "".join(self.state[aid]['log'])
    872             # It's possible that the StartSegment call gets retried (!).
    873             # if the 'started' key is in the allocation, we'll return it rather
    874             # than redo the setup.
    875             self.state[aid]['started'] = {
    876                     'allocID': req['allocID'],
    877                     'allocationLog': logv,
    878                     'segmentdescription': {
    879                         'topdldescription': rtopo.to_dict()
    880                         },
    881                     }
    882             retval = self.state[aid]['started']
    883             self.write_state()
    884             self.state_lock.release()
    885 
    886             return retval
     573            return self.finalize_experiment(topo, vlan_no, gri, aid,
     574                    req['allocID'])
    887575        elif err:
    888576            raise service_error(service_error.federant,
Note: See TracChangeset for help on using the changeset viewer.