Changeset 866c983 for fedd/federation


Ignore:
Timestamp:
Jul 21, 2009 2:23:40 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
55c074c
Parents:
8780cbec
Message:

whitespace conversion

Location:
fedd/federation
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    r8780cbec r866c983  
    4141
    4242    def __init__(self, config=None, auth=None):
    43         """
    44         Initializer.  Pulls parameters out of the ConfigParser's access section.
    45         """
    46 
    47         # Make sure that the configuration is in place
    48         if not config:
    49             raise RunTimeError("No config to fedd.access")
    50 
    51         self.project_priority = config.getboolean("access", "project_priority")
    52         self.allow_proxy = config.getboolean("access", "allow_proxy")
    53 
    54         self.boss = config.get("access", "boss")
    55         self.ops = config.get("access", "ops")
    56         self.domain = config.get("access", "domain")
    57         self.fileserver = config.get("access", "fileserver")
    58         self.eventserver = config.get("access", "eventserver")
    59 
    60         self.attrs = { }
    61         self.access = { }
    62         self.restricted = [ ]
    63         self.projects = { }
    64         self.keys = { }
    65         self.types = { }
    66         self.allocation = { }
    67         self.state = {
    68             'projects': self.projects,
    69             'allocation' : self.allocation,
    70             'keys' : self.keys,
    71             'types': self.types
    72         }
    73         self.log = logging.getLogger("fedd.access")
    74         set_log_level(config, "access", self.log)
    75         self.state_lock = Lock()
    76 
    77         if auth: self.auth = auth
    78         else:
    79             self.log.error(\
    80                     "[access]: No authorizer initialized, creating local one.")
    81             auth = authorizer()
    82 
    83         tb = config.get('access', 'testbed')
    84         if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
    85         else: self.testbed = [ ]
    86 
    87         if config.has_option("access", "accessdb"):
    88             self.read_access(config.get("access", "accessdb"))
    89 
    90         self.state_filename = config.get("access", "access_state")
    91         self.read_state()
    92 
    93         # Keep cert_file and cert_pwd coming from the same place
    94         self.cert_file = config.get("access", "cert_file")
    95         if self.cert_file:
    96             self.sert_pwd = config.get("access", "cert_pw")
    97         else:
    98             self.cert_file = config.get("globals", "cert_file")
    99             self.sert_pwd = config.get("globals", "cert_pw")
    100 
    101         self.trusted_certs = config.get("access", "trusted_certs") or \
    102                 config.get("globals", "trusted_certs")
    103 
    104         self.soap_services = {\
    105             'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
    106             'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
    107             }
    108         self.xmlrpc_services =  {\
    109             'RequestAccess': xmlrpc_handler('RequestAccess',
    110                 self.RequestAccess),
    111             'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
    112                 self.ReleaseAccess),
    113             }
    114 
    115 
    116         if not config.has_option("allocate", "uri"):
    117             self.allocate_project = \
    118                 allocate_project_local(config, auth)
    119         else:
    120             self.allocate_project = \
    121                 allocate_project_remote(config, auth)
    122 
    123         # If the project allocator exports services, put them in this object's
    124         # maps so that classes that instantiate this can call the services.
    125         self.soap_services.update(self.allocate_project.soap_services)
    126         self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
     43        """
     44        Initializer.  Pulls parameters out of the ConfigParser's access section.
     45        """
     46
     47        # Make sure that the configuration is in place
     48        if not config:
     49            raise RunTimeError("No config to fedd.access")
     50
     51        self.project_priority = config.getboolean("access", "project_priority")
     52        self.allow_proxy = config.getboolean("access", "allow_proxy")
     53
     54        self.boss = config.get("access", "boss")
     55        self.ops = config.get("access", "ops")
     56        self.domain = config.get("access", "domain")
     57        self.fileserver = config.get("access", "fileserver")
     58        self.eventserver = config.get("access", "eventserver")
     59
     60        self.attrs = { }
     61        self.access = { }
     62        self.restricted = [ ]
     63        self.projects = { }
     64        self.keys = { }
     65        self.types = { }
     66        self.allocation = { }
     67        self.state = {
     68            'projects': self.projects,
     69            'allocation' : self.allocation,
     70            'keys' : self.keys,
     71            'types': self.types
     72        }
     73        self.log = logging.getLogger("fedd.access")
     74        set_log_level(config, "access", self.log)
     75        self.state_lock = Lock()
     76
     77        if auth: self.auth = auth
     78        else:
     79            self.log.error(\
     80                    "[access]: No authorizer initialized, creating local one.")
     81            auth = authorizer()
     82
     83        tb = config.get('access', 'testbed')
     84        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
     85        else: self.testbed = [ ]
     86
     87        if config.has_option("access", "accessdb"):
     88            self.read_access(config.get("access", "accessdb"))
     89
     90        self.state_filename = config.get("access", "access_state")
     91        self.read_state()
     92
     93        # Keep cert_file and cert_pwd coming from the same place
     94        self.cert_file = config.get("access", "cert_file")
     95        if self.cert_file:
     96            self.sert_pwd = config.get("access", "cert_pw")
     97        else:
     98            self.cert_file = config.get("globals", "cert_file")
     99            self.sert_pwd = config.get("globals", "cert_pw")
     100
     101        self.trusted_certs = config.get("access", "trusted_certs") or \
     102                config.get("globals", "trusted_certs")
     103
     104        self.soap_services = {\
     105            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
     106            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
     107            }
     108        self.xmlrpc_services =  {\
     109            'RequestAccess': xmlrpc_handler('RequestAccess',
     110                self.RequestAccess),
     111            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
     112                self.ReleaseAccess),
     113            }
     114
     115
     116        if not config.has_option("allocate", "uri"):
     117            self.allocate_project = \
     118                allocate_project_local(config, auth)
     119        else:
     120            self.allocate_project = \
     121                allocate_project_remote(config, auth)
     122
     123        # If the project allocator exports services, put them in this object's
     124        # maps so that classes that instantiate this can call the services.
     125        self.soap_services.update(self.allocate_project.soap_services)
     126        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
    127127
    128128
    129129    def read_access(self, config):
    130         """
    131         Read a configuration file and set internal parameters.
    132 
    133         The format is more complex than one might hope.  The basic format is
    134         attribute value pairs separated by colons(:) on a signle line.  The
    135         attributes in bool_attrs, emulab_attrs and id_attrs can all be set
    136         directly using the name: value syntax.  E.g.
    137         boss: hostname
    138         sets self.boss to hostname.  In addition, there are access lines of the
    139         form (tb, proj, user) -> (aproj, auser) that map the first tuple of
    140         names to the second for access purposes.  Names in the key (left side)
    141         can include "<NONE> or <ANY>" to act as wildcards or to require the
    142         fields to be empty.  Similarly aproj or auser can be <SAME> or
    143         <DYNAMIC> indicating that either the matching key is to be used or a
    144         dynamic user or project will be created.  These names can also be
    145         federated IDs (fedid's) if prefixed with fedid:.  Finally, the aproj
    146         can be followed with a colon-separated list of node types to which that
    147         project has access (or will have access if dynamic).
    148         Testbed attributes outside the forms above can be given using the
    149         format attribute: name value: value.  The name is a single word and the
    150         value continues to the end of the line.  Empty lines and lines startin
    151         with a # are ignored.
    152 
    153         Parsing errors result in a self.parse_error exception being raised.
    154         """
    155         lineno=0
    156         name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
    157         fedid_expr = "fedid:[" + string.hexdigits + "]+"
    158         key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
    159         access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \
    160                 "<SAME>" + "(?::" + name_expr + ")*|" + \
    161                 fedid_expr + "(?::" + name_expr + ")*|" + \
    162                 name_expr + "(?::" + name_expr + ")*)"
    163         access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")"
    164 
    165         restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE)
    166         attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
    167                 re.IGNORECASE)
    168         access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
    169                 key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' +
    170                 access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE)
    171 
    172         def parse_name(n):
    173             if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
    174             else: return n
    175        
    176         def auth_name(n):
    177             if isinstance(n, basestring):
    178                 if n =='<any>' or n =='<none>': return None
    179                 else: return unicode(n)
    180             else:
    181                 return n
    182 
    183         f = open(config, "r");
    184         for line in f:
    185             lineno += 1
    186             line = line.strip();
    187             if len(line) == 0 or line.startswith('#'):
    188                 continue
    189 
    190             # Extended (attribute: x value: y) attribute line
    191             m = attr_re.match(line)
    192             if m != None:
    193                 attr, val = m.group(1,2)
    194                 self.attrs[attr] = val
    195                 continue
    196 
    197             # Restricted entry
    198             m = restricted_re.match(line)
    199             if m != None:
    200                 val = m.group(1)
    201                 self.restricted.append(val)
    202                 continue
    203 
    204             # Access line (t, p, u) -> (ap, cu, su) line
    205             m = access_re.match(line)
    206             if m != None:
    207                 access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
    208                 auth_key = tuple([ auth_name(x) for x in access_key])
    209                 aps = m.group(4).split(":");
    210                 if aps[0] == 'fedid:':
    211                     del aps[0]
    212                     aps[0] = fedid(hexstr=aps[0])
    213 
    214                 cu = parse_name(m.group(5))
    215                 su = parse_name(m.group(6))
    216 
    217                 access_val = (access_project(aps[0], aps[1:]),
    218                         parse_name(m.group(5)), parse_name(m.group(6)))
    219 
    220                 self.access[access_key] = access_val
    221                 self.auth.set_attribute(auth_key, "access")
    222                 continue
    223 
    224             # Nothing matched to here: unknown line - raise exception
    225             f.close()
    226             raise self.parse_error("Unknown statement at line %d of %s" % \
    227                     (lineno, config))
    228         f.close()
     130        """
     131        Read a configuration file and set internal parameters.
     132
     133        The format is more complex than one might hope.  The basic format is
     134        attribute value pairs separated by colons(:) on a signle line.  The
     135        attributes in bool_attrs, emulab_attrs and id_attrs can all be set
     136        directly using the name: value syntax.  E.g.
     137        boss: hostname
     138        sets self.boss to hostname.  In addition, there are access lines of the
     139        form (tb, proj, user) -> (aproj, auser) that map the first tuple of
     140        names to the second for access purposes.  Names in the key (left side)
     141        can include "<NONE> or <ANY>" to act as wildcards or to require the
     142        fields to be empty.  Similarly aproj or auser can be <SAME> or
     143        <DYNAMIC> indicating that either the matching key is to be used or a
     144        dynamic user or project will be created.  These names can also be
     145        federated IDs (fedid's) if prefixed with fedid:.  Finally, the aproj
     146        can be followed with a colon-separated list of node types to which that
     147        project has access (or will have access if dynamic).
     148        Testbed attributes outside the forms above can be given using the
     149        format attribute: name value: value.  The name is a single word and the
     150        value continues to the end of the line.  Empty lines and lines startin
     151        with a # are ignored.
     152
     153        Parsing errors result in a self.parse_error exception being raised.
     154        """
     155        lineno=0
     156        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
     157        fedid_expr = "fedid:[" + string.hexdigits + "]+"
     158        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
     159        access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \
     160                "<SAME>" + "(?::" + name_expr + ")*|" + \
     161                fedid_expr + "(?::" + name_expr + ")*|" + \
     162                name_expr + "(?::" + name_expr + ")*)"
     163        access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")"
     164
     165        restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE)
     166        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
     167                re.IGNORECASE)
     168        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
     169                key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' +
     170                access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE)
     171
     172        def parse_name(n):
     173            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
     174            else: return n
     175       
     176        def auth_name(n):
     177            if isinstance(n, basestring):
     178                if n =='<any>' or n =='<none>': return None
     179                else: return unicode(n)
     180            else:
     181                return n
     182
     183        f = open(config, "r");
     184        for line in f:
     185            lineno += 1
     186            line = line.strip();
     187            if len(line) == 0 or line.startswith('#'):
     188                continue
     189
     190            # Extended (attribute: x value: y) attribute line
     191            m = attr_re.match(line)
     192            if m != None:
     193                attr, val = m.group(1,2)
     194                self.attrs[attr] = val
     195                continue
     196
     197            # Restricted entry
     198            m = restricted_re.match(line)
     199            if m != None:
     200                val = m.group(1)
     201                self.restricted.append(val)
     202                continue
     203
     204            # Access line (t, p, u) -> (ap, cu, su) line
     205            m = access_re.match(line)
     206            if m != None:
     207                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
     208                auth_key = tuple([ auth_name(x) for x in access_key])
     209                aps = m.group(4).split(":");
     210                if aps[0] == 'fedid:':
     211                    del aps[0]
     212                    aps[0] = fedid(hexstr=aps[0])
     213
     214                cu = parse_name(m.group(5))
     215                su = parse_name(m.group(6))
     216
     217                access_val = (access_project(aps[0], aps[1:]),
     218                        parse_name(m.group(5)), parse_name(m.group(6)))
     219
     220                self.access[access_key] = access_val
     221                self.auth.set_attribute(auth_key, "access")
     222                continue
     223
     224            # Nothing matched to here: unknown line - raise exception
     225            f.close()
     226            raise self.parse_error("Unknown statement at line %d of %s" % \
     227                    (lineno, config))
     228        f.close()
    229229
    230230    def get_users(self, obj):
    231         """
    232         Return a list of the IDs of the users in dict
    233         """
    234         if obj.has_key('user'):
    235             return [ unpack_id(u['userID']) \
    236                     for u in obj['user'] if u.has_key('userID') ]
    237         else:
    238             return None
     231        """
     232        Return a list of the IDs of the users in dict
     233        """
     234        if obj.has_key('user'):
     235            return [ unpack_id(u['userID']) \
     236                    for u in obj['user'] if u.has_key('userID') ]
     237        else:
     238            return None
    239239
    240240    def write_state(self):
    241         if self.state_filename:
    242             try:
    243                 f = open(self.state_filename, 'w')
    244                 pickle.dump(self.state, f)
    245             except IOError, e:
    246                 self.log.error("Can't write file %s: %s" % \
    247                         (self.state_filename, e))
    248             except pickle.PicklingError, e:
    249                 self.log.error("Pickling problem: %s" % e)
    250             except TypeError, e:
    251                 self.log.error("Pickling problem (TypeError): %s" % e)
     241        if self.state_filename:
     242            try:
     243                f = open(self.state_filename, 'w')
     244                pickle.dump(self.state, f)
     245            except IOError, e:
     246                self.log.error("Can't write file %s: %s" % \
     247                        (self.state_filename, e))
     248            except pickle.PicklingError, e:
     249                self.log.error("Pickling problem: %s" % e)
     250            except TypeError, e:
     251                self.log.error("Pickling problem (TypeError): %s" % e)
    252252
    253253
    254254    def read_state(self):
    255         """
    256         Read a new copy of access state.  Old state is overwritten.
    257 
    258         State format is a simple pickling of the state dictionary.
    259         """
    260         if self.state_filename:
    261             try:
    262                 f = open(self.state_filename, "r")
    263                 self.state = pickle.load(f)
    264 
    265                 self.allocation = self.state['allocation']
    266                 self.projects = self.state['projects']
    267                 self.keys = self.state['keys']
    268                 self.types = self.state['types']
    269 
    270                 self.log.debug("[read_state]: Read state from %s" % \
    271                         self.state_filename)
    272             except IOError, e:
    273                 self.log.warning(("[read_state]: No saved state: " +\
    274                         "Can't open %s: %s") % (self.state_filename, e))
    275             except EOFError, e:
    276                 self.log.warning(("[read_state]: " +\
    277                         "Empty or damaged state file: %s:") % \
    278                         self.state_filename)
    279             except pickle.UnpicklingError, e:
    280                 self.log.warning(("[read_state]: No saved state: " + \
    281                         "Unpickling failed: %s") % e)
    282 
    283             # Add the ownership attributes to the authorizer.  Note that the
    284             # indices of the allocation dict are strings, but the attributes are
    285             # fedids, so there is a conversion.
    286             for k in self.allocation.keys():
    287                 for o in self.allocation[k].get('owners', []):
    288                     self.auth.set_attribute(o, fedid(hexstr=k))
     255        """
     256        Read a new copy of access state.  Old state is overwritten.
     257
     258        State format is a simple pickling of the state dictionary.
     259        """
     260        if self.state_filename:
     261            try:
     262                f = open(self.state_filename, "r")
     263                self.state = pickle.load(f)
     264
     265                self.allocation = self.state['allocation']
     266                self.projects = self.state['projects']
     267                self.keys = self.state['keys']
     268                self.types = self.state['types']
     269
     270                self.log.debug("[read_state]: Read state from %s" % \
     271                        self.state_filename)
     272            except IOError, e:
     273                self.log.warning(("[read_state]: No saved state: " +\
     274                        "Can't open %s: %s") % (self.state_filename, e))
     275            except EOFError, e:
     276                self.log.warning(("[read_state]: " +\
     277                        "Empty or damaged state file: %s:") % \
     278                        self.state_filename)
     279            except pickle.UnpicklingError, e:
     280                self.log.warning(("[read_state]: No saved state: " + \
     281                        "Unpickling failed: %s") % e)
     282
     283            # Add the ownership attributes to the authorizer.  Note that the
     284            # indices of the allocation dict are strings, but the attributes are
     285            # fedids, so there is a conversion.
     286            for k in self.allocation.keys():
     287                for o in self.allocation[k].get('owners', []):
     288                    self.auth.set_attribute(o, fedid(hexstr=k))
    289289
    290290
    291291    def permute_wildcards(self, a, p):
    292         """Return a copy of a with various fields wildcarded.
    293 
    294         The bits of p control the wildcards.  A set bit is a wildcard
    295         replacement with the lowest bit being user then project then testbed.
    296         """
    297         if p & 1: user = ["<any>"]
    298         else: user = a[2]
    299         if p & 2: proj = "<any>"
    300         else: proj = a[1]
    301         if p & 4: tb = "<any>"
    302         else: tb = a[0]
    303 
    304         return (tb, proj, user)
     292        """Return a copy of a with various fields wildcarded.
     293
     294        The bits of p control the wildcards.  A set bit is a wildcard
     295        replacement with the lowest bit being user then project then testbed.
     296        """
     297        if p & 1: user = ["<any>"]
     298        else: user = a[2]
     299        if p & 2: proj = "<any>"
     300        else: proj = a[1]
     301        if p & 4: tb = "<any>"
     302        else: tb = a[0]
     303
     304        return (tb, proj, user)
    305305
    306306    def find_access(self, search):
    307         """
    308         Search the access DB for a match on this tuple.  Return the matching
    309         access tuple and the user that matched.
    310        
    311         NB, if the initial tuple fails to match we start inserting wildcards in
    312         an order determined by self.project_priority.  Try the list of users in
    313         order (when wildcarded, there's only one user in the list).
    314         """
    315         if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
    316         else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
    317 
    318         for p in perm:
    319             s = self.permute_wildcards(search, p)
    320             # s[2] is None on an anonymous, unwildcarded request
    321             if s[2] != None:
    322                 for u in s[2]:
    323                     if self.access.has_key((s[0], s[1], u)):
    324                         return (self.access[(s[0], s[1], u)], u)
    325             else:
    326                 if self.access.has_key(s):
    327                     return (self.access[s], None)
    328         return None, None
     307        """
     308        Search the access DB for a match on this tuple.  Return the matching
     309        access tuple and the user that matched.
     310       
     311        NB, if the initial tuple fails to match we start inserting wildcards in
     312        an order determined by self.project_priority.  Try the list of users in
     313        order (when wildcarded, there's only one user in the list).
     314        """
     315        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
     316        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
     317
     318        for p in perm:
     319            s = self.permute_wildcards(search, p)
     320            # s[2] is None on an anonymous, unwildcarded request
     321            if s[2] != None:
     322                for u in s[2]:
     323                    if self.access.has_key((s[0], s[1], u)):
     324                        return (self.access[(s[0], s[1], u)], u)
     325            else:
     326                if self.access.has_key(s):
     327                    return (self.access[s], None)
     328        return None, None
    329329
    330330    def lookup_access(self, req, fid):
    331         """
    332         Determine the allowed access for this request.  Return the access and
    333         which fields are dynamic.
    334 
    335         The fedid is needed to construct the request
    336         """
    337         # Search keys
    338         tb = None
    339         project = None
    340         user = None
    341         # Return values
    342         rp = access_project(None, ())
    343         ru = None
    344 
    345         if req.has_key('project'):
    346             p = req['project']
    347             if p.has_key('name'):
    348                 project = unpack_id(p['name'])
    349             user = self.get_users(p)
    350         else:
    351             user = self.get_users(req)
    352 
    353         user_fedids = [ u for u in user if isinstance(u, fedid)]
    354         # Determine how the caller is representing itself.  If its fedid shows
    355         # up as a project or a singleton user, let that stand.  If neither the
    356         # usernames nor the project name is a fedid, the caller is a testbed.
    357         if project and isinstance(project, fedid):
    358             if project == fid:
    359                 # The caller is the project (which is already in the tuple
    360                 # passed in to the authorizer)
    361                 owners = user_fedids
    362                 owners.append(project)
    363             else:
    364                 raise service_error(service_error.req,
    365                         "Project asserting different fedid")
    366         else:
    367             if fid not in user_fedids:
    368                 tb = fid
    369                 owners = user_fedids
    370                 owners.append(fid)
    371             else:
    372                 if len(fedids) > 1:
    373                     raise service_error(service_error.req,
    374                             "User asserting different fedid")
    375                 else:
    376                     # Which is a singleton
    377                     owners = user_fedids
    378         # Confirm authorization
    379 
    380         for u in user:
    381             self.log.debug("[lookup_access] Checking access for %s" % \
    382                     ((tb, project, u),))
    383             if self.auth.check_attribute((tb, project, u), 'access'):
    384                 self.log.debug("[lookup_access] Access granted")
    385                 break
    386             else:
    387                 self.log.debug("[lookup_access] Access Denied")
    388         else:
    389             raise service_error(service_error.access, "Access denied")
    390 
    391         # This maps a valid user to the Emulab projects and users to use
    392         found, user_match = self.find_access((tb, project, user))
    393        
    394         if found == None:
    395             raise service_error(service_error.access,
    396                     "Access denied - cannot map access")
    397 
    398         # resolve <dynamic> and <same> in found
    399         dyn_proj = False
    400         dyn_create_user = False
    401         dyn_service_user = False
    402 
    403         if found[0].name == "<same>":
    404             if project != None:
    405                 rp.name = project
    406             else :
    407                 raise service_error(\
    408                         service_error.server_config,
    409                         "Project matched <same> when no project given")
    410         elif found[0].name == "<dynamic>":
    411             rp.name = None
    412             dyn_proj = True
    413         else:
    414             rp.name = found[0].name
    415         rp.node_types = found[0].node_types;
    416 
    417         if found[1] == "<same>":
    418             if user_match == "<any>":
    419                 if user != None: rcu = user[0]
    420                 else: raise service_error(\
    421                         service_error.server_config,
    422                         "Matched <same> on anonymous request")
    423             else:
    424                 rcu = user_match
    425         elif found[1] == "<dynamic>":
    426             rcu = None
    427             dyn_create_user = True
    428         else:
    429             rcu = found[1]
    430        
    431         if found[2] == "<same>":
    432             if user_match == "<any>":
    433                 if user != None: rsu = user[0]
    434                 else: raise service_error(\
    435                         service_error.server_config,
    436                         "Matched <same> on anonymous request")
    437             else:
    438                 rsu = user_match
    439         elif found[2] == "<dynamic>":
    440             rsu = None
    441             dyn_service_user = True
    442         else:
    443             rsu = found[2]
    444 
    445         return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
    446                 owners
     331        """
     332        Determine the allowed access for this request.  Return the access and
     333        which fields are dynamic.
     334
     335        The fedid is needed to construct the request
     336        """
     337        # Search keys
     338        tb = None
     339        project = None
     340        user = None
     341        # Return values
     342        rp = access_project(None, ())
     343        ru = None
     344
     345        if req.has_key('project'):
     346            p = req['project']
     347            if p.has_key('name'):
     348                project = unpack_id(p['name'])
     349            user = self.get_users(p)
     350        else:
     351            user = self.get_users(req)
     352
     353        user_fedids = [ u for u in user if isinstance(u, fedid)]
     354        # Determine how the caller is representing itself.  If its fedid shows
     355        # up as a project or a singleton user, let that stand.  If neither the
     356        # usernames nor the project name is a fedid, the caller is a testbed.
     357        if project and isinstance(project, fedid):
     358            if project == fid:
     359                # The caller is the project (which is already in the tuple
     360                # passed in to the authorizer)
     361                owners = user_fedids
     362                owners.append(project)
     363            else:
     364                raise service_error(service_error.req,
     365                        "Project asserting different fedid")
     366        else:
     367            if fid not in user_fedids:
     368                tb = fid
     369                owners = user_fedids
     370                owners.append(fid)
     371            else:
     372                if len(fedids) > 1:
     373                    raise service_error(service_error.req,
     374                            "User asserting different fedid")
     375                else:
     376                    # Which is a singleton
     377                    owners = user_fedids
     378        # Confirm authorization
     379
     380        for u in user:
     381            self.log.debug("[lookup_access] Checking access for %s" % \
     382                    ((tb, project, u),))
     383            if self.auth.check_attribute((tb, project, u), 'access'):
     384                self.log.debug("[lookup_access] Access granted")
     385                break
     386            else:
     387                self.log.debug("[lookup_access] Access Denied")
     388        else:
     389            raise service_error(service_error.access, "Access denied")
     390
     391        # This maps a valid user to the Emulab projects and users to use
     392        found, user_match = self.find_access((tb, project, user))
     393       
     394        if found == None:
     395            raise service_error(service_error.access,
     396                    "Access denied - cannot map access")
     397
     398        # resolve <dynamic> and <same> in found
     399        dyn_proj = False
     400        dyn_create_user = False
     401        dyn_service_user = False
     402
     403        if found[0].name == "<same>":
     404            if project != None:
     405                rp.name = project
     406            else :
     407                raise service_error(\
     408                        service_error.server_config,
     409                        "Project matched <same> when no project given")
     410        elif found[0].name == "<dynamic>":
     411            rp.name = None
     412            dyn_proj = True
     413        else:
     414            rp.name = found[0].name
     415        rp.node_types = found[0].node_types;
     416
     417        if found[1] == "<same>":
     418            if user_match == "<any>":
     419                if user != None: rcu = user[0]
     420                else: raise service_error(\
     421                        service_error.server_config,
     422                        "Matched <same> on anonymous request")
     423            else:
     424                rcu = user_match
     425        elif found[1] == "<dynamic>":
     426            rcu = None
     427            dyn_create_user = True
     428        else:
     429            rcu = found[1]
     430       
     431        if found[2] == "<same>":
     432            if user_match == "<any>":
     433                if user != None: rsu = user[0]
     434                else: raise service_error(\
     435                        service_error.server_config,
     436                        "Matched <same> on anonymous request")
     437            else:
     438                rsu = user_match
     439        elif found[2] == "<dynamic>":
     440            rsu = None
     441            dyn_service_user = True
     442        else:
     443            rsu = found[2]
     444
     445        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
     446                owners
    447447
    448448    def build_response(self, alloc_id, ap):
    449         """
    450         Create the SOAP response.
    451 
    452         Build the dictionary description of the response and use
    453         fedd_utils.pack_soap to create the soap message.  ap is the allocate
    454         project message returned from a remote project allocation (even if that
    455         allocation was done locally).
    456         """
    457         # Because alloc_id is already a fedd_services_types.IDType_Holder,
    458         # there's no need to repack it
    459         msg = {
    460                 'allocID': alloc_id,
    461                 'emulab': {
    462                     'domain': self.domain,
    463                     'boss': self.boss,
    464                     'ops': self.ops,
    465                     'fileServer': self.fileserver,
    466                     'eventServer': self.eventserver,
    467                     'project': ap['project']
    468                 },
    469             }
    470         if len(self.attrs) > 0:
    471             msg['emulab']['fedAttr'] = \
    472                 [ { 'attribute': x, 'value' : y } \
    473                         for x,y in self.attrs.iteritems()]
    474         return msg
     449        """
     450        Create the SOAP response.
     451
     452        Build the dictionary description of the response and use
     453        fedd_utils.pack_soap to create the soap message.  ap is the allocate
     454        project message returned from a remote project allocation (even if that
     455        allocation was done locally).
     456        """
     457        # Because alloc_id is already a fedd_services_types.IDType_Holder,
     458        # there's no need to repack it
     459        msg = {
     460                'allocID': alloc_id,
     461                'emulab': {
     462                    'domain': self.domain,
     463                    'boss': self.boss,
     464                    'ops': self.ops,
     465                    'fileServer': self.fileserver,
     466                    'eventServer': self.eventserver,
     467                    'project': ap['project']
     468                },
     469            }
     470        if len(self.attrs) > 0:
     471            msg['emulab']['fedAttr'] = \
     472                [ { 'attribute': x, 'value' : y } \
     473                        for x,y in self.attrs.iteritems()]
     474        return msg
    475475
    476476    def RequestAccess(self, req, fid):
    477         """
    478         Handle the access request.  Proxy if not for us.
    479 
    480         Parse out the fields and make the allocations or rejections if for us,
    481         otherwise, assuming we're willing to proxy, proxy the request out.
    482         """
    483 
    484         def gateway_hardware(h):
    485             if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE')
    486             else: return h
    487 
    488         # The dance to get into the request body
    489         if req.has_key('RequestAccessRequestBody'):
    490             req = req['RequestAccessRequestBody']
    491         else:
    492             raise service_error(service_error.req, "No request!?")
    493 
    494         if req.has_key('destinationTestbed'):
    495             dt = unpack_id(req['destinationTestbed'])
    496 
    497         if dt == None or dt in self.testbed:
    498             # Request for this fedd
    499             found, dyn, owners = self.lookup_access(req, fid)
    500             restricted = None
    501             ap = None
    502 
    503             # If this is a request to export a project and the access project
    504             # is not the project to export, access denied.
    505             if req.has_key('exportProject'):
    506                 ep = unpack_id(req['exportProject'])
    507                 if ep != found[0].name:
    508                     raise service_error(service_error.access,
    509                             "Cannot export %s" % ep)
    510 
    511             # Check for access to restricted nodes
    512             if req.has_key('resources') and req['resources'].has_key('node'):
    513                 resources = req['resources']
    514                 restricted = [ gateway_hardware(t) for n in resources['node'] \
    515                                 if n.has_key('hardware') \
    516                                     for t in n['hardware'] \
    517                                         if gateway_hardware(t) \
    518                                             in self.restricted ]
    519                 inaccessible = [ t for t in restricted \
    520                                     if t not in found[0].node_types]
    521                 if len(inaccessible) > 0:
    522                     raise service_error(service_error.access,
    523                             "Access denied (nodetypes %s)" % \
    524                             str(', ').join(inaccessible))
    525             # These collect the keys for the two roles into single sets, one
    526             # for creation and one for service.  The sets are a simple way to
    527             # eliminate duplicates
    528             create_ssh = set([ x['sshPubkey'] \
    529                     for x in req['createAccess'] \
    530                         if x.has_key('sshPubkey')])
    531 
    532             service_ssh = set([ x['sshPubkey'] \
    533                     for x in req['serviceAccess'] \
    534                         if x.has_key('sshPubkey')])
    535 
    536             if len(create_ssh) > 0 and len(service_ssh) >0:
    537                 if dyn[1]:
    538                     # Compose the dynamic project request
    539                     # (only dynamic, dynamic currently allowed)
    540                     preq = { 'AllocateProjectRequestBody': \
    541                                 { 'project' : {\
    542                                     'user': [ \
    543                                     { \
    544                                         'access': [ { 'sshPubkey': s } \
    545                                             for s in service_ssh ],
    546                                         'role': "serviceAccess",\
    547                                     }, \
    548                                     { \
    549                                         'access': [ { 'sshPubkey': s } \
    550                                             for s in create_ssh ],
    551                                         'role': "experimentCreation",\
    552                                     }, \
    553                                     ], \
    554                                     }\
    555                                 }\
    556                             }
    557                     if restricted != None and len(restricted) > 0:
    558                         preq['AllocateProjectRequestBody']['resources'] = \
    559                              {'node': [ { 'hardware' :  [ h ] } \
    560                                     for h in restricted ] }
    561                     ap = self.allocate_project.dynamic_project(preq)
    562                 else:
    563                     preq = {'StaticProjectRequestBody' : \
    564                             { 'project': \
    565                                 { 'name' : { 'localname' : found[0].name },\
    566                                   'user' : [ \
    567                                     {\
    568                                         'userID': { 'localname' : found[1] }, \
    569                                         'access': [ { 'sshPubkey': s }
    570                                             for s in create_ssh ],
    571                                         'role': 'experimentCreation'\
    572                                     },\
    573                                     {\
    574                                         'userID': { 'localname' : found[2] }, \
    575                                         'access': [ { 'sshPubkey': s }
    576                                             for s in service_ssh ],
    577                                         'role': 'serviceAccess'\
    578                                     },\
    579                                 ]}\
    580                             }\
    581                     }
    582                     if restricted != None and len(restricted) > 0:
    583                         preq['StaticProjectRequestBody']['resources'] = \
    584                             {'node': [ { 'hardware' :  [ h ] } \
    585                                     for h in restricted ] }
    586                     ap = self.allocate_project.static_project(preq)
    587             else:
    588                 raise service_error(service_error.req,
    589                         "SSH access parameters required")
    590             # keep track of what's been added
    591             allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
    592             aid = unicode(allocID)
    593 
    594             self.state_lock.acquire()
    595             self.allocation[aid] = { }
    596             try:
    597                 pname = ap['project']['name']['localname']
    598             except KeyError:
    599                 pname = None
    600 
    601             if dyn[1]:
    602                 if not pname:
    603                     self.state_lock.release()
    604                     raise service_error(service_error.internal,
    605                             "Misformed allocation response?")
    606                 if self.projects.has_key(pname): self.projects[pname] += 1
    607                 else: self.projects[pname] = 1
    608                 self.allocation[aid]['project'] = pname
    609 
    610             if ap.has_key('resources'):
    611                 if not pname:
    612                     self.state_lock.release()
    613                     raise service_error(service_error.internal,
    614                             "Misformed allocation response?")
    615                 self.allocation[aid]['types'] = set()
    616                 nodes = ap['resources'].get('node', [])
    617                 for n in nodes:
    618                     for h in n.get('hardware', []):
    619                         if self.types.has_key((pname, h)):
    620                             self.types[(pname, h)] += 1
    621                         else:
    622                             self.types[(pname, h)] = 1
    623                         self.allocation[aid]['types'].add((pname,h))
    624 
    625 
    626             self.allocation[aid]['keys'] = [ ]
    627 
    628             try:
    629                 for u in ap['project']['user']:
    630                     uname = u['userID']['localname']
    631                     for k in [ k['sshPubkey'] for k in u['access'] \
    632                             if k.has_key('sshPubkey') ]:
    633                         kv = "%s:%s" % (uname, k)
    634                         if self.keys.has_key(kv): self.keys[kv] += 1
    635                         else: self.keys[kv] = 1
    636                         self.allocation[aid]['keys'].append((uname, k))
    637             except KeyError:
    638                 self.state_lock.release()
    639                 raise service_error(service_error.internal,
    640                         "Misformed allocation response?")
    641 
    642 
    643             self.allocation[aid]['owners'] = owners
    644             self.write_state()
    645             self.state_lock.release()
    646             for o in owners:
    647                 self.auth.set_attribute(o, allocID)
    648             resp = self.build_response({ 'fedid': allocID } , ap)
    649             return resp
    650         else:
    651             if self.allow_proxy:
    652                 resp = self.proxy_RequestAccess.call_service(dt, req,
    653                             self.cert_file, self.cert_pwd,
    654                             self.trusted_certs)
    655                 if resp.has_key('RequestAccessResponseBody'):
    656                     return resp['RequestAccessResponseBody']
    657                 else:
    658                     return None
    659             else:
    660                 raise service_error(service_error.access,
    661                         "Access proxying denied")
     477        """
     478        Handle the access request.  Proxy if not for us.
     479
     480        Parse out the fields and make the allocations or rejections if for us,
     481        otherwise, assuming we're willing to proxy, proxy the request out.
     482        """
     483
     484        def gateway_hardware(h):
     485            if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE')
     486            else: return h
     487
     488        # The dance to get into the request body
     489        if req.has_key('RequestAccessRequestBody'):
     490            req = req['RequestAccessRequestBody']
     491        else:
     492            raise service_error(service_error.req, "No request!?")
     493
     494        if req.has_key('destinationTestbed'):
     495            dt = unpack_id(req['destinationTestbed'])
     496
     497        if dt == None or dt in self.testbed:
     498            # Request for this fedd
     499            found, dyn, owners = self.lookup_access(req, fid)
     500            restricted = None
     501            ap = None
     502
     503            # If this is a request to export a project and the access project
     504            # is not the project to export, access denied.
     505            if req.has_key('exportProject'):
     506                ep = unpack_id(req['exportProject'])
     507                if ep != found[0].name:
     508                    raise service_error(service_error.access,
     509                            "Cannot export %s" % ep)
     510
     511            # Check for access to restricted nodes
     512            if req.has_key('resources') and req['resources'].has_key('node'):
     513                resources = req['resources']
     514                restricted = [ gateway_hardware(t) for n in resources['node'] \
     515                                if n.has_key('hardware') \
     516                                    for t in n['hardware'] \
     517                                        if gateway_hardware(t) \
     518                                            in self.restricted ]
     519                inaccessible = [ t for t in restricted \
     520                                    if t not in found[0].node_types]
     521                if len(inaccessible) > 0:
     522                    raise service_error(service_error.access,
     523                            "Access denied (nodetypes %s)" % \
     524                            str(', ').join(inaccessible))
     525            # These collect the keys for the two roles into single sets, one
     526            # for creation and one for service.  The sets are a simple way to
     527            # eliminate duplicates
     528            create_ssh = set([ x['sshPubkey'] \
     529                    for x in req['createAccess'] \
     530                        if x.has_key('sshPubkey')])
     531
     532            service_ssh = set([ x['sshPubkey'] \
     533                    for x in req['serviceAccess'] \
     534                        if x.has_key('sshPubkey')])
     535
     536            if len(create_ssh) > 0 and len(service_ssh) >0:
     537                if dyn[1]:
     538                    # Compose the dynamic project request
     539                    # (only dynamic, dynamic currently allowed)
     540                    preq = { 'AllocateProjectRequestBody': \
     541                                { 'project' : {\
     542                                    'user': [ \
     543                                    { \
     544                                        'access': [ { 'sshPubkey': s } \
     545                                            for s in service_ssh ],
     546                                        'role': "serviceAccess",\
     547                                    }, \
     548                                    { \
     549                                        'access': [ { 'sshPubkey': s } \
     550                                            for s in create_ssh ],
     551                                        'role': "experimentCreation",\
     552                                    }, \
     553                                    ], \
     554                                    }\
     555                                }\
     556                            }
     557                    if restricted != None and len(restricted) > 0:
     558                        preq['AllocateProjectRequestBody']['resources'] = \
     559                             {'node': [ { 'hardware' :  [ h ] } \
     560                                    for h in restricted ] }
     561                    ap = self.allocate_project.dynamic_project(preq)
     562                else:
     563                    preq = {'StaticProjectRequestBody' : \
     564                            { 'project': \
     565                                { 'name' : { 'localname' : found[0].name },\
     566                                  'user' : [ \
     567                                    {\
     568                                        'userID': { 'localname' : found[1] }, \
     569                                        'access': [ { 'sshPubkey': s }
     570                                            for s in create_ssh ],
     571                                        'role': 'experimentCreation'\
     572                                    },\
     573                                    {\
     574                                        'userID': { 'localname' : found[2] }, \
     575                                        'access': [ { 'sshPubkey': s }
     576                                            for s in service_ssh ],
     577                                        'role': 'serviceAccess'\
     578                                    },\
     579                                ]}\
     580                            }\
     581                    }
     582                    if restricted != None and len(restricted) > 0:
     583                        preq['StaticProjectRequestBody']['resources'] = \
     584                            {'node': [ { 'hardware' :  [ h ] } \
     585                                    for h in restricted ] }
     586                    ap = self.allocate_project.static_project(preq)
     587            else:
     588                raise service_error(service_error.req,
     589                        "SSH access parameters required")
     590            # keep track of what's been added
     591            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
     592            aid = unicode(allocID)
     593
     594            self.state_lock.acquire()
     595            self.allocation[aid] = { }
     596            try:
     597                pname = ap['project']['name']['localname']
     598            except KeyError:
     599                pname = None
     600
     601            if dyn[1]:
     602                if not pname:
     603                    self.state_lock.release()
     604                    raise service_error(service_error.internal,
     605                            "Misformed allocation response?")
     606                if self.projects.has_key(pname): self.projects[pname] += 1
     607                else: self.projects[pname] = 1
     608                self.allocation[aid]['project'] = pname
     609
     610            if ap.has_key('resources'):
     611                if not pname:
     612                    self.state_lock.release()
     613                    raise service_error(service_error.internal,
     614                            "Misformed allocation response?")
     615                self.allocation[aid]['types'] = set()
     616                nodes = ap['resources'].get('node', [])
     617                for n in nodes:
     618                    for h in n.get('hardware', []):
     619                        if self.types.has_key((pname, h)):
     620                            self.types[(pname, h)] += 1
     621                        else:
     622                            self.types[(pname, h)] = 1
     623                        self.allocation[aid]['types'].add((pname,h))
     624
     625
     626            self.allocation[aid]['keys'] = [ ]
     627
     628            try:
     629                for u in ap['project']['user']:
     630                    uname = u['userID']['localname']
     631                    for k in [ k['sshPubkey'] for k in u['access'] \
     632                            if k.has_key('sshPubkey') ]:
     633                        kv = "%s:%s" % (uname, k)
     634                        if self.keys.has_key(kv): self.keys[kv] += 1
     635                        else: self.keys[kv] = 1
     636                        self.allocation[aid]['keys'].append((uname, k))
     637            except KeyError:
     638                self.state_lock.release()
     639                raise service_error(service_error.internal,
     640                        "Misformed allocation response?")
     641
     642
     643            self.allocation[aid]['owners'] = owners
     644            self.write_state()
     645            self.state_lock.release()
     646            for o in owners:
     647                self.auth.set_attribute(o, allocID)
     648            resp = self.build_response({ 'fedid': allocID } , ap)
     649            return resp
     650        else:
     651            if self.allow_proxy:
     652                resp = self.proxy_RequestAccess.call_service(dt, req,
     653                            self.cert_file, self.cert_pwd,
     654                            self.trusted_certs)
     655                if resp.has_key('RequestAccessResponseBody'):
     656                    return resp['RequestAccessResponseBody']
     657                else:
     658                    return None
     659            else:
     660                raise service_error(service_error.access,
     661                        "Access proxying denied")
    662662
    663663    def ReleaseAccess(self, req, fid):
    664         # The dance to get into the request body
    665         if req.has_key('ReleaseAccessRequestBody'):
    666             req = req['ReleaseAccessRequestBody']
    667         else:
    668             raise service_error(service_error.req, "No request!?")
    669 
    670         if req.has_key('destinationTestbed'):
    671             dt = unpack_id(req['destinationTestbed'])
    672         else:
    673             dt = None
    674 
    675         if dt == None or dt in self.testbed:
    676             # Local request
    677             try:
    678                 if req['allocID'].has_key('localname'):
    679                     auth_attr = aid = req['allocID']['localname']
    680                 elif req['allocID'].has_key('fedid'):
    681                     aid = unicode(req['allocID']['fedid'])
    682                     auth_attr = req['allocID']['fedid']
    683                 else:
    684                     raise service_error(service_error.req,
    685                             "Only localnames and fedids are understood")
    686             except KeyError:
    687                 raise service_error(service_error.req, "Badly formed request")
    688 
    689             self.log.debug("[access] deallocation requested for %s", aid)
    690             if not self.auth.check_attribute(fid, auth_attr):
    691                 self.log.debug("[access] deallocation denied for %s", aid)
    692                 raise service_error(service_error.access, "Access Denied")
    693 
    694             # If we know this allocation, reduce the reference counts and
    695             # remove the local allocations.  Otherwise report an error.  If
    696             # there is an allocation to delete, del_users will be a dictonary
    697             # of sets where the key is the user that owns the keys in the set.
    698             # We use a set to avoid duplicates.  del_project is just the name
    699             # of any dynamic project to delete.  We're somewhat lazy about
    700             # deleting authorization attributes.  Having access to something
    701             # that doesn't exist isn't harmful.
    702             del_users = { }
    703             del_project = None
    704             del_types = set()
    705 
    706             if self.allocation.has_key(aid):
    707                 self.log.debug("Found allocation for %s" %aid)
    708                 self.state_lock.acquire()
    709                 for k in self.allocation[aid]['keys']:
    710                     kk = "%s:%s" % k
    711                     self.keys[kk] -= 1
    712                     if self.keys[kk] == 0:
    713                         if not del_users.has_key(k[0]):
    714                             del_users[k[0]] = set()
    715                         del_users[k[0]].add(k[1])
    716                         del self.keys[kk]
    717 
    718                 if self.allocation[aid].has_key('project'):
    719                     pname = self.allocation[aid]['project']
    720                     self.projects[pname] -= 1
    721                     if self.projects[pname] == 0:
    722                         del_project = pname
    723                         del self.projects[pname]
    724 
    725                 if self.allocation[aid].has_key('types'):
    726                     for t in self.allocation[aid]['types']:
    727                         self.types[t] -= 1
    728                         if self.types[t] == 0:
    729                             if not del_project: del_project = t[0]
    730                             del_types.add(t[1])
    731                             del self.types[t]
    732 
    733                 del self.allocation[aid]
    734                 self.write_state()
    735                 self.state_lock.release()
    736                 # If we actually have resources to deallocate, prepare the call.
    737                 if del_project or del_users:
    738                     msg = { 'project': { }}
    739                     if del_project:
    740                         msg['project']['name']= {'localname': del_project}
    741                     users = [ ]
    742                     for u in del_users.keys():
    743                         users.append({ 'userID': { 'localname': u },\
    744                             'access' :  \
    745                                     [ {'sshPubkey' : s } for s in del_users[u]]\
    746                         })
    747                     if users:
    748                         msg['project']['user'] = users
    749                     if len(del_types) > 0:
    750                         msg['resources'] = { 'node': \
    751                                 [ {'hardware': [ h ] } for h in del_types ]\
    752                             }
    753                     if self.allocate_project.release_project:
    754                         msg = { 'ReleaseProjectRequestBody' : msg}
    755                         self.allocate_project.release_project(msg)
    756                 return { 'allocID': req['allocID'] }
    757             else:
    758                 raise service_error(service_error.req, "No such allocation")
    759 
    760         else:
    761             if self.allow_proxy:
    762                 resp = self.proxy_ReleaseAccess.call_service(dt, req,
    763                             self.cert_file, self.cert_pwd,
    764                             self.trusted_certs)
    765                 if resp.has_key('ReleaseAccessResponseBody'):
    766                     return resp['ReleaseAccessResponseBody']
    767                 else:
    768                     return None
    769             else:
    770                 raise service_error(service_error.access,
    771                         "Access proxying denied")
    772 
    773 
     664        # The dance to get into the request body
     665        if req.has_key('ReleaseAccessRequestBody'):
     666            req = req['ReleaseAccessRequestBody']
     667        else:
     668            raise service_error(service_error.req, "No request!?")
     669
     670        if req.has_key('destinationTestbed'):
     671            dt = unpack_id(req['destinationTestbed'])
     672        else:
     673            dt = None
     674
     675        if dt == None or dt in self.testbed:
     676            # Local request
     677            try:
     678                if req['allocID'].has_key('localname'):
     679                    auth_attr = aid = req['allocID']['localname']
     680                elif req['allocID'].has_key('fedid'):
     681                    aid = unicode(req['allocID']['fedid'])
     682                    auth_attr = req['allocID']['fedid']
     683                else:
     684                    raise service_error(service_error.req,
     685                            "Only localnames and fedids are understood")
     686            except KeyError:
     687                raise service_error(service_error.req, "Badly formed request")
     688
     689            self.log.debug("[access] deallocation requested for %s", aid)
     690            if not self.auth.check_attribute(fid, auth_attr):
     691                self.log.debug("[access] deallocation denied for %s", aid)
     692                raise service_error(service_error.access, "Access Denied")
     693
     694            # If we know this allocation, reduce the reference counts and
     695            # remove the local allocations.  Otherwise report an error.  If
     696            # there is an allocation to delete, del_users will be a dictonary
     697            # of sets where the key is the user that owns the keys in the set.
     698            # We use a set to avoid duplicates.  del_project is just the name
     699            # of any dynamic project to delete.  We're somewhat lazy about
     700            # deleting authorization attributes.  Having access to something
     701            # that doesn't exist isn't harmful.
     702            del_users = { }
     703            del_project = None
     704            del_types = set()
     705
     706            if self.allocation.has_key(aid):
     707                self.log.debug("Found allocation for %s" %aid)
     708                self.state_lock.acquire()
     709                for k in self.allocation[aid]['keys']:
     710                    kk = "%s:%s" % k
     711                    self.keys[kk] -= 1
     712                    if self.keys[kk] == 0:
     713                        if not del_users.has_key(k[0]):
     714                            del_users[k[0]] = set()
     715                        del_users[k[0]].add(k[1])
     716                        del self.keys[kk]
     717
     718                if self.allocation[aid].has_key('project'):
     719                    pname = self.allocation[aid]['project']
     720                    self.projects[pname] -= 1
     721                    if self.projects[pname] == 0:
     722                        del_project = pname
     723                        del self.projects[pname]
     724
     725                if self.allocation[aid].has_key('types'):
     726                    for t in self.allocation[aid]['types']:
     727                        self.types[t] -= 1
     728                        if self.types[t] == 0:
     729                            if not del_project: del_project = t[0]
     730                            del_types.add(t[1])
     731                            del self.types[t]
     732
     733                del self.allocation[aid]
     734                self.write_state()
     735                self.state_lock.release()
     736                # If we actually have resources to deallocate, prepare the call.
     737                if del_project or del_users:
     738                    msg = { 'project': { }}
     739                    if del_project:
     740                        msg['project']['name']= {'localname': del_project}
     741                    users = [ ]
     742                    for u in del_users.keys():
     743                        users.append({ 'userID': { 'localname': u },\
     744                            'access' :  \
     745                                    [ {'sshPubkey' : s } for s in del_users[u]]\
     746                        })
     747                    if users:
     748                        msg['project']['user'] = users
     749                    if len(del_types) > 0:
     750                        msg['resources'] = { 'node': \
     751                                [ {'hardware': [ h ] } for h in del_types ]\
     752                            }
     753                    if self.allocate_project.release_project:
     754                        msg = { 'ReleaseProjectRequestBody' : msg}
     755                        self.allocate_project.release_project(msg)
     756                return { 'allocID': req['allocID'] }
     757            else:
     758                raise service_error(service_error.req, "No such allocation")
     759
     760        else:
     761            if self.allow_proxy:
     762                resp = self.proxy_ReleaseAccess.call_service(dt, req,
     763                            self.cert_file, self.cert_pwd,
     764                            self.trusted_certs)
     765                if resp.has_key('ReleaseAccessResponseBody'):
     766                    return resp['ReleaseAccessResponseBody']
     767                else:
     768                    return None
     769            else:
     770                raise service_error(service_error.access,
     771                        "Access proxying denied")
     772
     773
  • fedd/federation/experiment_control.py

    r8780cbec r866c983  
    4040   
    4141    class thread_pool:
    42         """
    43         A class to keep track of a set of threads all invoked for the same
    44         task.  Manages the mutual exclusion of the states.
    45         """
    46         def __init__(self, nthreads):
    47             """
    48             Start a pool.
    49             """
    50             self.changed = Condition()
    51             self.started = 0
    52             self.terminated = 0
    53             self.nthreads = nthreads
    54 
    55         def acquire(self):
    56             """
    57             Get the pool's lock.
    58             """
    59             self.changed.acquire()
    60 
    61         def release(self):
    62             """
    63             Release the pool's lock.
    64             """
    65             self.changed.release()
    66 
    67         def wait(self, timeout = None):
    68             """
    69             Wait for a pool thread to start or stop.
    70             """
    71             self.changed.wait(timeout)
    72 
    73         def start(self):
    74             """
    75             Called by a pool thread to report starting.
    76             """
    77             self.changed.acquire()
    78             self.started += 1
    79             self.changed.notifyAll()
    80             self.changed.release()
    81 
    82         def terminate(self):
    83             """
    84             Called by a pool thread to report finishing.
    85             """
    86             self.changed.acquire()
    87             self.terminated += 1
    88             self.changed.notifyAll()
    89             self.changed.release()
    90 
    91         def clear(self):
    92             """
    93             Clear all pool data.
    94             """
    95             self.changed.acquire()
    96             self.started = 0
    97             self.terminated =0
    98             self.changed.notifyAll()
    99             self.changed.release()
    100 
    101         def wait_for_slot(self):
    102             """
    103             Wait until we have a free slot to start another pooled thread
    104             """
    105             self.acquire()
    106             while self.started - self.terminated >= self.nthreads:
    107                 self.wait()
    108             self.release()
    109 
    110         def wait_for_all_done(self):
    111             """
    112             Wait until all active threads finish (and at least one has started)
    113             """
    114             self.acquire()
    115             while self.started == 0 or self.started > self.terminated:
    116                 self.wait()
    117             self.release()
     42        """
     43        A class to keep track of a set of threads all invoked for the same
     44        task.  Manages the mutual exclusion of the states.
     45        """
     46        def __init__(self, nthreads):
     47            """
     48            Start a pool.
     49            """
     50            self.changed = Condition()
     51            self.started = 0
     52            self.terminated = 0
     53            self.nthreads = nthreads
     54
     55        def acquire(self):
     56            """
     57            Get the pool's lock.
     58            """
     59            self.changed.acquire()
     60
     61        def release(self):
     62            """
     63            Release the pool's lock.
     64            """
     65            self.changed.release()
     66
     67        def wait(self, timeout = None):
     68            """
     69            Wait for a pool thread to start or stop.
     70            """
     71            self.changed.wait(timeout)
     72
     73        def start(self):
     74            """
     75            Called by a pool thread to report starting.
     76            """
     77            self.changed.acquire()
     78            self.started += 1
     79            self.changed.notifyAll()
     80            self.changed.release()
     81
     82        def terminate(self):
     83            """
     84            Called by a pool thread to report finishing.
     85            """
     86            self.changed.acquire()
     87            self.terminated += 1
     88            self.changed.notifyAll()
     89            self.changed.release()
     90
     91        def clear(self):
     92            """
     93            Clear all pool data.
     94            """
     95            self.changed.acquire()
     96            self.started = 0
     97            self.terminated =0
     98            self.changed.notifyAll()
     99            self.changed.release()
     100
     101        def wait_for_slot(self):
     102            """
     103            Wait until we have a free slot to start another pooled thread
     104            """
     105            self.acquire()
     106            while self.started - self.terminated >= self.nthreads:
     107                self.wait()
     108            self.release()
     109
     110        def wait_for_all_done(self):
     111            """
     112            Wait until all active threads finish (and at least one has started)
     113            """
     114            self.acquire()
     115            while self.started == 0 or self.started > self.terminated:
     116                self.wait()
     117            self.release()
    118118
    119119    class pooled_thread(Thread):
    120         """
    121         One of a set of threads dedicated to a specific task.  Uses the
    122         thread_pool class above for coordination.
    123         """
    124         def __init__(self, group=None, target=None, name=None, args=(),
    125                 kwargs={}, pdata=None, trace_file=None):
    126             Thread.__init__(self, group, target, name, args, kwargs)
    127             self.rv = None          # Return value of the ops in this thread
    128             self.exception = None   # Exception that terminated this thread
    129             self.target=target      # Target function to run on start()
    130             self.args = args        # Args to pass to target
    131             self.kwargs = kwargs    # Additional kw args
    132             self.pdata = pdata      # thread_pool for this class
    133             # Logger for this thread
    134             self.log = logging.getLogger("fedd.experiment_control")
    135        
    136         def run(self):
    137             """
    138             Emulate Thread.run, except add pool data manipulation and error
    139             logging.
    140             """
    141             if self.pdata:
    142                 self.pdata.start()
    143 
    144             if self.target:
    145                 try:
    146                     self.rv = self.target(*self.args, **self.kwargs)
    147                 except service_error, s:
    148                     self.exception = s
    149                     self.log.error("Thread exception: %s %s" % \
    150                             (s.code_string(), s.desc))
    151                 except:
    152                     self.exception = sys.exc_info()[1]
    153                     self.log.error(("Unexpected thread exception: %s" +\
    154                             "Trace %s") % (self.exception,\
    155                                 traceback.format_exc()))
    156             if self.pdata:
    157                 self.pdata.terminate()
     120        """
     121        One of a set of threads dedicated to a specific task.  Uses the
     122        thread_pool class above for coordination.
     123        """
     124        def __init__(self, group=None, target=None, name=None, args=(),
     125                kwargs={}, pdata=None, trace_file=None):
     126            Thread.__init__(self, group, target, name, args, kwargs)
     127            self.rv = None          # Return value of the ops in this thread
     128            self.exception = None   # Exception that terminated this thread
     129            self.target=target      # Target function to run on start()
     130            self.args = args        # Args to pass to target
     131            self.kwargs = kwargs    # Additional kw args
     132            self.pdata = pdata      # thread_pool for this class
     133            # Logger for this thread
     134            self.log = logging.getLogger("fedd.experiment_control")
     135       
     136        def run(self):
     137            """
     138            Emulate Thread.run, except add pool data manipulation and error
     139            logging.
     140            """
     141            if self.pdata:
     142                self.pdata.start()
     143
     144            if self.target:
     145                try:
     146                    self.rv = self.target(*self.args, **self.kwargs)
     147                except service_error, s:
     148                    self.exception = s
     149                    self.log.error("Thread exception: %s %s" % \
     150                            (s.code_string(), s.desc))
     151                except:
     152                    self.exception = sys.exc_info()[1]
     153                    self.log.error(("Unexpected thread exception: %s" +\
     154                            "Trace %s") % (self.exception,\
     155                                traceback.format_exc()))
     156            if self.pdata:
     157                self.pdata.terminate()
    158158
    159159    call_RequestAccess = service_caller('RequestAccess')
     
    162162
    163163    def __init__(self, config=None, auth=None):
    164         """
    165         Intialize the various attributes, most from the config object
    166         """
    167 
    168         def parse_tarfile_list(tf):
    169             """
    170             Parse a tarfile list from the configuration.  This is a set of
    171             paths and tarfiles separated by spaces.
    172             """
    173             rv = [ ]
    174             if tf is not None:
    175                 tl = tf.split()
    176                 while len(tl) > 1:
    177                     p, t = tl[0:2]
    178                     del tl[0:2]
    179                     rv.append((p, t))
    180             return rv
    181 
    182         self.thread_with_rv = experiment_control_local.pooled_thread
    183         self.thread_pool = experiment_control_local.thread_pool
    184 
    185         self.cert_file = config.get("experiment_control", "cert_file")
    186         if self.cert_file:
    187             self.cert_pwd = config.get("experiment_control", "cert_pwd")
    188         else:
    189             self.cert_file = config.get("globals", "cert_file")
    190             self.cert_pwd = config.get("globals", "cert_pwd")
    191 
    192         self.trusted_certs = config.get("experiment_control", "trusted_certs") \
    193                 or config.get("globals", "trusted_certs")
    194 
    195         self.exp_stem = "fed-stem"
    196         self.log = logging.getLogger("fedd.experiment_control")
    197         set_log_level(config, "experiment_control", self.log)
    198         self.muxmax = 2
    199         self.nthreads = 2
    200         self.randomize_experiments = False
    201 
    202         self.scp_exec = "/usr/bin/scp"
    203         self.splitter = None
    204         self.ssh_exec="/usr/bin/ssh"
    205         self.ssh_keygen = "/usr/bin/ssh-keygen"
    206         self.ssh_identity_file = None
    207 
    208 
    209         self.debug = config.getboolean("experiment_control", "create_debug")
    210         self.state_filename = config.get("experiment_control",
    211                 "experiment_state")
    212         self.splitter_url = config.get("experiment_control", "splitter_uri")
    213         self.fedkit = parse_tarfile_list(\
    214                 config.get("experiment_control", "fedkit"))
    215         self.gatewaykit = parse_tarfile_list(\
    216                 config.get("experiment_control", "gatewaykit"))
    217         accessdb_file = config.get("experiment_control", "accessdb")
    218 
    219         self.ssh_pubkey_file = config.get("experiment_control",
    220                 "ssh_pubkey_file")
    221         self.ssh_privkey_file = config.get("experiment_control",
    222                 "ssh_privkey_file")
    223         # NB for internal master/slave ops, not experiment setup
    224         self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
    225         self.state = { }
    226         self.state_lock = Lock()
    227         self.tclsh = "/usr/local/bin/otclsh"
    228         self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
    229                 config.get("experiment_control", "tcl_splitter",
    230                         "/usr/testbed/lib/ns2ir/parse.tcl")
    231         mapdb_file = config.get("experiment_control", "mapdb")
    232         self.trace_file = sys.stderr
    233 
    234         self.def_expstart = \
    235                 "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
    236                 "/tmp/federate";
    237         self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
    238                 "FEDDIR/hosts";
    239         self.def_gwstart = \
    240                 "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
    241                 "/tmp/bridge.log";
    242         self.def_mgwstart = \
    243                 "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
    244                 "/tmp/bridge.log";
    245         self.def_gwimage = "FBSD61-TUNNEL2";
    246         self.def_gwtype = "pc";
    247         self.local_access = { }
    248 
    249         if auth:
    250             self.auth = auth
    251         else:
    252             self.log.error(\
    253                     "[access]: No authorizer initialized, creating local one.")
    254             auth = authorizer()
    255 
    256 
    257         if self.ssh_pubkey_file:
    258             try:
    259                 f = open(self.ssh_pubkey_file, 'r')
    260                 self.ssh_pubkey = f.read()
    261                 f.close()
    262             except IOError:
    263                 raise service_error(service_error.internal,
    264                         "Cannot read sshpubkey")
    265         else:
    266             raise service_error(service_error.internal,
    267                     "No SSH public key file?")
    268 
    269         if not self.ssh_privkey_file:
    270             raise service_error(service_error.internal,
    271                     "No SSH public key file?")
    272 
    273 
    274         if mapdb_file:
    275             self.read_mapdb(mapdb_file)
    276         else:
    277             self.log.warn("[experiment_control] No testbed map, using defaults")
    278             self.tbmap = {
    279                     'deter':'https://users.isi.deterlab.net:23235',
    280                     'emulab':'https://users.isi.deterlab.net:23236',
    281                     'ucb':'https://users.isi.deterlab.net:23237',
    282                     }
    283 
    284         if accessdb_file:
    285                 self.read_accessdb(accessdb_file)
    286         else:
    287             raise service_error(service_error.internal,
    288                     "No accessdb specified in config")
    289 
    290         # Grab saved state.  OK to do this w/o locking because it's read only
    291         # and only one thread should be in existence that can see self.state at
    292         # this point.
    293         if self.state_filename:
    294             self.read_state()
    295 
    296         # Dispatch tables
    297         self.soap_services = {\
    298                 'Create': soap_handler('Create', self.create_experiment),
    299                 'Vtopo': soap_handler('Vtopo', self.get_vtopo),
    300                 'Vis': soap_handler('Vis', self.get_vis),
    301                 'Info': soap_handler('Info', self.get_info),
    302                 'Terminate': soap_handler('Terminate',
    303                     self.terminate_experiment),
    304         }
    305 
    306         self.xmlrpc_services = {\
    307                 'Create': xmlrpc_handler('Create', self.create_experiment),
    308                 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
    309                 'Vis': xmlrpc_handler('Vis', self.get_vis),
    310                 'Info': xmlrpc_handler('Info', self.get_info),
    311                 'Terminate': xmlrpc_handler('Terminate',
    312                     self.terminate_experiment),
    313         }
     164        """
     165        Intialize the various attributes, most from the config object
     166        """
     167
     168        def parse_tarfile_list(tf):
     169            """
     170            Parse a tarfile list from the configuration.  This is a set of
     171            paths and tarfiles separated by spaces.
     172            """
     173            rv = [ ]
     174            if tf is not None:
     175                tl = tf.split()
     176                while len(tl) > 1:
     177                    p, t = tl[0:2]
     178                    del tl[0:2]
     179                    rv.append((p, t))
     180            return rv
     181
     182        self.thread_with_rv = experiment_control_local.pooled_thread
     183        self.thread_pool = experiment_control_local.thread_pool
     184
     185        self.cert_file = config.get("experiment_control", "cert_file")
     186        if self.cert_file:
     187            self.cert_pwd = config.get("experiment_control", "cert_pwd")
     188        else:
     189            self.cert_file = config.get("globals", "cert_file")
     190            self.cert_pwd = config.get("globals", "cert_pwd")
     191
     192        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
     193                or config.get("globals", "trusted_certs")
     194
     195        self.exp_stem = "fed-stem"
     196        self.log = logging.getLogger("fedd.experiment_control")
     197        set_log_level(config, "experiment_control", self.log)
     198        self.muxmax = 2
     199        self.nthreads = 2
     200        self.randomize_experiments = False
     201
     202        self.scp_exec = "/usr/bin/scp"
     203        self.splitter = None
     204        self.ssh_exec="/usr/bin/ssh"
     205        self.ssh_keygen = "/usr/bin/ssh-keygen"
     206        self.ssh_identity_file = None
     207
     208
     209        self.debug = config.getboolean("experiment_control", "create_debug")
     210        self.state_filename = config.get("experiment_control",
     211                "experiment_state")
     212        self.splitter_url = config.get("experiment_control", "splitter_uri")
     213        self.fedkit = parse_tarfile_list(\
     214                config.get("experiment_control", "fedkit"))
     215        self.gatewaykit = parse_tarfile_list(\
     216                config.get("experiment_control", "gatewaykit"))
     217        accessdb_file = config.get("experiment_control", "accessdb")
     218
     219        self.ssh_pubkey_file = config.get("experiment_control",
     220                "ssh_pubkey_file")
     221        self.ssh_privkey_file = config.get("experiment_control",
     222                "ssh_privkey_file")
     223        # NB for internal master/slave ops, not experiment setup
     224        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
     225        self.state = { }
     226        self.state_lock = Lock()
     227        self.tclsh = "/usr/local/bin/otclsh"
     228        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
     229                config.get("experiment_control", "tcl_splitter",
     230                        "/usr/testbed/lib/ns2ir/parse.tcl")
     231        mapdb_file = config.get("experiment_control", "mapdb")
     232        self.trace_file = sys.stderr
     233
     234        self.def_expstart = \
     235                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
     236                "/tmp/federate";
     237        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
     238                "FEDDIR/hosts";
     239        self.def_gwstart = \
     240                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
     241                "/tmp/bridge.log";
     242        self.def_mgwstart = \
     243                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
     244                "/tmp/bridge.log";
     245        self.def_gwimage = "FBSD61-TUNNEL2";
     246        self.def_gwtype = "pc";
     247        self.local_access = { }
     248
     249        if auth:
     250            self.auth = auth
     251        else:
     252            self.log.error(\
     253                    "[access]: No authorizer initialized, creating local one.")
     254            auth = authorizer()
     255
     256
     257        if self.ssh_pubkey_file:
     258            try:
     259                f = open(self.ssh_pubkey_file, 'r')
     260                self.ssh_pubkey = f.read()
     261                f.close()
     262            except IOError:
     263                raise service_error(service_error.internal,
     264                        "Cannot read sshpubkey")
     265        else:
     266            raise service_error(service_error.internal,
     267                    "No SSH public key file?")
     268
     269        if not self.ssh_privkey_file:
     270            raise service_error(service_error.internal,
     271                    "No SSH public key file?")
     272
     273
     274        if mapdb_file:
     275            self.read_mapdb(mapdb_file)
     276        else:
     277            self.log.warn("[experiment_control] No testbed map, using defaults")
     278            self.tbmap = {
     279                    'deter':'https://users.isi.deterlab.net:23235',
     280                    'emulab':'https://users.isi.deterlab.net:23236',
     281                    'ucb':'https://users.isi.deterlab.net:23237',
     282                    }
     283
     284        if accessdb_file:
     285                self.read_accessdb(accessdb_file)
     286        else:
     287            raise service_error(service_error.internal,
     288                    "No accessdb specified in config")
     289
     290        # Grab saved state.  OK to do this w/o locking because it's read only
     291        # and only one thread should be in existence that can see self.state at
     292        # this point.
     293        if self.state_filename:
     294            self.read_state()
     295
     296        # Dispatch tables
     297        self.soap_services = {\
     298                'Create': soap_handler('Create', self.create_experiment),
     299                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
     300                'Vis': soap_handler('Vis', self.get_vis),
     301                'Info': soap_handler('Info', self.get_info),
     302                'Terminate': soap_handler('Terminate',
     303                    self.terminate_experiment),
     304        }
     305
     306        self.xmlrpc_services = {\
     307                'Create': xmlrpc_handler('Create', self.create_experiment),
     308                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
     309                'Vis': xmlrpc_handler('Vis', self.get_vis),
     310                'Info': xmlrpc_handler('Info', self.get_info),
     311                'Terminate': xmlrpc_handler('Terminate',
     312                    self.terminate_experiment),
     313        }
    314314
    315315    def copy_file(self, src, dest, size=1024):
    316         """
    317         Exceedingly simple file copy.
    318         """
    319         s = open(src,'r')
    320         d = open(dest, 'w')
    321 
    322         buf = "x"
    323         while buf != "":
    324             buf = s.read(size)
    325             d.write(buf)
    326         s.close()
    327         d.close()
     316        """
     317        Exceedingly simple file copy.
     318        """
     319        s = open(src,'r')
     320        d = open(dest, 'w')
     321
     322        buf = "x"
     323        while buf != "":
     324            buf = s.read(size)
     325            d.write(buf)
     326        s.close()
     327        d.close()
    328328
    329329    # Call while holding self.state_lock
    330330    def write_state(self):
    331         """
    332         Write a new copy of experiment state after copying the existing state
    333         to a backup.
    334 
    335         State format is a simple pickling of the state dictionary.
    336         """
    337         if os.access(self.state_filename, os.W_OK):
    338             self.copy_file(self.state_filename, \
    339                     "%s.bak" % self.state_filename)
    340         try:
    341             f = open(self.state_filename, 'w')
    342             pickle.dump(self.state, f)
    343         except IOError, e:
    344             self.log.error("Can't write file %s: %s" % \
    345                     (self.state_filename, e))
    346         except pickle.PicklingError, e:
    347             self.log.error("Pickling problem: %s" % e)
    348         except TypeError, e:
    349             self.log.error("Pickling problem (TypeError): %s" % e)
     331        """
     332        Write a new copy of experiment state after copying the existing state
     333        to a backup.
     334
     335        State format is a simple pickling of the state dictionary.
     336        """
     337        if os.access(self.state_filename, os.W_OK):
     338            self.copy_file(self.state_filename, \
     339                    "%s.bak" % self.state_filename)
     340        try:
     341            f = open(self.state_filename, 'w')
     342            pickle.dump(self.state, f)
     343        except IOError, e:
     344            self.log.error("Can't write file %s: %s" % \
     345                    (self.state_filename, e))
     346        except pickle.PicklingError, e:
     347            self.log.error("Pickling problem: %s" % e)
     348        except TypeError, e:
     349            self.log.error("Pickling problem (TypeError): %s" % e)
    350350
    351351    # Call while holding self.state_lock
    352352    def read_state(self):
    353         """
    354         Read a new copy of experiment state.  Old state is overwritten.
    355 
    356         State format is a simple pickling of the state dictionary.
    357         """
    358         try:
    359             f = open(self.state_filename, "r")
    360             self.state = pickle.load(f)
    361             self.log.debug("[read_state]: Read state from %s" % \
    362                     self.state_filename)
    363         except IOError, e:
    364             self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
    365                     % (self.state_filename, e))
    366         except pickle.UnpicklingError, e:
    367             self.log.warning(("[read_state]: No saved state: " + \
    368                     "Unpickling failed: %s") % e)
    369        
    370         for k in self.state.keys():
    371             try:
    372                 # This list should only have one element in it, but phrasing it
    373                 # as a for loop doesn't cost much, really.  We have to find the
    374                 # fedid elements anyway.
    375                 for eid in [ f['fedid'] \
    376                         for f in self.state[k]['experimentID']\
    377                             if f.has_key('fedid') ]:
    378                     self.auth.set_attribute(self.state[k]['owner'], eid)
    379             except KeyError, e:
    380                 self.log.warning("[read_state]: State ownership or identity " +\
    381                         "misformatted in %s: %s" % (self.state_filename, e))
     353        """
     354        Read a new copy of experiment state.  Old state is overwritten.
     355
     356        State format is a simple pickling of the state dictionary.
     357        """
     358        try:
     359            f = open(self.state_filename, "r")
     360            self.state = pickle.load(f)
     361            self.log.debug("[read_state]: Read state from %s" % \
     362                    self.state_filename)
     363        except IOError, e:
     364            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
     365                    % (self.state_filename, e))
     366        except pickle.UnpicklingError, e:
     367            self.log.warning(("[read_state]: No saved state: " + \
     368                    "Unpickling failed: %s") % e)
     369       
     370        for k in self.state.keys():
     371            try:
     372                # This list should only have one element in it, but phrasing it
     373                # as a for loop doesn't cost much, really.  We have to find the
     374                # fedid elements anyway.
     375                for eid in [ f['fedid'] \
     376                        for f in self.state[k]['experimentID']\
     377                            if f.has_key('fedid') ]:
     378                    self.auth.set_attribute(self.state[k]['owner'], eid)
     379            except KeyError, e:
     380                self.log.warning("[read_state]: State ownership or identity " +\
     381                        "misformatted in %s: %s" % (self.state_filename, e))
    382382
    383383
    384384    def read_accessdb(self, accessdb_file):
    385         """
    386         Read the mapping from fedids that can create experiments to their name
    387         in the 3-level access namespace.  All will be asserted from this
    388         testbed and can include the local username and porject that will be
    389         asserted on their behalf by this fedd.  Each fedid is also added to the
    390         authorization system with the "create" attribute.
    391         """
    392         self.accessdb = {}
    393         # These are the regexps for parsing the db
    394         name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
    395         project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
    396                 "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
    397         user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
    398                 "\s*->\s*(" + name_expr + ")\s*$")
    399         lineno = 0
    400 
    401         # Parse the mappings and store in self.authdb, a dict of
    402         # fedid -> (proj, user)
    403         try:
    404             f = open(accessdb_file, "r")
    405             for line in f:
    406                 lineno += 1
    407                 line = line.strip()
    408                 if len(line) == 0 or line.startswith('#'):
    409                     continue
    410                 m = project_line.match(line)
    411                 if m:
    412                     fid = fedid(hexstr=m.group(1))
    413                     project, user = m.group(2,3)
    414                     if not self.accessdb.has_key(fid):
    415                         self.accessdb[fid] = []
    416                     self.accessdb[fid].append((project, user))
    417                     continue
    418 
    419                 m = user_line.match(line)
    420                 if m:
    421                     fid = fedid(hexstr=m.group(1))
    422                     project = None
    423                     user = m.group(2)
    424                     if not self.accessdb.has_key(fid):
    425                         self.accessdb[fid] = []
    426                     self.accessdb[fid].append((project, user))
    427                     continue
    428                 self.log.warn("[experiment_control] Error parsing access " +\
    429                         "db %s at line %d" %  (accessdb_file, lineno))
    430         except IOError:
    431             raise service_error(service_error.internal,
    432                     "Error opening/reading %s as experiment " +\
    433                             "control accessdb" %  accessdb_file)
    434         f.close()
    435 
    436         # Initialize the authorization attributes
    437         for fid in self.accessdb.keys():
    438             self.auth.set_attribute(fid, 'create')
     385        """
     386        Read the mapping from fedids that can create experiments to their name
     387        in the 3-level access namespace.  All will be asserted from this
     388        testbed and can include the local username and porject that will be
     389        asserted on their behalf by this fedd.  Each fedid is also added to the
     390        authorization system with the "create" attribute.
     391        """
     392        self.accessdb = {}
     393        # These are the regexps for parsing the db
     394        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
     395        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
     396                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
     397        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
     398                "\s*->\s*(" + name_expr + ")\s*$")
     399        lineno = 0
     400
     401        # Parse the mappings and store in self.authdb, a dict of
     402        # fedid -> (proj, user)
     403        try:
     404            f = open(accessdb_file, "r")
     405            for line in f:
     406                lineno += 1
     407                line = line.strip()
     408                if len(line) == 0 or line.startswith('#'):
     409                    continue
     410                m = project_line.match(line)
     411                if m:
     412                    fid = fedid(hexstr=m.group(1))
     413                    project, user = m.group(2,3)
     414                    if not self.accessdb.has_key(fid):
     415                        self.accessdb[fid] = []
     416                    self.accessdb[fid].append((project, user))
     417                    continue
     418
     419                m = user_line.match(line)
     420                if m:
     421                    fid = fedid(hexstr=m.group(1))
     422                    project = None
     423                    user = m.group(2)
     424                    if not self.accessdb.has_key(fid):
     425                        self.accessdb[fid] = []
     426                    self.accessdb[fid].append((project, user))
     427                    continue
     428                self.log.warn("[experiment_control] Error parsing access " +\
     429                        "db %s at line %d" %  (accessdb_file, lineno))
     430        except IOError:
     431            raise service_error(service_error.internal,
     432                    "Error opening/reading %s as experiment " +\
     433                            "control accessdb" %  accessdb_file)
     434        f.close()
     435
     436        # Initialize the authorization attributes
     437        for fid in self.accessdb.keys():
     438            self.auth.set_attribute(fid, 'create')
    439439
    440440    def read_mapdb(self, file):
    441         """
    442         Read a simple colon separated list of mappings for the
    443         label-to-testbed-URL mappings.  Clears or creates self.tbmap.
    444         """
    445 
    446         self.tbmap = { }
    447         lineno =0
    448         try:
    449             f = open(file, "r")
    450             for line in f:
    451                 lineno += 1
    452                 line = line.strip()
    453                 if line.startswith('#') or len(line) == 0:
    454                     continue
    455                 try:
    456                     label, url = line.split(':', 1)
    457                     self.tbmap[label] = url
    458                 except ValueError, e:
    459                     self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
    460                             "map db: %s %s" % (lineno, line, e))
    461         except IOError, e:
    462             self.log.warning("[read_mapdb]: No saved map database: Can't " +\
    463                     "open %s: %s" % (file, e))
    464         f.close()
     441        """
     442        Read a simple colon separated list of mappings for the
     443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
     444        """
     445
     446        self.tbmap = { }
     447        lineno =0
     448        try:
     449            f = open(file, "r")
     450            for line in f:
     451                lineno += 1
     452                line = line.strip()
     453                if line.startswith('#') or len(line) == 0:
     454                    continue
     455                try:
     456                    label, url = line.split(':', 1)
     457                    self.tbmap[label] = url
     458                except ValueError, e:
     459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
     460                            "map db: %s %s" % (lineno, line, e))
     461        except IOError, e:
     462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
     463                    "open %s: %s" % (file, e))
     464        f.close()
    465465
    466466    def scp_file(self, file, user, host, dest=""):
    467         """
    468         scp a file to the remote host.  If debug is set the action is only
    469         logged.
    470         """
    471 
    472         scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
    473                 '-o', 'StrictHostKeyChecking yes', '-i',
    474                 self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
    475         rv = 0
    476 
    477         try:
    478             dnull = open("/dev/null", "w")
    479         except IOError:
    480             self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
    481             dnull = Null
    482 
    483         self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
    484         if not self.debug:
    485             rv = call(scp_cmd, stdout=dnull, stderr=dnull)
    486 
    487         return rv == 0
     467        """
     468        scp a file to the remote host.  If debug is set the action is only
     469        logged.
     470        """
     471
     472        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
     473                '-o', 'StrictHostKeyChecking yes', '-i',
     474                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
     475        rv = 0
     476
     477        try:
     478            dnull = open("/dev/null", "w")
     479        except IOError:
     480            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
     481            dnull = Null
     482
     483        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
     484        if not self.debug:
     485            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
     486
     487        return rv == 0
    488488
    489489    def ssh_cmd(self, user, host, cmd, wname=None):
    490         """
    491         Run a remote command on host as user.  If debug is set, the action is
    492         only logged.
    493         """
    494         sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
    495                 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
    496                 (self.ssh_exec, self.ssh_privkey_file,
    497                         user, host, cmd)
    498 
    499         try:
    500             dnull = open("/dev/null", "r")
    501         except IOError:
    502             self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
    503             dnull = Null
    504 
    505         self.log.debug("[ssh_cmd]: %s" % sh_str)
    506         if not self.debug:
    507             if dnull:
    508                 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
    509             else:
    510                 sub = Popen(sh_str, shell=True)
    511             return sub.wait() == 0
    512         else:
    513             return True
     490        """
     491        Run a remote command on host as user.  If debug is set, the action is
     492        only logged.
     493        """
     494        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
     495                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
     496                (self.ssh_exec, self.ssh_privkey_file,
     497                        user, host, cmd)
     498
     499        try:
     500            dnull = open("/dev/null", "r")
     501        except IOError:
     502            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
     503            dnull = Null
     504
     505        self.log.debug("[ssh_cmd]: %s" % sh_str)
     506        if not self.debug:
     507            if dnull:
     508                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
     509            else:
     510                sub = Popen(sh_str, shell=True)
     511            return sub.wait() == 0
     512        else:
     513            return True
    514514
    515515    def ship_configs(self, host, user, src_dir, dest_dir):
    516         """
    517         Copy federant-specific configuration files to the federant.
    518         """
    519         if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
    520             return False
    521         if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
    522             return False
    523 
    524         for f in os.listdir(src_dir):
    525             if os.path.isdir(f):
    526                 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
    527                         "%s/%s" % (dest_dir, f)):
    528                     return False
    529             else:
    530                 if not self.scp_file("%s/%s" % (src_dir, f),
    531                         user, host, dest_dir):
    532                     return False
    533         return True
     516        """
     517        Copy federant-specific configuration files to the federant.
     518        """
     519        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
     520            return False
     521        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
     522            return False
     523
     524        for f in os.listdir(src_dir):
     525            if os.path.isdir(f):
     526                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
     527                        "%s/%s" % (dest_dir, f)):
     528                    return False
     529            else:
     530                if not self.scp_file("%s/%s" % (src_dir, f),
     531                        user, host, dest_dir):
     532                    return False
     533        return True
    534534
    535535    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
    536         """
    537         Start a sub-experiment on a federant.
    538 
    539         Get the current state, modify or create as appropriate, ship data and
    540         configs and start the experiment.  There are small ordering differences
    541         based on the initial state of the sub-experiment.
    542         """
    543         # ops node in the federant
    544         host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    545         user = tbparams[tb]['user']     # federant user
    546         pid = tbparams[tb]['project']   # federant project
    547         # XXX
    548         base_confs = ( "hosts",)
    549         tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
    550         # command to test experiment state
    551         expinfo_exec = "/usr/testbed/bin/expinfo" 
    552         # Configuration directories on the remote machine
    553         proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    554         tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
    555         rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
    556         # Regular expressions to parse the expinfo response
    557         state_re = re.compile("State:\s+(\w+)")
    558         no_exp_re = re.compile("^No\s+such\s+experiment")
    559         state = None    # Experiment state parsed from expinfo
    560         # The expinfo ssh command.  Note the identity restriction to use only
    561         # the identity provided in the pubkey given.
    562         cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
    563                 'StrictHostKeyChecking yes', '-i',
    564                 self.ssh_privkey_file, "%s@%s" % (user, host),
    565                 expinfo_exec, pid, eid]
    566 
    567         # Get status
    568         self.log.debug("[start_segment]: %s"% " ".join(cmd))
    569         dev_null = None
    570         try:
    571             dev_null = open("/dev/null", "a")
    572         except IOError, e:
    573             self.log.error("[start_segment]: can't open /dev/null: %s" %e)
    574 
    575         if self.debug:
    576             state = 'swapped'
    577             rv = 0
    578         else:
    579             status = Popen(cmd, stdout=PIPE, stderr=dev_null)
    580             for line in status.stdout:
    581                 m = state_re.match(line)
    582                 if m: state = m.group(1)
    583                 else:
    584                     m = no_exp_re.match(line)
    585                     if m: state = "none"
    586             rv = status.wait()
    587 
    588         # If the experiment is not present the subcommand returns a non-zero
    589         # return value.  If we successfully parsed a "none" outcome, ignore the
    590         # return code.
    591         if rv != 0 and state != 'none':
    592             raise service_error(service_error.internal,
    593                     "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
    594 
    595         if state not in ('active', 'swapped', 'none'):
    596             self.log.debug("[start_segment]:unknown state %s" % state)
    597             return False
    598 
    599         self.log.debug("[start_segment]: %s: %s" % (tb, state))
    600         self.log.info("[start_segment]:transferring experiment to %s" % tb)
    601 
    602         if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
    603             return False
    604         # Clear the federation config dirs
    605         if not self.ssh_cmd(user, host,
    606                 "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
    607             return False
    608         # Clear and create the tarfiles and rpm directories
    609         for d in (tarfiles_dir, rpms_dir):
    610             if not self.ssh_cmd(user, host,
    611                     "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
    612                 return False
    613             if not self.ssh_cmd(user, host, "mkdir -p %s" % d,
    614                     "create tarfiles"):
    615                 return False
    616        
    617         if state == 'none':
    618             # Create a null copy of the experiment so that we capture any logs
    619             # there if the modify fails.  Emulab software discards the logs
    620             # from a failed startexp
    621             if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
    622                 return False
    623             self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
    624             if not self.ssh_cmd(user, host,
    625                     "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \
    626                             % (pid, eid), "startexp"):
    627                 return False
    628        
    629         # Create the federation config dirs
    630         if not self.ssh_cmd(user, host,
    631                 "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
    632             return False
    633         for f in base_confs:
    634             if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
    635                     "%s/%s" % (proj_dir, f)):
    636                 return False
    637         if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
    638                 proj_dir):
    639             return False
    640         if os.path.isdir("%s/tarfiles" % tmpdir):
    641             if not self.ship_configs(host, user,
    642                     "%s/tarfiles" % tmpdir, tarfiles_dir):
    643                 return False
    644         if os.path.isdir("%s/rpms" % tmpdir):
    645             if not self.ship_configs(host, user,
    646                     "%s/rpms" % tmpdir, tarfiles_dir):
    647                 return False
    648         # Stage the new configuration (active experiments will stay swapped in
    649         # now)
    650         self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
    651         if not self.ssh_cmd(user, host,
    652                 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
    653                         (pid, eid, tclfile),
    654                 "modexp"):
    655             return False
    656         # Active experiments are still swapped, this swaps the others in.
    657         if state != 'active':
    658             self.log.info("[start_segment]: Swapping %s in on %s" % \
    659                     (eid, tb))
    660             if not self.ssh_cmd(user, host,
    661                     "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    662                     "swapexp"):
    663                 return False
    664         return True
     536        """
     537        Start a sub-experiment on a federant.
     538
     539        Get the current state, modify or create as appropriate, ship data and
     540        configs and start the experiment.  There are small ordering differences
     541        based on the initial state of the sub-experiment.
     542        """
     543        # ops node in the federant
     544        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     545        user = tbparams[tb]['user']     # federant user
     546        pid = tbparams[tb]['project']   # federant project
     547        # XXX
     548        base_confs = ( "hosts",)
     549        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     550        # command to test experiment state
     551        expinfo_exec = "/usr/testbed/bin/expinfo" 
     552        # Configuration directories on the remote machine
     553        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
     554        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
     555        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
     556        # Regular expressions to parse the expinfo response
     557        state_re = re.compile("State:\s+(\w+)")
     558        no_exp_re = re.compile("^No\s+such\s+experiment")
     559        state = None    # Experiment state parsed from expinfo
     560        # The expinfo ssh command.  Note the identity restriction to use only
     561        # the identity provided in the pubkey given.
     562        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
     563                'StrictHostKeyChecking yes', '-i',
     564                self.ssh_privkey_file, "%s@%s" % (user, host),
     565                expinfo_exec, pid, eid]
     566
     567        # Get status
     568        self.log.debug("[start_segment]: %s"% " ".join(cmd))
     569        dev_null = None
     570        try:
     571            dev_null = open("/dev/null", "a")
     572        except IOError, e:
     573            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
     574
     575        if self.debug:
     576            state = 'swapped'
     577            rv = 0
     578        else:
     579            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
     580            for line in status.stdout:
     581                m = state_re.match(line)
     582                if m: state = m.group(1)
     583                else:
     584                    m = no_exp_re.match(line)
     585                    if m: state = "none"
     586            rv = status.wait()
     587
     588        # If the experiment is not present the subcommand returns a non-zero
     589        # return value.  If we successfully parsed a "none" outcome, ignore the
     590        # return code.
     591        if rv != 0 and state != 'none':
     592            raise service_error(service_error.internal,
     593                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
     594
     595        if state not in ('active', 'swapped', 'none'):
     596            self.log.debug("[start_segment]:unknown state %s" % state)
     597            return False
     598
     599        self.log.debug("[start_segment]: %s: %s" % (tb, state))
     600        self.log.info("[start_segment]:transferring experiment to %s" % tb)
     601
     602        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
     603            return False
     604        # Clear the federation config dirs
     605        if not self.ssh_cmd(user, host,
     606                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
     607            return False
     608        # Clear and create the tarfiles and rpm directories
     609        for d in (tarfiles_dir, rpms_dir):
     610            if not self.ssh_cmd(user, host,
     611                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
     612                return False
     613            if not self.ssh_cmd(user, host, "mkdir -p %s" % d,
     614                    "create tarfiles"):
     615                return False
     616       
     617        if state == 'none':
     618            # Create a null copy of the experiment so that we capture any logs
     619            # there if the modify fails.  Emulab software discards the logs
     620            # from a failed startexp
     621            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
     622                return False
     623            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
     624            if not self.ssh_cmd(user, host,
     625                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \
     626                            % (pid, eid), "startexp"):
     627                return False
     628       
     629        # Create the federation config dirs
     630        if not self.ssh_cmd(user, host,
     631                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
     632            return False
     633        for f in base_confs:
     634            if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
     635                    "%s/%s" % (proj_dir, f)):
     636                return False
     637        if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
     638                proj_dir):
     639            return False
     640        if os.path.isdir("%s/tarfiles" % tmpdir):
     641            if not self.ship_configs(host, user,
     642                    "%s/tarfiles" % tmpdir, tarfiles_dir):
     643                return False
     644        if os.path.isdir("%s/rpms" % tmpdir):
     645            if not self.ship_configs(host, user,
     646                    "%s/rpms" % tmpdir, tarfiles_dir):
     647                return False
     648        # Stage the new configuration (active experiments will stay swapped in
     649        # now)
     650        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
     651        if not self.ssh_cmd(user, host,
     652                "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
     653                        (pid, eid, tclfile),
     654                "modexp"):
     655            return False
     656        # Active experiments are still swapped, this swaps the others in.
     657        if state != 'active':
     658            self.log.info("[start_segment]: Swapping %s in on %s" % \
     659                    (eid, tb))
     660            if not self.ssh_cmd(user, host,
     661                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
     662                    "swapexp"):
     663                return False
     664        return True
    665665
    666666    def stop_segment(self, tb, eid, tbparams):
    667         """
    668         Stop a sub experiment by calling swapexp on the federant
    669         """
    670         user = tbparams[tb]['user']
    671         host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    672         pid = tbparams[tb]['project']
    673 
    674         self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
    675         return self.ssh_cmd(user, host,
    676                 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
    677 
    678        
     667        """
     668        Stop a sub experiment by calling swapexp on the federant
     669        """
     670        user = tbparams[tb]['user']
     671        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     672        pid = tbparams[tb]['project']
     673
     674        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
     675        return self.ssh_cmd(user, host,
     676                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
     677
     678       
    679679    def generate_ssh_keys(self, dest, type="rsa" ):
    680         """
    681         Generate a set of keys for the gateways to use to talk.
    682 
    683         Keys are of type type and are stored in the required dest file.
    684         """
    685         valid_types = ("rsa", "dsa")
    686         t = type.lower();
    687         if t not in valid_types: raise ValueError
    688         cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
    689 
    690         try:
    691             trace = open("/dev/null", "w")
    692         except IOError:
    693             raise service_error(service_error.internal,
    694                     "Cannot open /dev/null??");
    695 
    696         # May raise CalledProcessError
    697         self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
    698         rv = call(cmd, stdout=trace, stderr=trace)
    699         if rv != 0:
    700             raise service_error(service_error.internal,
    701                     "Cannot generate nonce ssh keys.  %s return code %d" \
    702                             % (self.ssh_keygen, rv))
     680        """
     681        Generate a set of keys for the gateways to use to talk.
     682
     683        Keys are of type type and are stored in the required dest file.
     684        """
     685        valid_types = ("rsa", "dsa")
     686        t = type.lower();
     687        if t not in valid_types: raise ValueError
     688        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
     689
     690        try:
     691            trace = open("/dev/null", "w")
     692        except IOError:
     693            raise service_error(service_error.internal,
     694                    "Cannot open /dev/null??");
     695
     696        # May raise CalledProcessError
     697        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
     698        rv = call(cmd, stdout=trace, stderr=trace)
     699        if rv != 0:
     700            raise service_error(service_error.internal,
     701                    "Cannot generate nonce ssh keys.  %s return code %d" \
     702                            % (self.ssh_keygen, rv))
    703703
    704704    def gentopo(self, str):
    705         """
    706         Generate the topology dtat structure from the splitter's XML
    707         representation of it.
    708 
    709         The topology XML looks like:
    710             <experiment>
    711                 <nodes>
    712                     <node><vname></vname><ips>ip1:ip2</ips></node>
    713                 </nodes>
    714                 <lans>
    715                     <lan>
    716                         <vname></vname><vnode></vnode><ip></ip>
    717                         <bandwidth></bandwidth><member>node:port</member>
    718                     </lan>
    719                 </lans>
    720         """
    721         class topo_parse:
    722             """
    723             Parse the topology XML and create the dats structure.
    724             """
    725             def __init__(self):
    726                 # Typing of the subelements for data conversion
    727                 self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
    728                 self.int_subelements = ( 'bandwidth',)
    729                 self.float_subelements = ( 'delay',)
    730                 # The final data structure
    731                 self.nodes = [ ]
    732                 self.lans =  [ ]
    733                 self.topo = { \
    734                         'node': self.nodes,\
    735                         'lan' : self.lans,\
    736                     }
    737                 self.element = { }  # Current element being created
    738                 self.chars = ""     # Last text seen
    739 
    740             def end_element(self, name):
    741                 # After each sub element the contents is added to the current
    742                 # element or to the appropriate list.
    743                 if name == 'node':
    744                     self.nodes.append(self.element)
    745                     self.element = { }
    746                 elif name == 'lan':
    747                     self.lans.append(self.element)
    748                     self.element = { }
    749                 elif name in self.str_subelements:
    750                     self.element[name] = self.chars
    751                     self.chars = ""
    752                 elif name in self.int_subelements:
    753                     self.element[name] = int(self.chars)
    754                     self.chars = ""
    755                 elif name in self.float_subelements:
    756                     self.element[name] = float(self.chars)
    757                     self.chars = ""
    758 
    759             def found_chars(self, data):
    760                 self.chars += data.rstrip()
    761 
    762 
    763         tp = topo_parse();
    764         parser = xml.parsers.expat.ParserCreate()
    765         parser.EndElementHandler = tp.end_element
    766         parser.CharacterDataHandler = tp.found_chars
    767 
    768         parser.Parse(str)
    769 
    770         return tp.topo
    771        
     705        """
     706        Generate the topology dtat structure from the splitter's XML
     707        representation of it.
     708
     709        The topology XML looks like:
     710            <experiment>
     711                <nodes>
     712                    <node><vname></vname><ips>ip1:ip2</ips></node>
     713                </nodes>
     714                <lans>
     715                    <lan>
     716                        <vname></vname><vnode></vnode><ip></ip>
     717                        <bandwidth></bandwidth><member>node:port</member>
     718                    </lan>
     719                </lans>
     720        """
     721        class topo_parse:
     722            """
     723            Parse the topology XML and create the dats structure.
     724            """
     725            def __init__(self):
     726                # Typing of the subelements for data conversion
     727                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
     728                self.int_subelements = ( 'bandwidth',)
     729                self.float_subelements = ( 'delay',)
     730                # The final data structure
     731                self.nodes = [ ]
     732                self.lans =  [ ]
     733                self.topo = { \
     734                        'node': self.nodes,\
     735                        'lan' : self.lans,\
     736                    }
     737                self.element = { }  # Current element being created
     738                self.chars = ""     # Last text seen
     739
     740            def end_element(self, name):
     741                # After each sub element the contents is added to the current
     742                # element or to the appropriate list.
     743                if name == 'node':
     744                    self.nodes.append(self.element)
     745                    self.element = { }
     746                elif name == 'lan':
     747                    self.lans.append(self.element)
     748                    self.element = { }
     749                elif name in self.str_subelements:
     750                    self.element[name] = self.chars
     751                    self.chars = ""
     752                elif name in self.int_subelements:
     753                    self.element[name] = int(self.chars)
     754                    self.chars = ""
     755                elif name in self.float_subelements:
     756                    self.element[name] = float(self.chars)
     757                    self.chars = ""
     758
     759            def found_chars(self, data):
     760                self.chars += data.rstrip()
     761
     762
     763        tp = topo_parse();
     764        parser = xml.parsers.expat.ParserCreate()
     765        parser.EndElementHandler = tp.end_element
     766        parser.CharacterDataHandler = tp.found_chars
     767
     768        parser.Parse(str)
     769
     770        return tp.topo
     771       
    772772
    773773    def genviz(self, topo):
    774         """
    775         Generate the visualization the virtual topology
    776         """
    777 
    778         neato = "/usr/local/bin/neato"
    779         # These are used to parse neato output and to create the visualization
    780         # file.
    781         vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
    782         vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
    783                 "%s</type></node>"
    784 
    785         try:
    786             # Node names
    787             nodes = [ n['vname'] for n in topo['node'] ]
    788             topo_lans = topo['lan']
    789         except KeyError:
    790             raise service_error(service_error.internal, "Bad topology")
    791 
    792         lans = { }
    793         links = { }
    794 
    795         # Walk through the virtual topology, organizing the connections into
    796         # 2-node connections (links) and more-than-2-node connections (lans).
    797         # When a lan is created, it's added to the list of nodes (there's a
    798         # node in the visualization for the lan).
    799         for l in topo_lans:
    800             if links.has_key(l['vname']):
    801                 if len(links[l['vname']]) < 2:
    802                     links[l['vname']].append(l['vnode'])
    803                 else:
    804                     nodes.append(l['vname'])
    805                     lans[l['vname']] = links[l['vname']]
    806                     del links[l['vname']]
    807                     lans[l['vname']].append(l['vnode'])
    808             elif lans.has_key(l['vname']):
    809                 lans[l['vname']].append(l['vnode'])
    810             else:
    811                 links[l['vname']] = [ l['vnode'] ]
    812 
    813 
    814         # Open up a temporary file for dot to turn into a visualization
    815         try:
    816             df, dotname = tempfile.mkstemp()
    817             dotfile = os.fdopen(df, 'w')
    818         except IOError:
    819             raise service_error(service_error.internal,
    820                     "Failed to open file in genviz")
    821 
    822         # Generate a dot/neato input file from the links, nodes and lans
    823         try:
    824             print >>dotfile, "graph G {"
    825             for n in nodes:
    826                 print >>dotfile, '\t"%s"' % n
    827             for l in links.keys():
    828                 print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
    829             for l in lans.keys():
    830                 for n in lans[l]:
    831                     print >>dotfile, '\t "%s" -- "%s"' % (n,l)
    832             print >>dotfile, "}"
    833             dotfile.close()
    834         except TypeError:
    835             raise service_error(service_error.internal,
    836                     "Single endpoint link in vtopo")
    837         except IOError:
    838             raise service_error(service_error.internal, "Cannot write dot file")
    839 
    840         # Use dot to create a visualization
    841         dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
    842                 '-Gpack=true', dotname], stdout=PIPE)
    843 
    844         # Translate dot to vis format
    845         vis_nodes = [ ]
    846         vis = { 'node': vis_nodes }
    847         for line in dot.stdout:
    848             m = vis_re.match(line)
    849             if m:
    850                 vn = m.group(1)
    851                 vis_node = {'name': vn, \
    852                         'x': float(m.group(2)),\
    853                         'y' : float(m.group(3)),\
    854                     }
    855                 if vn in links.keys() or vn in lans.keys():
    856                     vis_node['type'] = 'lan'
    857                 else:
    858                     vis_node['type'] = 'node'
    859                 vis_nodes.append(vis_node)
    860         rv = dot.wait()
    861 
    862         os.remove(dotname)
    863         if rv == 0 : return vis
    864         else: return None
     774        """
     775        Generate the visualization the virtual topology
     776        """
     777
     778        neato = "/usr/local/bin/neato"
     779        # These are used to parse neato output and to create the visualization
     780        # file.
     781        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
     782        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
     783                "%s</type></node>"
     784
     785        try:
     786            # Node names
     787            nodes = [ n['vname'] for n in topo['node'] ]
     788            topo_lans = topo['lan']
     789        except KeyError:
     790            raise service_error(service_error.internal, "Bad topology")
     791
     792        lans = { }
     793        links = { }
     794
     795        # Walk through the virtual topology, organizing the connections into
     796        # 2-node connections (links) and more-than-2-node connections (lans).
     797        # When a lan is created, it's added to the list of nodes (there's a
     798        # node in the visualization for the lan).
     799        for l in topo_lans:
     800            if links.has_key(l['vname']):
     801                if len(links[l['vname']]) < 2:
     802                    links[l['vname']].append(l['vnode'])
     803                else:
     804                    nodes.append(l['vname'])
     805                    lans[l['vname']] = links[l['vname']]
     806                    del links[l['vname']]
     807                    lans[l['vname']].append(l['vnode'])
     808            elif lans.has_key(l['vname']):
     809                lans[l['vname']].append(l['vnode'])
     810            else:
     811                links[l['vname']] = [ l['vnode'] ]
     812
     813
     814        # Open up a temporary file for dot to turn into a visualization
     815        try:
     816            df, dotname = tempfile.mkstemp()
     817            dotfile = os.fdopen(df, 'w')
     818        except IOError:
     819            raise service_error(service_error.internal,
     820                    "Failed to open file in genviz")
     821
     822        # Generate a dot/neato input file from the links, nodes and lans
     823        try:
     824            print >>dotfile, "graph G {"
     825            for n in nodes:
     826                print >>dotfile, '\t"%s"' % n
     827            for l in links.keys():
     828                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
     829            for l in lans.keys():
     830                for n in lans[l]:
     831                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
     832            print >>dotfile, "}"
     833            dotfile.close()
     834        except TypeError:
     835            raise service_error(service_error.internal,
     836                    "Single endpoint link in vtopo")
     837        except IOError:
     838            raise service_error(service_error.internal, "Cannot write dot file")
     839
     840        # Use dot to create a visualization
     841        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
     842                '-Gpack=true', dotname], stdout=PIPE)
     843
     844        # Translate dot to vis format
     845        vis_nodes = [ ]
     846        vis = { 'node': vis_nodes }
     847        for line in dot.stdout:
     848            m = vis_re.match(line)
     849            if m:
     850                vn = m.group(1)
     851                vis_node = {'name': vn, \
     852                        'x': float(m.group(2)),\
     853                        'y' : float(m.group(3)),\
     854                    }
     855                if vn in links.keys() or vn in lans.keys():
     856                    vis_node['type'] = 'lan'
     857                else:
     858                    vis_node['type'] = 'node'
     859                vis_nodes.append(vis_node)
     860        rv = dot.wait()
     861
     862        os.remove(dotname)
     863        if rv == 0 : return vis
     864        else: return None
    865865
    866866    def get_access(self, tb, nodes, user, tbparam, master, export_project,
    867             access_user):
    868         """
    869         Get access to testbed through fedd and set the parameters for that tb
    870         """
    871         uri = self.tbmap.get(tb, None)
    872         if not uri:
    873             raise service_error(serice_error.server_config,
    874                     "Unknown testbed: %s" % tb)
    875 
    876         # currently this lumps all users into one service access group
    877         service_keys = [ a for u in user \
    878                 for a in u.get('access', []) \
    879                     if a.has_key('sshPubkey')]
    880 
    881         if len(service_keys) == 0:
    882             raise service_error(service_error.req,
    883                     "Must have at least one SSH pubkey for services")
    884 
    885 
    886         for p, u in access_user:
    887             self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
    888                     "to %s") %  ((p or "None"), u, uri))
    889 
    890             if p:
    891                 # Request with user and project specified
    892                 req = {\
    893                         'destinationTestbed' : { 'uri' : uri },
    894                         'project': {
    895                             'name': {'localname': p},
    896                             'user': [ {'userID': { 'localname': u } } ],
    897                             },
    898                         'user':  user,
    899                         'allocID' : { 'localname': 'test' },
    900                         'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
    901                         'serviceAccess' : service_keys
    902                     }
    903             else:
    904                 # Request with only user specified
    905                 req = {\
    906                         'destinationTestbed' : { 'uri' : uri },
    907                         'user':  [ {'userID': { 'localname': u } } ],
    908                         'allocID' : { 'localname': 'test' },
    909                         'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
    910                         'serviceAccess' : service_keys
    911                     }
    912 
    913             if tb == master:
    914                 # NB, the export_project parameter is a dict that includes
    915                 # the type
    916                 req['exportProject'] = export_project
    917 
    918             # node resources if any
    919             if nodes != None and len(nodes) > 0:
    920                 rnodes = [ ]
    921                 for n in nodes:
    922                     rn = { }
    923                     image, hw, count = n.split(":")
    924                     if image: rn['image'] = [ image ]
    925                     if hw: rn['hardware'] = [ hw ]
    926                     if count and int(count) >0 : rn['count'] = int(count)
    927                     rnodes.append(rn)
    928                 req['resources']= { }
    929                 req['resources']['node'] = rnodes
    930 
    931             try:
    932                 if self.local_access.has_key(uri):
    933                     # Local access call
    934                     req = { 'RequestAccessRequestBody' : req }
    935                     r = self.local_access[uri].RequestAccess(req,
    936                             fedid(file=self.cert_file))
    937                     r = { 'RequestAccessResponseBody' : r }
    938                 else:
    939                     r = self.call_RequestAccess(uri, req,
    940                             self.cert_file, self.cert_pwd, self.trusted_certs)
    941             except service_error, e:
    942                 if e.code == service_error.access:
    943                     self.log.debug("[get_access] Access denied")
    944                     r = None
    945                     continue
    946                 else:
    947                     raise e
    948 
    949             if r.has_key('RequestAccessResponseBody'):
    950                 # Through to here we have a valid response, not a fault.
    951                 # Access denied is a fault, so something better or worse than
    952                 # access denied has happened.
    953                 r = r['RequestAccessResponseBody']
    954                 self.log.debug("[get_access] Access granted")
    955                 break
    956             else:
    957                 raise service_error(service_error.protocol,
    958                         "Bad proxy response")
    959        
    960         if not r:
    961             raise service_error(service_error.access,
    962                     "Access denied by %s (%s)" % (tb, uri))
    963 
    964         e = r['emulab']
    965         p = e['project']
    966         tbparam[tb] = {
    967                 "boss": e['boss'],
    968                 "host": e['ops'],
    969                 "domain": e['domain'],
    970                 "fs": e['fileServer'],
    971                 "eventserver": e['eventServer'],
    972                 "project": unpack_id(p['name']),
    973                 "emulab" : e,
    974                 "allocID" : r['allocID'],
    975                 }
    976         # Make the testbed name be the label the user applied
    977         p['testbed'] = {'localname': tb }
    978 
    979         for u in p['user']:
    980             role = u.get('role', None)
    981             if role == 'experimentCreation':
    982                 tbparam[tb]['user'] = unpack_id(u['userID'])
    983                 break
    984         else:
    985             raise service_error(service_error.internal,
    986                     "No createExperimentUser from %s" %tb)
    987 
    988         # Add attributes to barameter space.  We don't allow attributes to
    989         # overlay any parameters already installed.
    990         for a in e['fedAttr']:
    991             try:
    992                 if a['attribute'] and isinstance(a['attribute'], basestring)\
    993                         and not tbparam[tb].has_key(a['attribute'].lower()):
    994                     tbparam[tb][a['attribute'].lower()] = a['value']
    995             except KeyError:
    996                 self.log.error("Bad attribute in response: %s" % a)
    997        
     867            access_user):
     868        """
     869        Get access to testbed through fedd and set the parameters for that tb
     870        """
     871        uri = self.tbmap.get(tb, None)
     872        if not uri:
     873            raise service_error(serice_error.server_config,
     874                    "Unknown testbed: %s" % tb)
     875
     876        # currently this lumps all users into one service access group
     877        service_keys = [ a for u in user \
     878                for a in u.get('access', []) \
     879                    if a.has_key('sshPubkey')]
     880
     881        if len(service_keys) == 0:
     882            raise service_error(service_error.req,
     883                    "Must have at least one SSH pubkey for services")
     884
     885
     886        for p, u in access_user:
     887            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
     888                    "to %s") %  ((p or "None"), u, uri))
     889
     890            if p:
     891                # Request with user and project specified
     892                req = {\
     893                        'destinationTestbed' : { 'uri' : uri },
     894                        'project': {
     895                            'name': {'localname': p},
     896                            'user': [ {'userID': { 'localname': u } } ],
     897                            },
     898                        'user':  user,
     899                        'allocID' : { 'localname': 'test' },
     900                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
     901                        'serviceAccess' : service_keys
     902                    }
     903            else:
     904                # Request with only user specified
     905                req = {\
     906                        'destinationTestbed' : { 'uri' : uri },
     907                        'user':  [ {'userID': { 'localname': u } } ],
     908                        'allocID' : { 'localname': 'test' },
     909                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
     910                        'serviceAccess' : service_keys
     911                    }
     912
     913            if tb == master:
     914                # NB, the export_project parameter is a dict that includes
     915                # the type
     916                req['exportProject'] = export_project
     917
     918            # node resources if any
     919            if nodes != None and len(nodes) > 0:
     920                rnodes = [ ]
     921                for n in nodes:
     922                    rn = { }
     923                    image, hw, count = n.split(":")
     924                    if image: rn['image'] = [ image ]
     925                    if hw: rn['hardware'] = [ hw ]
     926                    if count and int(count) >0 : rn['count'] = int(count)
     927                    rnodes.append(rn)
     928                req['resources']= { }
     929                req['resources']['node'] = rnodes
     930
     931            try:
     932                if self.local_access.has_key(uri):
     933                    # Local access call
     934                    req = { 'RequestAccessRequestBody' : req }
     935                    r = self.local_access[uri].RequestAccess(req,
     936                            fedid(file=self.cert_file))
     937                    r = { 'RequestAccessResponseBody' : r }
     938                else:
     939                    r = self.call_RequestAccess(uri, req,
     940                            self.cert_file, self.cert_pwd, self.trusted_certs)
     941            except service_error, e:
     942                if e.code == service_error.access:
     943                    self.log.debug("[get_access] Access denied")
     944                    r = None
     945                    continue
     946                else:
     947                    raise e
     948
     949            if r.has_key('RequestAccessResponseBody'):
     950                # Through to here we have a valid response, not a fault.
     951                # Access denied is a fault, so something better or worse than
     952                # access denied has happened.
     953                r = r['RequestAccessResponseBody']
     954                self.log.debug("[get_access] Access granted")
     955                break
     956            else:
     957                raise service_error(service_error.protocol,
     958                        "Bad proxy response")
     959       
     960        if not r:
     961            raise service_error(service_error.access,
     962                    "Access denied by %s (%s)" % (tb, uri))
     963
     964        e = r['emulab']
     965        p = e['project']
     966        tbparam[tb] = {
     967                "boss": e['boss'],
     968                "host": e['ops'],
     969                "domain": e['domain'],
     970                "fs": e['fileServer'],
     971                "eventserver": e['eventServer'],
     972                "project": unpack_id(p['name']),
     973                "emulab" : e,
     974                "allocID" : r['allocID'],
     975                }
     976        # Make the testbed name be the label the user applied
     977        p['testbed'] = {'localname': tb }
     978
     979        for u in p['user']:
     980            role = u.get('role', None)
     981            if role == 'experimentCreation':
     982                tbparam[tb]['user'] = unpack_id(u['userID'])
     983                break
     984        else:
     985            raise service_error(service_error.internal,
     986                    "No createExperimentUser from %s" %tb)
     987
     988        # Add attributes to barameter space.  We don't allow attributes to
     989        # overlay any parameters already installed.
     990        for a in e['fedAttr']:
     991            try:
     992                if a['attribute'] and isinstance(a['attribute'], basestring)\
     993                        and not tbparam[tb].has_key(a['attribute'].lower()):
     994                    tbparam[tb][a['attribute'].lower()] = a['value']
     995            except KeyError:
     996                self.log.error("Bad attribute in response: %s" % a)
     997       
    998998    def release_access(self, tb, aid):
    999         """
    1000         Release access to testbed through fedd
    1001         """
    1002 
    1003         uri = self.tbmap.get(tb, None)
    1004         if not uri:
    1005             raise service_error(serice_error.server_config,
    1006                     "Unknown testbed: %s" % tb)
    1007 
    1008         if self.local_access.has_key(uri):
    1009             resp = self.local_access[uri].ReleaseAccess(\
    1010                     { 'ReleaseAccessRequestBody' : {'allocID': aid},},
    1011                     fedid(file=self.cert_file))
    1012             resp = { 'ReleaseAccessResponseBody': resp }
    1013         else:
    1014             resp = self.call_ReleaseAccess(uri, {'allocID': aid},
    1015                     self.cert_file, self.cert_pwd, self.trusted_certs)
    1016 
    1017         # better error coding
     999        """
     1000        Release access to testbed through fedd
     1001        """
     1002
     1003        uri = self.tbmap.get(tb, None)
     1004        if not uri:
     1005            raise service_error(serice_error.server_config,
     1006                    "Unknown testbed: %s" % tb)
     1007
     1008        if self.local_access.has_key(uri):
     1009            resp = self.local_access[uri].ReleaseAccess(\
     1010                    { 'ReleaseAccessRequestBody' : {'allocID': aid},},
     1011                    fedid(file=self.cert_file))
     1012            resp = { 'ReleaseAccessResponseBody': resp }
     1013        else:
     1014            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
     1015                    self.cert_file, self.cert_pwd, self.trusted_certs)
     1016
     1017        # better error coding
    10181018
    10191019    def remote_splitter(self, uri, desc, master):
    10201020
    1021         req = {
    1022                 'description' : { 'ns2description': desc },
    1023                 'master': master,
    1024                 'include_fedkit': bool(self.fedkit),
    1025                 'include_gatewaykit': bool(self.gatewaykit)
    1026             }
    1027 
    1028         r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd,
    1029                 self.trusted_certs)
    1030 
    1031         if r.has_key('Ns2SplitResponseBody'):
    1032             r = r['Ns2SplitResponseBody']
    1033             if r.has_key('output'):
    1034                 return r['output'].splitlines()
    1035             else:
    1036                 raise service_error(service_error.protocol,
    1037                         "Bad splitter response (no output)")
    1038         else:
    1039             raise service_error(service_error.protocol, "Bad splitter response")
    1040        
     1021        req = {
     1022                'description' : { 'ns2description': desc },
     1023                'master': master,
     1024                'include_fedkit': bool(self.fedkit),
     1025                'include_gatewaykit': bool(self.gatewaykit)
     1026            }
     1027
     1028        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd,
     1029                self.trusted_certs)
     1030
     1031        if r.has_key('Ns2SplitResponseBody'):
     1032            r = r['Ns2SplitResponseBody']
     1033            if r.has_key('output'):
     1034                return r['output'].splitlines()
     1035            else:
     1036                raise service_error(service_error.protocol,
     1037                        "Bad splitter response (no output)")
     1038        else:
     1039            raise service_error(service_error.protocol, "Bad splitter response")
     1040       
    10411041    class current_testbed:
    1042         """
    1043         Object for collecting the current testbed description.  The testbed
    1044         description is saved to a file with the local testbed variables
    1045         subsittuted line by line.
    1046         """
    1047         def __init__(self, eid, tmpdir, fedkit, gatewaykit):
    1048             def tar_list_to_string(tl):
    1049                 if tl is None: return None
    1050 
    1051                 rv = ""
    1052                 for t in tl:
    1053                     rv += " %s PROJDIR/tarfiles/EID/%s" % \
    1054                             (t[0], os.path.basename(t[1]))
    1055                 return rv
    1056 
    1057 
    1058             self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
    1059             self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
    1060             self.current_testbed = None
    1061             self.testbed_file = None
    1062 
    1063             self.def_expstart = \
    1064                     "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
    1065             self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
    1066             self.def_gwstart = \
    1067                     "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
    1068             self.def_mgwstart = \
    1069                     "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
    1070             self.def_gwimage = "FBSD61-TUNNEL2";
    1071             self.def_gwtype = "pc";
    1072             self.def_mgwcmd = '# '
    1073             self.def_mgwcmdparams = ''
    1074             self.def_gwcmd = '# '
    1075             self.def_gwcmdparams = ''
    1076 
    1077             self.eid = eid
    1078             self.tmpdir = tmpdir
    1079             # Convert fedkit and gateway kit (which are lists of tuples) into a
    1080             # substituition string.
    1081             self.fedkit = tar_list_to_string(fedkit)
    1082             self.gatewaykit = tar_list_to_string(gatewaykit)
    1083 
    1084         def __call__(self, line, master, allocated, tbparams):
    1085             # Capture testbed topology descriptions
    1086             if self.current_testbed == None:
    1087                 m = self.begin_testbed.match(line)
    1088                 if m != None:
    1089                     self.current_testbed = m.group(1)
    1090                     if self.current_testbed == None:
    1091                         raise service_error(service_error.req,
    1092                                 "Bad request format (unnamed testbed)")
    1093                     allocated[self.current_testbed] = \
    1094                             allocated.get(self.current_testbed,0) + 1
    1095                     tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
    1096                     if not os.path.exists(tb_dir):
    1097                         try:
    1098                             os.mkdir(tb_dir)
    1099                         except IOError:
    1100                             raise service_error(service_error.internal,
    1101                                     "Cannot create %s" % tb_dir)
    1102                     try:
    1103                         self.testbed_file = open("%s/%s.%s.tcl" %
    1104                                 (tb_dir, self.eid, self.current_testbed), 'w')
    1105                     except IOError:
    1106                         self.testbed_file = None
    1107                     return True
    1108                 else: return False
    1109             else:
    1110                 m = self.end_testbed.match(line)
    1111                 if m != None:
    1112                     if m.group(1) != self.current_testbed:
    1113                         raise service_error(service_error.internal,
    1114                                 "Mismatched testbed markers!?")
    1115                     if self.testbed_file != None:
    1116                         self.testbed_file.close()
    1117                         self.testbed_file = None
    1118                     self.current_testbed = None
    1119                 elif self.testbed_file:
    1120                     # Substitute variables and put the line into the local
    1121                     # testbed file.
    1122                     gwtype = tbparams[self.current_testbed].get(\
    1123                             'connectortype', self.def_gwtype)
    1124                     gwimage = tbparams[self.current_testbed].get(\
    1125                             'connectorimage', self.def_gwimage)
    1126                     mgwstart = tbparams[self.current_testbed].get(\
    1127                             'masterconnectorstartcmd', self.def_mgwstart)
    1128                     mexpstart = tbparams[self.current_testbed].get(\
    1129                             'masternodestartcmd', self.def_mexpstart)
    1130                     gwstart = tbparams[self.current_testbed].get(\
    1131                             'slaveconnectorstartcmd', self.def_gwstart)
    1132                     expstart = tbparams[self.current_testbed].get(\
    1133                             'slavenodestartcmd', self.def_expstart)
    1134                     project = tbparams[self.current_testbed].get('project')
    1135                     gwcmd = tbparams[self.current_testbed].get(\
    1136                             'slaveconnectorcmd', self.def_gwcmd)
    1137                     gwcmdparams = tbparams[self.current_testbed].get(\
    1138                             'slaveconnectorcmdparams', self.def_gwcmdparams)
    1139                     mgwcmd = tbparams[self.current_testbed].get(\
    1140                             'masterconnectorcmd', self.def_gwcmd)
    1141                     mgwcmdparams = tbparams[self.current_testbed].get(\
    1142                             'masterconnectorcmdparams', self.def_gwcmdparams)
    1143                     line = re.sub("GWTYPE", gwtype, line)
    1144                     line = re.sub("GWIMAGE", gwimage, line)
    1145                     if self.current_testbed == master:
    1146                         line = re.sub("GWSTART", mgwstart, line)
    1147                         line = re.sub("EXPSTART", mexpstart, line)
    1148                         # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
    1149                         line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
    1150                         line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
    1151                     else:
    1152                         line = re.sub("GWSTART", gwstart, line)
    1153                         line = re.sub("EXPSTART", expstart, line)
    1154                         # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
    1155                         line = re.sub("GWCMDPARAMS", gwcmdparams, line)
    1156                         line = re.sub("(#\s*)?GWCMD", gwcmd, line)
    1157                     #These expansions contain EID and PROJDIR.  NB these are
    1158                     # local fedkit and gatewaykit, which are strings.
    1159                     if self.fedkit:
    1160                         line = re.sub("FEDKIT", self.fedkit, line)
    1161                     if self.gatewaykit:
    1162                         line = re.sub("GATEWAYKIT", self.gatewaykit, line)
    1163                     line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
    1164                     line = re.sub("PROJDIR", "/proj/%s/" % project, line)
    1165                     line = re.sub("EID", self.eid, line)
    1166                     line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
    1167                             (project, self.eid), line)
    1168                     print >>self.testbed_file, line
    1169                 return True
     1042        """
     1043        Object for collecting the current testbed description.  The testbed
     1044        description is saved to a file with the local testbed variables
     1045        subsittuted line by line.
     1046        """
     1047        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
     1048            def tar_list_to_string(tl):
     1049                if tl is None: return None
     1050
     1051                rv = ""
     1052                for t in tl:
     1053                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
     1054                            (t[0], os.path.basename(t[1]))
     1055                return rv
     1056
     1057
     1058            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
     1059            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
     1060            self.current_testbed = None
     1061            self.testbed_file = None
     1062
     1063            self.def_expstart = \
     1064                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
     1065            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
     1066            self.def_gwstart = \
     1067                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
     1068            self.def_mgwstart = \
     1069                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
     1070            self.def_gwimage = "FBSD61-TUNNEL2";
     1071            self.def_gwtype = "pc";
     1072            self.def_mgwcmd = '# '
     1073            self.def_mgwcmdparams = ''
     1074            self.def_gwcmd = '# '
     1075            self.def_gwcmdparams = ''
     1076
     1077            self.eid = eid
     1078            self.tmpdir = tmpdir
     1079            # Convert fedkit and gateway kit (which are lists of tuples) into a
     1080            # substituition string.
     1081            self.fedkit = tar_list_to_string(fedkit)
     1082            self.gatewaykit = tar_list_to_string(gatewaykit)
     1083
     1084        def __call__(self, line, master, allocated, tbparams):
     1085            # Capture testbed topology descriptions
     1086            if self.current_testbed == None:
     1087                m = self.begin_testbed.match(line)
     1088                if m != None:
     1089                    self.current_testbed = m.group(1)
     1090                    if self.current_testbed == None:
     1091                        raise service_error(service_error.req,
     1092                                "Bad request format (unnamed testbed)")
     1093                    allocated[self.current_testbed] = \
     1094                            allocated.get(self.current_testbed,0) + 1
     1095                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
     1096                    if not os.path.exists(tb_dir):
     1097                        try:
     1098                            os.mkdir(tb_dir)
     1099                        except IOError:
     1100                            raise service_error(service_error.internal,
     1101                                    "Cannot create %s" % tb_dir)
     1102                    try:
     1103                        self.testbed_file = open("%s/%s.%s.tcl" %
     1104                                (tb_dir, self.eid, self.current_testbed), 'w')
     1105                    except IOError:
     1106                        self.testbed_file = None
     1107                    return True
     1108                else: return False
     1109            else:
     1110                m = self.end_testbed.match(line)
     1111                if m != None:
     1112                    if m.group(1) != self.current_testbed:
     1113                        raise service_error(service_error.internal,
     1114                                "Mismatched testbed markers!?")
     1115                    if self.testbed_file != None:
     1116                        self.testbed_file.close()
     1117                        self.testbed_file = None
     1118                    self.current_testbed = None
     1119                elif self.testbed_file:
     1120                    # Substitute variables and put the line into the local
     1121                    # testbed file.
     1122                    gwtype = tbparams[self.current_testbed].get(\
     1123                            'connectortype', self.def_gwtype)
     1124                    gwimage = tbparams[self.current_testbed].get(\
     1125                            'connectorimage', self.def_gwimage)
     1126                    mgwstart = tbparams[self.current_testbed].get(\
     1127                            'masterconnectorstartcmd', self.def_mgwstart)
     1128                    mexpstart = tbparams[self.current_testbed].get(\
     1129                            'masternodestartcmd', self.def_mexpstart)
     1130                    gwstart = tbparams[self.current_testbed].get(\
     1131                            'slaveconnectorstartcmd', self.def_gwstart)
     1132                    expstart = tbparams[self.current_testbed].get(\
     1133                            'slavenodestartcmd', self.def_expstart)
     1134                    project = tbparams[self.current_testbed].get('project')
     1135                    gwcmd = tbparams[self.current_testbed].get(\
     1136                            'slaveconnectorcmd', self.def_gwcmd)
     1137                    gwcmdparams = tbparams[self.current_testbed].get(\
     1138                            'slaveconnectorcmdparams', self.def_gwcmdparams)
     1139                    mgwcmd = tbparams[self.current_testbed].get(\
     1140                            'masterconnectorcmd', self.def_gwcmd)
     1141                    mgwcmdparams = tbparams[self.current_testbed].get(\
     1142                            'masterconnectorcmdparams', self.def_gwcmdparams)
     1143                    line = re.sub("GWTYPE", gwtype, line)
     1144                    line = re.sub("GWIMAGE", gwimage, line)
     1145                    if self.current_testbed == master:
     1146                        line = re.sub("GWSTART", mgwstart, line)
     1147                        line = re.sub("EXPSTART", mexpstart, line)
     1148                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
     1149                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
     1150                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
     1151                    else:
     1152                        line = re.sub("GWSTART", gwstart, line)
     1153                        line = re.sub("EXPSTART", expstart, line)
     1154                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
     1155                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
     1156                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
     1157                    #These expansions contain EID and PROJDIR.  NB these are
     1158                    # local fedkit and gatewaykit, which are strings.
     1159                    if self.fedkit:
     1160                        line = re.sub("FEDKIT", self.fedkit, line)
     1161                    if self.gatewaykit:
     1162                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
     1163                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
     1164                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
     1165                    line = re.sub("EID", self.eid, line)
     1166                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
     1167                            (project, self.eid), line)
     1168                    print >>self.testbed_file, line
     1169                return True
    11701170
    11711171    class allbeds:
    1172         """
    1173         Process the Allbeds section.  Get access to each federant and save the
    1174         parameters in tbparams
    1175         """
    1176         def __init__(self, get_access):
    1177             self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
    1178             self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
    1179             self.in_allbeds = False
    1180             self.get_access = get_access
    1181 
    1182         def __call__(self, line, user, tbparams, master, export_project,
    1183                 access_user):
    1184             # Testbed access parameters
    1185             if not self.in_allbeds:
    1186                 if self.begin_allbeds.match(line):
    1187                     self.in_allbeds = True
    1188                     return True
    1189                 else:
    1190                     return False
    1191             else:
    1192                 if self.end_allbeds.match(line):
    1193                     self.in_allbeds = False
    1194                 else:
    1195                     nodes = line.split('|')
    1196                     tb = nodes.pop(0)
    1197                     self.get_access(tb, nodes, user, tbparams, master,
    1198                             export_project, access_user)
    1199                 return True
     1172        """
     1173        Process the Allbeds section.  Get access to each federant and save the
     1174        parameters in tbparams
     1175        """
     1176        def __init__(self, get_access):
     1177            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
     1178            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
     1179            self.in_allbeds = False
     1180            self.get_access = get_access
     1181
     1182        def __call__(self, line, user, tbparams, master, export_project,
     1183                access_user):
     1184            # Testbed access parameters
     1185            if not self.in_allbeds:
     1186                if self.begin_allbeds.match(line):
     1187                    self.in_allbeds = True
     1188                    return True
     1189                else:
     1190                    return False
     1191            else:
     1192                if self.end_allbeds.match(line):
     1193                    self.in_allbeds = False
     1194                else:
     1195                    nodes = line.split('|')
     1196                    tb = nodes.pop(0)
     1197                    self.get_access(tb, nodes, user, tbparams, master,
     1198                            export_project, access_user)
     1199                return True
    12001200
    12011201    class gateways:
    1202         def __init__(self, eid, master, tmpdir, gw_pubkey,
    1203                 gw_secretkey, copy_file, fedkit):
    1204             self.begin_gateways = \
    1205                     re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
    1206             self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
    1207             self.current_gateways = None
    1208             self.control_gateway = None
    1209             self.active_end = { }
    1210 
    1211             self.eid = eid
    1212             self.master = master
    1213             self.tmpdir = tmpdir
    1214             self.gw_pubkey_base = gw_pubkey
    1215             self.gw_secretkey_base = gw_secretkey
    1216 
    1217             self.copy_file = copy_file
    1218             self.fedkit = fedkit
    1219 
    1220 
    1221         def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
    1222                 active_end, tbparams, dtb, myname, desthost, type):
    1223             """
    1224             Produce a gateway configuration file from a gateways line.
    1225             """
    1226 
    1227             sproject = tbparams[gw].get('project', 'project')
    1228             dproject = tbparams[dtb].get('project', 'project')
    1229             sdomain = ".%s.%s%s" % (eid, sproject,
    1230                     tbparams[gw].get('domain', ".example.com"))
    1231             ddomain = ".%s.%s%s" % (eid, dproject,
    1232                     tbparams[dtb].get('domain', ".example.com"))
    1233             boss = tbparams[master].get('boss', "boss")
    1234             fs = tbparams[master].get('fs', "fs")
    1235             event_server = "%s%s" % \
    1236                     (tbparams[gw].get('eventserver', "event_server"),
    1237                             tbparams[gw].get('domain', "example.com"))
    1238             remote_event_server = "%s%s" % \
    1239                     (tbparams[dtb].get('eventserver', "event_server"),
    1240                             tbparams[dtb].get('domain', "example.com"))
    1241             seer_control = "%s%s" % \
    1242                     (tbparams[gw].get('control', "control"), sdomain)
    1243             tunnel_iface = tbparams[gw].get("tunnelinterface", None)
    1244 
    1245             if self.fedkit:
    1246                 remote_script_dir = "/usr/local/federation/bin"
    1247                 local_script_dir = "/usr/local/federation/bin"
    1248             else:
    1249                 remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
    1250                 local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
    1251 
    1252             local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
    1253             remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
    1254             tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
    1255 
    1256             conf_file = "%s%s.gw.conf" % (myname, sdomain)
    1257             remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
    1258 
    1259             # translate to lower case so the `hostname` hack for specifying
    1260             # configuration files works.
    1261             conf_file = conf_file.lower();
    1262             remote_conf_file = remote_conf_file.lower();
    1263 
    1264             if dtb == master:
    1265                 active = "false"
    1266             elif gw == master:
    1267                 active = "true"
    1268             elif active_end.has_key('%s-%s' % (dtb, gw)):
    1269                 active = "false"
    1270             else:
    1271                 active_end['%s-%s' % (gw, dtb)] = 1
    1272                 active = "true"
    1273 
    1274             gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
    1275             print >>gwconfig, "Active: %s" % active
    1276             print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
    1277             if tunnel_iface:
    1278                 print >>gwconfig, "Interface: %s" % tunnel_iface
    1279             print >>gwconfig, "BossName: %s" % boss
    1280             print >>gwconfig, "FsName: %s" % fs
    1281             print >>gwconfig, "EventServerName: %s" % event_server
    1282             print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
    1283             print >>gwconfig, "SeerControl: %s" % seer_control
    1284             print >>gwconfig, "Type: %s" % type
    1285             print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
    1286             print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
    1287                     local_script_dir
    1288             print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
    1289             print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
    1290             print >>gwconfig, "RemoteConfigFile: %s/%s" % \
    1291                     (remote_conf_dir, remote_conf_file)
    1292             print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
    1293             print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
    1294             print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
    1295             gwconfig.close()
    1296 
    1297             return active == "true"
    1298 
    1299         def __call__(self, line, allocated, tbparams):
    1300             # Process gateways
    1301             if not self.current_gateways:
    1302                 m = self.begin_gateways.match(line)
    1303                 if m:
    1304                     self.current_gateways = m.group(1)
    1305                     if allocated.has_key(self.current_gateways):
    1306                         # This test should always succeed
    1307                         tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
    1308                         if not os.path.exists(tb_dir):
    1309                             try:
    1310                                 os.mkdir(tb_dir)
    1311                             except IOError:
    1312                                 raise service_error(service_error.internal,
    1313                                         "Cannot create %s" % tb_dir)
    1314                     else:
    1315                         # XXX
    1316                         self.log.error("[gateways]: Ignoring gateways for " + \
    1317                                 "unknown testbed %s" % self.current_gateways)
    1318                         self.current_gateways = None
    1319                     return True
    1320                 else:
    1321                     return False
    1322             else:
    1323                 m = self.end_gateways.match(line)
    1324                 if m :
    1325                     if m.group(1) != self.current_gateways:
    1326                         raise service_error(service_error.internal,
    1327                                 "Mismatched gateway markers!?")
    1328                     if self.control_gateway:
    1329                         try:
    1330                             cc = open("%s/%s/client.conf" %
    1331                                     (self.tmpdir, self.current_gateways), 'w')
    1332                             print >>cc, "ControlGateway: %s" % \
    1333                                     self.control_gateway
    1334                             if tbparams[self.master].has_key('smbshare'):
    1335                                 print >>cc, "SMBSHare: %s" % \
    1336                                         tbparams[self.master]['smbshare']
    1337                             print >>cc, "ProjectUser: %s" % \
    1338                                     tbparams[self.master]['user']
    1339                             print >>cc, "ProjectName: %s" % \
    1340                                     tbparams[self.master]['project']
    1341                             print >>cc, "ExperimentID: %s/%s" % \
    1342                                     ( tbparams[self.master]['project'], \
    1343                                     self.eid )
    1344                             cc.close()
    1345                         except IOError:
    1346                             raise service_error(service_error.internal,
    1347                                     "Error creating client config")
    1348                         # XXX: This seer specific file should disappear
    1349                         try:
    1350                             cc = open("%s/%s/seer.conf" %
    1351                                     (self.tmpdir, self.current_gateways),
    1352                                     'w')
    1353                             if self.current_gateways != self.master:
    1354                                 print >>cc, "ControlNode: %s" % \
    1355                                         self.control_gateway
    1356                             print >>cc, "ExperimentID: %s/%s" % \
    1357                                     ( tbparams[self.master]['project'], \
    1358                                     self.eid )
    1359                             cc.close()
    1360                         except IOError:
    1361                             raise service_error(service_error.internal,
    1362                                     "Error creating seer config")
    1363                     else:
    1364                         debug.error("[gateways]: No control gateway for %s" %\
    1365                                     self.current_gateways)
    1366                     self.current_gateways = None
    1367                 else:
    1368                     dtb, myname, desthost, type = line.split(" ")
    1369 
    1370                     if type == "control" or type == "both":
    1371                         self.control_gateway = "%s.%s.%s%s" % (myname,
    1372                                 self.eid,
    1373                                 tbparams[self.current_gateways]['project'],
    1374                                 tbparams[self.current_gateways]['domain'])
    1375                     try:
    1376                         active = self.gateway_conf_file(self.current_gateways,
    1377                                 self.master, self.eid, self.gw_pubkey_base,
    1378                                 self.gw_secretkey_base,
    1379                                 self.active_end, tbparams, dtb, myname,
    1380                                 desthost, type)
    1381                     except IOError, e:
    1382                         raise service_error(service_error.internal,
    1383                                 "Failed to write config file for %s" % \
    1384                                         self.current_gateway)
    1385            
    1386                     gw_pubkey = "%s/keys/%s" % \
    1387                             (self.tmpdir, self.gw_pubkey_base)
    1388                     gw_secretkey = "%s/keys/%s" % \
    1389                             (self.tmpdir, self.gw_secretkey_base)
    1390 
    1391                     pkfile = "%s/%s/%s" % \
    1392                             ( self.tmpdir, self.current_gateways,
    1393                                     self.gw_pubkey_base)
    1394                     skfile = "%s/%s/%s" % \
    1395                             ( self.tmpdir, self.current_gateways,
    1396                                     self.gw_secretkey_base)
    1397 
    1398                     if not os.path.exists(pkfile):
    1399                         try:
    1400                             self.copy_file(gw_pubkey, pkfile)
    1401                         except IOError:
    1402                             service_error(service_error.internal,
    1403                                     "Failed to copy pubkey file")
    1404 
    1405                     if active and not os.path.exists(skfile):
    1406                         try:
    1407                             self.copy_file(gw_secretkey, skfile)
    1408                         except IOError:
    1409                             service_error(service_error.internal,
    1410                                     "Failed to copy secretkey file")
    1411                 return True
     1202        def __init__(self, eid, master, tmpdir, gw_pubkey,
     1203                gw_secretkey, copy_file, fedkit):
     1204            self.begin_gateways = \
     1205                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
     1206            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
     1207            self.current_gateways = None
     1208            self.control_gateway = None
     1209            self.active_end = { }
     1210
     1211            self.eid = eid
     1212            self.master = master
     1213            self.tmpdir = tmpdir
     1214            self.gw_pubkey_base = gw_pubkey
     1215            self.gw_secretkey_base = gw_secretkey
     1216
     1217            self.copy_file = copy_file
     1218            self.fedkit = fedkit
     1219
     1220
     1221        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
     1222                active_end, tbparams, dtb, myname, desthost, type):
     1223            """
     1224            Produce a gateway configuration file from a gateways line.
     1225            """
     1226
     1227            sproject = tbparams[gw].get('project', 'project')
     1228            dproject = tbparams[dtb].get('project', 'project')
     1229            sdomain = ".%s.%s%s" % (eid, sproject,
     1230                    tbparams[gw].get('domain', ".example.com"))
     1231            ddomain = ".%s.%s%s" % (eid, dproject,
     1232                    tbparams[dtb].get('domain', ".example.com"))
     1233            boss = tbparams[master].get('boss', "boss")
     1234            fs = tbparams[master].get('fs', "fs")
     1235            event_server = "%s%s" % \
     1236                    (tbparams[gw].get('eventserver', "event_server"),
     1237                            tbparams[gw].get('domain', "example.com"))
     1238            remote_event_server = "%s%s" % \
     1239                    (tbparams[dtb].get('eventserver', "event_server"),
     1240                            tbparams[dtb].get('domain', "example.com"))
     1241            seer_control = "%s%s" % \
     1242                    (tbparams[gw].get('control', "control"), sdomain)
     1243            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
     1244
     1245            if self.fedkit:
     1246                remote_script_dir = "/usr/local/federation/bin"
     1247                local_script_dir = "/usr/local/federation/bin"
     1248            else:
     1249                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
     1250                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
     1251
     1252            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
     1253            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
     1254            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
     1255
     1256            conf_file = "%s%s.gw.conf" % (myname, sdomain)
     1257            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
     1258
     1259            # translate to lower case so the `hostname` hack for specifying
     1260            # configuration files works.
     1261            conf_file = conf_file.lower();
     1262            remote_conf_file = remote_conf_file.lower();
     1263
     1264            if dtb == master:
     1265                active = "false"
     1266            elif gw == master:
     1267                active = "true"
     1268            elif active_end.has_key('%s-%s' % (dtb, gw)):
     1269                active = "false"
     1270            else:
     1271                active_end['%s-%s' % (gw, dtb)] = 1
     1272                active = "true"
     1273
     1274            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
     1275            print >>gwconfig, "Active: %s" % active
     1276            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
     1277            if tunnel_iface:
     1278                print >>gwconfig, "Interface: %s" % tunnel_iface
     1279            print >>gwconfig, "BossName: %s" % boss
     1280            print >>gwconfig, "FsName: %s" % fs
     1281            print >>gwconfig, "EventServerName: %s" % event_server
     1282            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
     1283            print >>gwconfig, "SeerControl: %s" % seer_control
     1284            print >>gwconfig, "Type: %s" % type
     1285            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
     1286            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
     1287                    local_script_dir
     1288            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
     1289            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
     1290            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
     1291                    (remote_conf_dir, remote_conf_file)
     1292            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
     1293            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
     1294            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
     1295            gwconfig.close()
     1296
     1297            return active == "true"
     1298
     1299        def __call__(self, line, allocated, tbparams):
     1300            # Process gateways
     1301            if not self.current_gateways:
     1302                m = self.begin_gateways.match(line)
     1303                if m:
     1304                    self.current_gateways = m.group(1)
     1305                    if allocated.has_key(self.current_gateways):
     1306                        # This test should always succeed
     1307                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
     1308                        if not os.path.exists(tb_dir):
     1309                            try:
     1310                                os.mkdir(tb_dir)
     1311                            except IOError:
     1312                                raise service_error(service_error.internal,
     1313                                        "Cannot create %s" % tb_dir)
     1314                    else:
     1315                        # XXX
     1316                        self.log.error("[gateways]: Ignoring gateways for " + \
     1317                                "unknown testbed %s" % self.current_gateways)
     1318                        self.current_gateways = None
     1319                    return True
     1320                else:
     1321                    return False
     1322            else:
     1323                m = self.end_gateways.match(line)
     1324                if m :
     1325                    if m.group(1) != self.current_gateways:
     1326                        raise service_error(service_error.internal,
     1327                                "Mismatched gateway markers!?")
     1328                    if self.control_gateway:
     1329                        try:
     1330                            cc = open("%s/%s/client.conf" %
     1331                                    (self.tmpdir, self.current_gateways), 'w')
     1332                            print >>cc, "ControlGateway: %s" % \
     1333                                    self.control_gateway
     1334                            if tbparams[self.master].has_key('smbshare'):
     1335                                print >>cc, "SMBSHare: %s" % \
     1336                                        tbparams[self.master]['smbshare']
     1337                            print >>cc, "ProjectUser: %s" % \
     1338                                    tbparams[self.master]['user']
     1339                            print >>cc, "ProjectName: %s" % \
     1340                                    tbparams[self.master]['project']
     1341                            print >>cc, "ExperimentID: %s/%s" % \
     1342                                    ( tbparams[self.master]['project'], \
     1343                                    self.eid )
     1344                            cc.close()
     1345                        except IOError:
     1346                            raise service_error(service_error.internal,
     1347                                    "Error creating client config")
     1348                        # XXX: This seer specific file should disappear
     1349                        try:
     1350                            cc = open("%s/%s/seer.conf" %
     1351                                    (self.tmpdir, self.current_gateways),
     1352                                    'w')
     1353                            if self.current_gateways != self.master:
     1354                                print >>cc, "ControlNode: %s" % \
     1355                                        self.control_gateway
     1356                            print >>cc, "ExperimentID: %s/%s" % \
     1357                                    ( tbparams[self.master]['project'], \
     1358                                    self.eid )
     1359                            cc.close()
     1360                        except IOError:
     1361                            raise service_error(service_error.internal,
     1362                                    "Error creating seer config")
     1363                    else:
     1364                        debug.error("[gateways]: No control gateway for %s" %\
     1365                                    self.current_gateways)
     1366                    self.current_gateways = None
     1367                else:
     1368                    dtb, myname, desthost, type = line.split(" ")
     1369
     1370                    if type == "control" or type == "both":
     1371                        self.control_gateway = "%s.%s.%s%s" % (myname,
     1372                                self.eid,
     1373                                tbparams[self.current_gateways]['project'],
     1374                                tbparams[self.current_gateways]['domain'])
     1375                    try:
     1376                        active = self.gateway_conf_file(self.current_gateways,
     1377                                self.master, self.eid, self.gw_pubkey_base,
     1378                                self.gw_secretkey_base,
     1379                                self.active_end, tbparams, dtb, myname,
     1380                                desthost, type)
     1381                    except IOError, e:
     1382                        raise service_error(service_error.internal,
     1383                                "Failed to write config file for %s" % \
     1384                                        self.current_gateway)
     1385           
     1386                    gw_pubkey = "%s/keys/%s" % \
     1387                            (self.tmpdir, self.gw_pubkey_base)
     1388                    gw_secretkey = "%s/keys/%s" % \
     1389                            (self.tmpdir, self.gw_secretkey_base)
     1390
     1391                    pkfile = "%s/%s/%s" % \
     1392                            ( self.tmpdir, self.current_gateways,
     1393                                    self.gw_pubkey_base)
     1394                    skfile = "%s/%s/%s" % \
     1395                            ( self.tmpdir, self.current_gateways,
     1396                                    self.gw_secretkey_base)
     1397
     1398                    if not os.path.exists(pkfile):
     1399                        try:
     1400                            self.copy_file(gw_pubkey, pkfile)
     1401                        except IOError:
     1402                            service_error(service_error.internal,
     1403                                    "Failed to copy pubkey file")
     1404
     1405                    if active and not os.path.exists(skfile):
     1406                        try:
     1407                            self.copy_file(gw_secretkey, skfile)
     1408                        except IOError:
     1409                            service_error(service_error.internal,
     1410                                    "Failed to copy secretkey file")
     1411                return True
    14121412
    14131413    class shunt_to_file:
    1414         """
    1415         Simple class to write data between two regexps to a file.
    1416         """
    1417         def __init__(self, begin, end, filename):
    1418             """
    1419             Begin shunting on a match of begin, stop on end, send data to
    1420             filename.
    1421             """
    1422             self.begin = re.compile(begin)
    1423             self.end = re.compile(end)
    1424             self.in_shunt = False
    1425             self.file = None
    1426             self.filename = filename
    1427 
    1428         def __call__(self, line):
    1429             """
    1430             Call this on each line in the input that may be shunted.
    1431             """
    1432             if not self.in_shunt:
    1433                 if self.begin.match(line):
    1434                     self.in_shunt = True
    1435                     try:
    1436                         self.file = open(self.filename, "w")
    1437                     except:
    1438                         self.file = None
    1439                         raise
    1440                     return True
    1441                 else:
    1442                     return False
    1443             else:
    1444                 if self.end.match(line):
    1445                     if self.file:
    1446                         self.file.close()
    1447                         self.file = None
    1448                     self.in_shunt = False
    1449                 else:
    1450                     if self.file:
    1451                         print >>self.file, line
    1452                 return True
     1414        """
     1415        Simple class to write data between two regexps to a file.
     1416        """
     1417        def __init__(self, begin, end, filename):
     1418            """
     1419            Begin shunting on a match of begin, stop on end, send data to
     1420            filename.
     1421            """
     1422            self.begin = re.compile(begin)
     1423            self.end = re.compile(end)
     1424            self.in_shunt = False
     1425            self.file = None
     1426            self.filename = filename
     1427
     1428        def __call__(self, line):
     1429            """
     1430            Call this on each line in the input that may be shunted.
     1431            """
     1432            if not self.in_shunt:
     1433                if self.begin.match(line):
     1434                    self.in_shunt = True
     1435                    try:
     1436                        self.file = open(self.filename, "w")
     1437                    except:
     1438                        self.file = None
     1439                        raise
     1440                    return True
     1441                else:
     1442                    return False
     1443            else:
     1444                if self.end.match(line):
     1445                    if self.file:
     1446                        self.file.close()
     1447                        self.file = None
     1448                    self.in_shunt = False
     1449                else:
     1450                    if self.file:
     1451                        print >>self.file, line
     1452                return True
    14531453
    14541454    class shunt_to_list:
    1455         """
    1456         Same interface as shunt_to_file.  Data collected in self.list, one list
    1457         element per line.
    1458         """
    1459         def __init__(self, begin, end):
    1460             self.begin = re.compile(begin)
    1461             self.end = re.compile(end)
    1462             self.in_shunt = False
    1463             self.list = [ ]
    1464        
    1465         def __call__(self, line):
    1466             if not self.in_shunt:
    1467                 if self.begin.match(line):
    1468                     self.in_shunt = True
    1469                     return True
    1470                 else:
    1471                     return False
    1472             else:
    1473                 if self.end.match(line):
    1474                     self.in_shunt = False
    1475                 else:
    1476                     self.list.append(line)
    1477                 return True
     1455        """
     1456        Same interface as shunt_to_file.  Data collected in self.list, one list
     1457        element per line.
     1458        """
     1459        def __init__(self, begin, end):
     1460            self.begin = re.compile(begin)
     1461            self.end = re.compile(end)
     1462            self.in_shunt = False
     1463            self.list = [ ]
     1464       
     1465        def __call__(self, line):
     1466            if not self.in_shunt:
     1467                if self.begin.match(line):
     1468                    self.in_shunt = True
     1469                    return True
     1470                else:
     1471                    return False
     1472            else:
     1473                if self.end.match(line):
     1474                    self.in_shunt = False
     1475                else:
     1476                    self.list.append(line)
     1477                return True
    14781478
    14791479    class shunt_to_string:
    1480         """
    1481         Same interface as shunt_to_file.  Data collected in self.str, all in
    1482         one string.
    1483         """
    1484         def __init__(self, begin, end):
    1485             self.begin = re.compile(begin)
    1486             self.end = re.compile(end)
    1487             self.in_shunt = False
    1488             self.str = ""
    1489        
    1490         def __call__(self, line):
    1491             if not self.in_shunt:
    1492                 if self.begin.match(line):
    1493                     self.in_shunt = True
    1494                     return True
    1495                 else:
    1496                     return False
    1497             else:
    1498                 if self.end.match(line):
    1499                     self.in_shunt = False
    1500                 else:
    1501                     self.str += line
    1502                 return True
     1480        """
     1481        Same interface as shunt_to_file.  Data collected in self.str, all in
     1482        one string.
     1483        """
     1484        def __init__(self, begin, end):
     1485            self.begin = re.compile(begin)
     1486            self.end = re.compile(end)
     1487            self.in_shunt = False
     1488            self.str = ""
     1489       
     1490        def __call__(self, line):
     1491            if not self.in_shunt:
     1492                if self.begin.match(line):
     1493                    self.in_shunt = True
     1494                    return True
     1495                else:
     1496                    return False
     1497            else:
     1498                if self.end.match(line):
     1499                    self.in_shunt = False
     1500                else:
     1501                    self.str += line
     1502                return True
    15031503
    15041504    def create_experiment(self, req, fid):
    1505         """
    1506         The external interface to experiment creation called from the
    1507         dispatcher.
    1508 
    1509         Creates a working directory, splits the incoming description using the
    1510         splitter script and parses out the avrious subsections using the
    1511         lcasses above.  Once each sub-experiment is created, use pooled threads
    1512         to instantiate them and start it all up.
    1513         """
    1514 
    1515         if not self.auth.check_attribute(fid, 'create'):
    1516             raise service_error(service_error.access, "Create access denied")
    1517 
    1518         try:
    1519             tmpdir = tempfile.mkdtemp(prefix="split-")
    1520         except IOError:
    1521             raise service_error(service_error.internal, "Cannot create tmp dir")
    1522 
    1523         gw_pubkey_base = "fed.%s.pub" % self.ssh_type
    1524         gw_secretkey_base = "fed.%s" % self.ssh_type
    1525         gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
    1526         gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
    1527         tclfile = tmpdir + "/experiment.tcl"
    1528         tbparams = { }
    1529         try:
    1530             access_user = self.accessdb[fid]
    1531         except KeyError:
    1532             raise service_error(service_error.internal,
    1533                     "Access map and authorizer out of sync in " + \
    1534                             "create_experiment for fedid %s"  % fid)
    1535 
    1536         pid = "dummy"
    1537         gid = "dummy"
    1538         # XXX
    1539         fail_soft = False
    1540 
    1541         try:
    1542             os.mkdir(tmpdir+"/keys")
    1543         except OSError:
    1544             raise service_error(service_error.internal,
    1545                     "Can't make temporary dir")
    1546 
    1547         req = req.get('CreateRequestBody', None)
    1548         if not req:
    1549             raise service_error(service_error.req,
    1550                     "Bad request format (no CreateRequestBody)")
    1551         # The tcl parser needs to read a file so put the content into that file
    1552         descr=req.get('experimentdescription', None)
    1553         if descr:
    1554             file_content=descr.get('ns2description', None)
    1555             if file_content:
    1556                 try:
    1557                     f = open(tclfile, 'w')
    1558                     f.write(file_content)
    1559                     f.close()
    1560                 except IOError:
    1561                     raise service_error(service_error.internal,
    1562                             "Cannot write temp experiment description")
    1563             else:
    1564                 raise service_error(service_error.req,
    1565                         "Only ns2descriptions supported")
    1566         else:
    1567             raise service_error(service_error.req, "No experiment description")
    1568 
    1569         if req.has_key('experimentID') and \
    1570                 req['experimentID'].has_key('localname'):
    1571             eid = req['experimentID']['localname']
    1572             self.state_lock.acquire()
    1573             while (self.state.has_key(eid)):
    1574                 eid += random.choice(string.ascii_letters)
    1575             # To avoid another thread picking this localname
    1576             self.state[eid] = "placeholder"
    1577             self.state_lock.release()
    1578         else:
    1579             eid = self.exp_stem
    1580             for i in range(0,5):
    1581                 eid += random.choice(string.ascii_letters)
    1582             self.state_lock.acquire()
    1583             while (self.state.has_key(eid)):
    1584                 eid = self.exp_stem
    1585                 for i in range(0,5):
    1586                     eid += random.choice(string.ascii_letters)
    1587             # To avoid another thread picking this localname
    1588             self.state[eid] = "placeholder"
    1589             self.state_lock.release()
    1590 
    1591         try:
    1592             # This catches exceptions to clear the placeholder if necessary
    1593             try:
    1594                 self.generate_ssh_keys(gw_secretkey, self.ssh_type)
    1595             except ValueError:
    1596                 raise service_error(service_error.server_config,
    1597                         "Bad key type (%s)" % self.ssh_type)
    1598 
    1599             user = req.get('user', None)
    1600             if user == None:
    1601                 raise service_error(service_error.req, "No user")
    1602 
    1603             master = req.get('master', None)
    1604             if not master:
    1605                 raise service_error(service_error.req,
    1606                         "No master testbed label")
    1607             export_project = req.get('exportProject', None)
    1608             if not export_project:
    1609                 raise service_error(service_error.req, "No export project")
    1610            
    1611             if self.splitter_url:
    1612                 self.log.debug("Calling remote splitter at %s" % \
    1613                         self.splitter_url)
    1614                 split_data = self.remote_splitter(self.splitter_url,
    1615                         file_content, master)
    1616             else:
    1617                 tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x',
    1618                     str(self.muxmax), '-m', master]
    1619 
    1620                 if self.fedkit:
    1621                     tclcmd.append('-k')
    1622 
    1623                 if self.gatewaykit:
    1624                     tclcmd.append('-K')
    1625 
    1626                 tclcmd.extend([pid, gid, eid, tclfile])
    1627 
    1628                 self.log.debug("running local splitter %s", " ".join(tclcmd))
    1629                 tclparser = Popen(tclcmd, stdout=PIPE)
    1630                 split_data = tclparser.stdout
    1631 
    1632             allocated = { }         # Testbeds we can access
    1633             started = { }           # Testbeds where a sub-experiment started
    1634                                 # successfully
    1635 
    1636             # Objects to parse the splitter output (defined above)
    1637             parse_current_testbed = self.current_testbed(eid, tmpdir,
    1638                     self.fedkit, self.gatewaykit)
    1639             parse_allbeds = self.allbeds(self.get_access)
    1640             parse_gateways = self.gateways(eid, master, tmpdir,
    1641                     gw_pubkey_base, gw_secretkey_base, self.copy_file,
    1642                     self.fedkit)
    1643             parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
    1644                         "^#\s+End\s+Vtopo")
    1645             parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
    1646                         "^#\s+End\s+hostnames", tmpdir + "/hosts")
    1647             parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
    1648                     "^#\s+End\s+tarfiles")
    1649             parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
    1650                     "^#\s+End\s+rpms")
    1651 
    1652             # Working on the split data
    1653             for line in split_data:
    1654                 line = line.rstrip()
    1655                 if parse_current_testbed(line, master, allocated, tbparams):
    1656                     continue
    1657                 elif parse_allbeds(line, user, tbparams, master, export_project,
    1658                         access_user):
    1659                     continue
    1660                 elif parse_gateways(line, allocated, tbparams):
    1661                     continue
    1662                 elif parse_vtopo(line):
    1663                     continue
    1664                 elif parse_hostnames(line):
    1665                     continue
    1666                 elif parse_tarfiles(line):
    1667                     continue
    1668                 elif parse_rpms(line):
    1669                     continue
    1670                 else:
    1671                     raise service_error(service_error.internal,
    1672                             "Bad tcl parse? %s" % line)
    1673             # Virtual topology and visualization
    1674             vtopo = self.gentopo(parse_vtopo.str)
    1675             if not vtopo:
    1676                 raise service_error(service_error.internal,
    1677                         "Failed to generate virtual topology")
    1678 
    1679             vis = self.genviz(vtopo)
    1680             if not vis:
    1681                 raise service_error(service_error.internal,
    1682                         "Failed to generate visualization")
    1683            
    1684             # save federant information
    1685             for k in allocated.keys():
    1686                 tbparams[k]['federant'] = {\
    1687                         'name': [ { 'localname' : eid} ],\
    1688                         'emulab': tbparams[k]['emulab'],\
    1689                         'allocID' : tbparams[k]['allocID'],\
    1690                         'master' : k == master,\
    1691                     }
    1692 
    1693 
    1694             # Copy tarfiles and rpms needed at remote sites into a staging area
    1695             try:
    1696                 if self.fedkit:
    1697                     for t in self.fedkit:
    1698                         parse_tarfiles.list.append(t[1])
    1699                 if self.gatewaykit:
    1700                     for t in self.gatewaykit:
    1701                         parse_tarfiles.list.append(t[1])
    1702                 for t in parse_tarfiles.list:
    1703                     if not os.path.exists("%s/tarfiles" % tmpdir):
    1704                         os.mkdir("%s/tarfiles" % tmpdir)
    1705                     self.copy_file(t, "%s/tarfiles/%s" % \
    1706                             (tmpdir, os.path.basename(t)))
    1707                 for r in parse_rpms.list:
    1708                     if not os.path.exists("%s/rpms" % tmpdir):
    1709                         os.mkdir("%s/rpms" % tmpdir)
    1710                     self.copy_file(r, "%s/rpms/%s" % \
    1711                             (tmpdir, os.path.basename(r)))
    1712                 # A null experiment file in case we need to create a remote
    1713                 # experiment from scratch
    1714                 f = open("%s/null.tcl" % tmpdir, "w")
    1715                 print >>f, """
     1505        """
     1506        The external interface to experiment creation called from the
     1507        dispatcher.
     1508
     1509        Creates a working directory, splits the incoming description using the
     1510        splitter script and parses out the avrious subsections using the
     1511        lcasses above.  Once each sub-experiment is created, use pooled threads
     1512        to instantiate them and start it all up.
     1513        """
     1514
     1515        if not self.auth.check_attribute(fid, 'create'):
     1516            raise service_error(service_error.access, "Create access denied")
     1517
     1518        try:
     1519            tmpdir = tempfile.mkdtemp(prefix="split-")
     1520        except IOError:
     1521            raise service_error(service_error.internal, "Cannot create tmp dir")
     1522
     1523        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
     1524        gw_secretkey_base = "fed.%s" % self.ssh_type
     1525        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
     1526        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
     1527        tclfile = tmpdir + "/experiment.tcl"
     1528        tbparams = { }
     1529        try:
     1530            access_user = self.accessdb[fid]
     1531        except KeyError:
     1532            raise service_error(service_error.internal,
     1533                    "Access map and authorizer out of sync in " + \
     1534                            "create_experiment for fedid %s"  % fid)
     1535
     1536        pid = "dummy"
     1537        gid = "dummy"
     1538        # XXX
     1539        fail_soft = False
     1540
     1541        try:
     1542            os.mkdir(tmpdir+"/keys")
     1543        except OSError:
     1544            raise service_error(service_error.internal,
     1545                    "Can't make temporary dir")
     1546
     1547        req = req.get('CreateRequestBody', None)
     1548        if not req:
     1549            raise service_error(service_error.req,
     1550                    "Bad request format (no CreateRequestBody)")
     1551        # The tcl parser needs to read a file so put the content into that file
     1552        descr=req.get('experimentdescription', None)
     1553        if descr:
     1554            file_content=descr.get('ns2description', None)
     1555            if file_content:
     1556                try:
     1557                    f = open(tclfile, 'w')
     1558                    f.write(file_content)
     1559                    f.close()
     1560                except IOError:
     1561                    raise service_error(service_error.internal,
     1562                            "Cannot write temp experiment description")
     1563            else:
     1564                raise service_error(service_error.req,
     1565                        "Only ns2descriptions supported")
     1566        else:
     1567            raise service_error(service_error.req, "No experiment description")
     1568
     1569        if req.has_key('experimentID') and \
     1570                req['experimentID'].has_key('localname'):
     1571            eid = req['experimentID']['localname']
     1572            self.state_lock.acquire()
     1573            while (self.state.has_key(eid)):
     1574                eid += random.choice(string.ascii_letters)
     1575            # To avoid another thread picking this localname
     1576            self.state[eid] = "placeholder"
     1577            self.state_lock.release()
     1578        else:
     1579            eid = self.exp_stem
     1580            for i in range(0,5):
     1581                eid += random.choice(string.ascii_letters)
     1582            self.state_lock.acquire()
     1583            while (self.state.has_key(eid)):
     1584                eid = self.exp_stem
     1585                for i in range(0,5):
     1586                    eid += random.choice(string.ascii_letters)
     1587            # To avoid another thread picking this localname
     1588            self.state[eid] = "placeholder"
     1589            self.state_lock.release()
     1590
     1591        try:
     1592            # This catches exceptions to clear the placeholder if necessary
     1593            try:
     1594                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
     1595            except ValueError:
     1596                raise service_error(service_error.server_config,
     1597                        "Bad key type (%s)" % self.ssh_type)
     1598
     1599            user = req.get('user', None)
     1600            if user == None:
     1601                raise service_error(service_error.req, "No user")
     1602
     1603            master = req.get('master', None)
     1604            if not master:
     1605                raise service_error(service_error.req,
     1606                        "No master testbed label")
     1607            export_project = req.get('exportProject', None)
     1608            if not export_project:
     1609                raise service_error(service_error.req, "No export project")
     1610           
     1611            if self.splitter_url:
     1612                self.log.debug("Calling remote splitter at %s" % \
     1613                        self.splitter_url)
     1614                split_data = self.remote_splitter(self.splitter_url,
     1615                        file_content, master)
     1616            else:
     1617                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x',
     1618                    str(self.muxmax), '-m', master]
     1619
     1620                if self.fedkit:
     1621                    tclcmd.append('-k')
     1622
     1623                if self.gatewaykit:
     1624                    tclcmd.append('-K')
     1625
     1626                tclcmd.extend([pid, gid, eid, tclfile])
     1627
     1628                self.log.debug("running local splitter %s", " ".join(tclcmd))
     1629                tclparser = Popen(tclcmd, stdout=PIPE)
     1630                split_data = tclparser.stdout
     1631
     1632            allocated = { }         # Testbeds we can access
     1633            started = { }           # Testbeds where a sub-experiment started
     1634                                # successfully
     1635
     1636            # Objects to parse the splitter output (defined above)
     1637            parse_current_testbed = self.current_testbed(eid, tmpdir,
     1638                    self.fedkit, self.gatewaykit)
     1639            parse_allbeds = self.allbeds(self.get_access)
     1640            parse_gateways = self.gateways(eid, master, tmpdir,
     1641                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
     1642                    self.fedkit)
     1643            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
     1644                        "^#\s+End\s+Vtopo")
     1645            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
     1646                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
     1647            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
     1648                    "^#\s+End\s+tarfiles")
     1649            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
     1650                    "^#\s+End\s+rpms")
     1651
     1652            # Working on the split data
     1653            for line in split_data:
     1654                line = line.rstrip()
     1655                if parse_current_testbed(line, master, allocated, tbparams):
     1656                    continue
     1657                elif parse_allbeds(line, user, tbparams, master, export_project,
     1658                        access_user):
     1659                    continue
     1660                elif parse_gateways(line, allocated, tbparams):
     1661                    continue
     1662                elif parse_vtopo(line):
     1663                    continue
     1664                elif parse_hostnames(line):
     1665                    continue
     1666                elif parse_tarfiles(line):
     1667                    continue
     1668                elif parse_rpms(line):
     1669                    continue
     1670                else:
     1671                    raise service_error(service_error.internal,
     1672                            "Bad tcl parse? %s" % line)
     1673            # Virtual topology and visualization
     1674            vtopo = self.gentopo(parse_vtopo.str)
     1675            if not vtopo:
     1676                raise service_error(service_error.internal,
     1677                        "Failed to generate virtual topology")
     1678
     1679            vis = self.genviz(vtopo)
     1680            if not vis:
     1681                raise service_error(service_error.internal,
     1682                        "Failed to generate visualization")
     1683           
     1684            # save federant information
     1685            for k in allocated.keys():
     1686                tbparams[k]['federant'] = {\
     1687                        'name': [ { 'localname' : eid} ],\
     1688                        'emulab': tbparams[k]['emulab'],\
     1689                        'allocID' : tbparams[k]['allocID'],\
     1690                        'master' : k == master,\
     1691                    }
     1692
     1693
     1694            # Copy tarfiles and rpms needed at remote sites into a staging area
     1695            try:
     1696                if self.fedkit:
     1697                    for t in self.fedkit:
     1698                        parse_tarfiles.list.append(t[1])
     1699                if self.gatewaykit:
     1700                    for t in self.gatewaykit:
     1701                        parse_tarfiles.list.append(t[1])
     1702                for t in parse_tarfiles.list:
     1703                    if not os.path.exists("%s/tarfiles" % tmpdir):
     1704                        os.mkdir("%s/tarfiles" % tmpdir)
     1705                    self.copy_file(t, "%s/tarfiles/%s" % \
     1706                            (tmpdir, os.path.basename(t)))
     1707                for r in parse_rpms.list:
     1708                    if not os.path.exists("%s/rpms" % tmpdir):
     1709                        os.mkdir("%s/rpms" % tmpdir)
     1710                    self.copy_file(r, "%s/rpms/%s" % \
     1711                            (tmpdir, os.path.basename(r)))
     1712                # A null experiment file in case we need to create a remote
     1713                # experiment from scratch
     1714                f = open("%s/null.tcl" % tmpdir, "w")
     1715                print >>f, """
    17161716set ns [new Simulator]
    17171717source tb_compat.tcl
     
    17221722$ns run
    17231723"""
    1724                 f.close()
    1725 
    1726             except IOError, e:
    1727                 raise service_error(service_error.internal,
    1728                         "Cannot stage tarfile/rpm: %s" % e.strerror)
    1729 
    1730         except service_error, e:
    1731             # If something goes wrong in the parse (usually an access error)
    1732             # clear the placeholder state.  From here on out the code delays
    1733             # exceptions.
    1734             self.state_lock.acquire()
    1735             del self.state[eid]
    1736             self.state_lock.release()
    1737             raise e
    1738 
    1739         thread_pool = self.thread_pool(self.nthreads)
    1740         threads = [ ]
    1741 
    1742         for tb in [ k for k in allocated.keys() if k != master]:
    1743             # Create and start a thread to start the segment, and save it to
    1744             # get the return value later
    1745             thread_pool.wait_for_slot()
    1746             t  = self.pooled_thread(target=self.start_segment,
    1747                     args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    1748                     pdata=thread_pool, trace_file=self.trace_file)
    1749             threads.append(t)
    1750             t.start()
    1751 
    1752         # Wait until all finish
    1753         thread_pool.wait_for_all_done()
    1754 
    1755         # If none failed, start the master
    1756         failed = [ t.getName() for t in threads if not t.rv ]
    1757 
    1758         if len(failed) == 0:
    1759             if not self.start_segment(master, eid, tbparams, tmpdir):
    1760                 failed.append(master)
    1761 
    1762         succeeded = [tb for tb in allocated.keys() if tb not in failed]
    1763         # If one failed clean up, unless fail_soft is set
    1764         if failed:
    1765             if not fail_soft:
    1766                 thread_pool.clear()
    1767                 for tb in succeeded:
    1768                     # Create and start a thread to stop the segment
    1769                     thread_pool.wait_for_slot()
    1770                     t  = self.pooled_thread(target=self.stop_segment,
    1771                             args=(tb, eid, tbparams), name=tb,
    1772                             pdata=thread_pool, trace_file=self.trace_file)
    1773                     t.start()
    1774                 # Wait until all finish
    1775                 thread_pool.wait_for_all_done()
    1776 
    1777                 # release the allocations
    1778                 for tb in tbparams.keys():
    1779                     self.release_access(tb, tbparams[tb]['allocID'])
    1780                 # Remove the placeholder
    1781                 self.state_lock.acquire()
    1782                 del self.state[eid]
    1783                 self.state_lock.release()
    1784 
    1785                 raise service_error(service_error.federant,
    1786                     "Swap in failed on %s" % ",".join(failed))
    1787         else:
    1788             self.log.info("[start_segment]: Experiment %s started" % eid)
    1789 
    1790         # Generate an ID for the experiment (slice) and a certificate that the
    1791         # allocator can use to prove they own it.  We'll ship it back through
    1792         # the encrypted connection.
    1793         (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
    1794 
    1795         self.log.debug("[start_experiment]: removing %s" % tmpdir)
    1796 
    1797         # Walk up tmpdir, deleting as we go
    1798         for path, dirs, files in os.walk(tmpdir, topdown=False):
    1799             for f in files:
    1800                 os.remove(os.path.join(path, f))
    1801             for d in dirs:
    1802                 os.rmdir(os.path.join(path, d))
    1803         os.rmdir(tmpdir)
    1804 
    1805         # The deepcopy prevents the allocation ID and other binaries from being
    1806         # translated into other formats
    1807         resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
    1808                 for tb in tbparams.keys() \
    1809                     if tbparams[tb].has_key('federant') ],\
    1810                     'vtopo': vtopo,\
    1811                     'vis' : vis,
    1812                     'experimentID' : [\
    1813                             { 'fedid': copy.copy(expid) }, \
    1814                             { 'localname': eid },\
    1815                         ],\
    1816                     'experimentAccess': { 'X509' : expcert },\
    1817                 }
    1818         # remove the allocationID info from each federant
    1819         for f in resp['federant']:
    1820             if f.has_key('allocID'): del f['allocID']
    1821 
    1822         # Insert the experiment into our state and update the disk copy
    1823         self.state_lock.acquire()
    1824         self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
    1825                 for tb in tbparams.keys() \
    1826                     if tbparams[tb].has_key('federant') ],\
    1827                     'vtopo': vtopo,\
    1828                     'vis' : vis,
    1829                     'owner': fid,
    1830                     'experimentID' : [\
    1831                             { 'fedid': expid }, { 'localname': eid },\
    1832                         ],\
    1833                 }
    1834         self.state[eid] = self.state[expid]
    1835         if self.state_filename: self.write_state()
    1836         self.state_lock.release()
    1837 
    1838         self.auth.set_attribute(fid, expid)
    1839         self.auth.set_attribute(expid, expid)
    1840 
    1841         if not failed:
    1842             return resp
    1843         else:
    1844             raise service_error(service_error.partial, \
    1845                     "Partial swap in on %s" % ",".join(succeeded))
     1724                f.close()
     1725
     1726            except IOError, e:
     1727                raise service_error(service_error.internal,
     1728                        "Cannot stage tarfile/rpm: %s" % e.strerror)
     1729
     1730        except service_error, e:
     1731            # If something goes wrong in the parse (usually an access error)
     1732            # clear the placeholder state.  From here on out the code delays
     1733            # exceptions.
     1734            self.state_lock.acquire()
     1735            del self.state[eid]
     1736            self.state_lock.release()
     1737            raise e
     1738
     1739        thread_pool = self.thread_pool(self.nthreads)
     1740        threads = [ ]
     1741
     1742        for tb in [ k for k in allocated.keys() if k != master]:
     1743            # Create and start a thread to start the segment, and save it to
     1744            # get the return value later
     1745            thread_pool.wait_for_slot()
     1746            t  = self.pooled_thread(target=self.start_segment,
     1747                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
     1748                    pdata=thread_pool, trace_file=self.trace_file)
     1749            threads.append(t)
     1750            t.start()
     1751
     1752        # Wait until all finish
     1753        thread_pool.wait_for_all_done()
     1754
     1755        # If none failed, start the master
     1756        failed = [ t.getName() for t in threads if not t.rv ]
     1757
     1758        if len(failed) == 0:
     1759            if not self.start_segment(master, eid, tbparams, tmpdir):
     1760                failed.append(master)
     1761
     1762        succeeded = [tb for tb in allocated.keys() if tb not in failed]
     1763        # If one failed clean up, unless fail_soft is set
     1764        if failed:
     1765            if not fail_soft:
     1766                thread_pool.clear()
     1767                for tb in succeeded:
     1768                    # Create and start a thread to stop the segment
     1769                    thread_pool.wait_for_slot()
     1770                    t  = self.pooled_thread(target=self.stop_segment,
     1771                            args=(tb, eid, tbparams), name=tb,
     1772                            pdata=thread_pool, trace_file=self.trace_file)
     1773                    t.start()
     1774                # Wait until all finish
     1775                thread_pool.wait_for_all_done()
     1776
     1777                # release the allocations
     1778                for tb in tbparams.keys():
     1779                    self.release_access(tb, tbparams[tb]['allocID'])
     1780                # Remove the placeholder
     1781                self.state_lock.acquire()
     1782                del self.state[eid]
     1783                self.state_lock.release()
     1784
     1785                raise service_error(service_error.federant,
     1786                    "Swap in failed on %s" % ",".join(failed))
     1787        else:
     1788            self.log.info("[start_segment]: Experiment %s started" % eid)
     1789
     1790        # Generate an ID for the experiment (slice) and a certificate that the
     1791        # allocator can use to prove they own it.  We'll ship it back through
     1792        # the encrypted connection.
     1793        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
     1794
     1795        self.log.debug("[start_experiment]: removing %s" % tmpdir)
     1796
     1797        # Walk up tmpdir, deleting as we go
     1798        for path, dirs, files in os.walk(tmpdir, topdown=False):
     1799            for f in files:
     1800                os.remove(os.path.join(path, f))
     1801            for d in dirs:
     1802                os.rmdir(os.path.join(path, d))
     1803        os.rmdir(tmpdir)
     1804
     1805        # The deepcopy prevents the allocation ID and other binaries from being
     1806        # translated into other formats
     1807        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
     1808                for tb in tbparams.keys() \
     1809                    if tbparams[tb].has_key('federant') ],\
     1810                    'vtopo': vtopo,\
     1811                    'vis' : vis,
     1812                    'experimentID' : [\
     1813                            { 'fedid': copy.copy(expid) }, \
     1814                            { 'localname': eid },\
     1815                        ],\
     1816                    'experimentAccess': { 'X509' : expcert },\
     1817                }
     1818        # remove the allocationID info from each federant
     1819        for f in resp['federant']:
     1820            if f.has_key('allocID'): del f['allocID']
     1821
     1822        # Insert the experiment into our state and update the disk copy
     1823        self.state_lock.acquire()
     1824        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
     1825                for tb in tbparams.keys() \
     1826                    if tbparams[tb].has_key('federant') ],\
     1827                    'vtopo': vtopo,\
     1828                    'vis' : vis,
     1829                    'owner': fid,
     1830                    'experimentID' : [\
     1831                            { 'fedid': expid }, { 'localname': eid },\
     1832                        ],\
     1833                }
     1834        self.state[eid] = self.state[expid]
     1835        if self.state_filename: self.write_state()
     1836        self.state_lock.release()
     1837
     1838        self.auth.set_attribute(fid, expid)
     1839        self.auth.set_attribute(expid, expid)
     1840
     1841        if not failed:
     1842            return resp
     1843        else:
     1844            raise service_error(service_error.partial, \
     1845                    "Partial swap in on %s" % ",".join(succeeded))
    18461846
    18471847    def check_experiment_access(self, fid, key):
    1848         """
    1849         Confirm that the fid has access to the experiment.  Though a request
    1850         may be made in terms of a local name, the access attribute is always
    1851         the experiment's fedid.
    1852         """
    1853         if not isinstance(key, fedid):
    1854             self.state_lock.acquire()
    1855             if self.state.has_key(key):
    1856                 if isinstance(self.state[key], dict):
    1857                     try:
    1858                         kl = [ f['fedid'] for f in \
    1859                                 self.state[key]['experimentID']\
    1860                                     if f.has_key('fedid') ]
    1861                     except KeyError:
    1862                         self.state_lock.release()
    1863                         raise service_error(service_error.internal,
    1864                                 "No fedid for experiment %s when checking " +\
    1865                                         "access(!?)" % key)
    1866                     if len(kl) == 1:
    1867                         key = kl[0]
    1868                     else:
    1869                         self.state_lock.release()
    1870                         raise service_error(service_error.internal,
    1871                                 "multiple fedids for experiment %s when " +\
    1872                                         "checking access(!?)" % key)
    1873                 elif isinstance(self.state[key], str):
    1874                     self.state_lock.release()
    1875                     raise service_error(service_error.internal,
    1876                             ("experiment %s is placeholder.  " +\
    1877                                     "Creation in progress or aborted oddly") \
    1878                                     % key)
    1879                 else:
    1880                     self.state_lock.release()
    1881                     raise service_error(service_error.internal,
    1882                             "Unexpected state for %s" % key)
    1883 
    1884             else:
    1885                 self.state_lock.release()
    1886                 raise service_error(service_error.access, "Access Denied")
    1887             self.state_lock.release()
    1888 
    1889         if self.auth.check_attribute(fid, key):
    1890             return True
    1891         else:
    1892             raise service_error(service_error.access, "Access Denied")
     1848        """
     1849        Confirm that the fid has access to the experiment.  Though a request
     1850        may be made in terms of a local name, the access attribute is always
     1851        the experiment's fedid.
     1852        """
     1853        if not isinstance(key, fedid):
     1854            self.state_lock.acquire()
     1855            if self.state.has_key(key):
     1856                if isinstance(self.state[key], dict):
     1857                    try:
     1858                        kl = [ f['fedid'] for f in \
     1859                                self.state[key]['experimentID']\
     1860                                    if f.has_key('fedid') ]
     1861                    except KeyError:
     1862                        self.state_lock.release()
     1863                        raise service_error(service_error.internal,
     1864                                "No fedid for experiment %s when checking " +\
     1865                                        "access(!?)" % key)
     1866                    if len(kl) == 1:
     1867                        key = kl[0]
     1868                    else:
     1869                        self.state_lock.release()
     1870                        raise service_error(service_error.internal,
     1871                                "multiple fedids for experiment %s when " +\
     1872                                        "checking access(!?)" % key)
     1873                elif isinstance(self.state[key], str):
     1874                    self.state_lock.release()
     1875                    raise service_error(service_error.internal,
     1876                            ("experiment %s is placeholder.  " +\
     1877                                    "Creation in progress or aborted oddly") \
     1878                                    % key)
     1879                else:
     1880                    self.state_lock.release()
     1881                    raise service_error(service_error.internal,
     1882                            "Unexpected state for %s" % key)
     1883
     1884            else:
     1885                self.state_lock.release()
     1886                raise service_error(service_error.access, "Access Denied")
     1887            self.state_lock.release()
     1888
     1889        if self.auth.check_attribute(fid, key):
     1890            return True
     1891        else:
     1892            raise service_error(service_error.access, "Access Denied")
    18931893
    18941894
    18951895
    18961896    def get_vtopo(self, req, fid):
    1897         """
    1898         Return the stored virtual topology for this experiment
    1899         """
    1900         rv = None
    1901 
    1902         req = req.get('VtopoRequestBody', None)
    1903         if not req:
    1904             raise service_error(service_error.req,
    1905                     "Bad request format (no VtopoRequestBody)")
    1906         exp = req.get('experiment', None)
    1907         if exp:
    1908             if exp.has_key('fedid'):
    1909                 key = exp['fedid']
    1910                 keytype = "fedid"
    1911             elif exp.has_key('localname'):
    1912                 key = exp['localname']
    1913                 keytype = "localname"
    1914             else:
    1915                 raise service_error(service_error.req, "Unknown lookup type")
    1916         else:
    1917             raise service_error(service_error.req, "No request?")
    1918 
    1919         self.check_experiment_access(fid, key)
    1920 
    1921         self.state_lock.acquire()
    1922         if self.state.has_key(key):
    1923             rv = { 'experiment' : {keytype: key },\
    1924                     'vtopo': self.state[key]['vtopo'],\
    1925                 }
    1926         self.state_lock.release()
    1927 
    1928         if rv: return rv
    1929         else: raise service_error(service_error.req, "No such experiment")
     1897        """
     1898        Return the stored virtual topology for this experiment
     1899        """
     1900        rv = None
     1901
     1902        req = req.get('VtopoRequestBody', None)
     1903        if not req:
     1904            raise service_error(service_error.req,
     1905                    "Bad request format (no VtopoRequestBody)")
     1906        exp = req.get('experiment', None)
     1907        if exp:
     1908            if exp.has_key('fedid'):
     1909                key = exp['fedid']
     1910                keytype = "fedid"
     1911            elif exp.has_key('localname'):
     1912                key = exp['localname']
     1913                keytype = "localname"
     1914            else:
     1915                raise service_error(service_error.req, "Unknown lookup type")
     1916        else:
     1917            raise service_error(service_error.req, "No request?")
     1918
     1919        self.check_experiment_access(fid, key)
     1920
     1921        self.state_lock.acquire()
     1922        if self.state.has_key(key):
     1923            rv = { 'experiment' : {keytype: key },\
     1924                    'vtopo': self.state[key]['vtopo'],\
     1925                }
     1926        self.state_lock.release()
     1927
     1928        if rv: return rv
     1929        else: raise service_error(service_error.req, "No such experiment")
    19301930
    19311931    def get_vis(self, req, fid):
    1932         """
    1933         Return the stored visualization for this experiment
    1934         """
    1935         rv = None
    1936 
    1937         req = req.get('VisRequestBody', None)
    1938         if not req:
    1939             raise service_error(service_error.req,
    1940                     "Bad request format (no VisRequestBody)")
    1941         exp = req.get('experiment', None)
    1942         if exp:
    1943             if exp.has_key('fedid'):
    1944                 key = exp['fedid']
    1945                 keytype = "fedid"
    1946             elif exp.has_key('localname'):
    1947                 key = exp['localname']
    1948                 keytype = "localname"
    1949             else:
    1950                 raise service_error(service_error.req, "Unknown lookup type")
    1951         else:
    1952             raise service_error(service_error.req, "No request?")
    1953 
    1954         self.check_experiment_access(fid, key)
    1955 
    1956         self.state_lock.acquire()
    1957         if self.state.has_key(key):
    1958             rv =  { 'experiment' : {keytype: key },\
    1959                     'vis': self.state[key]['vis'],\
    1960                     }
    1961         self.state_lock.release()
    1962 
    1963         if rv: return rv
    1964         else: raise service_error(service_error.req, "No such experiment")
     1932        """
     1933        Return the stored visualization for this experiment
     1934        """
     1935        rv = None
     1936
     1937        req = req.get('VisRequestBody', None)
     1938        if not req:
     1939            raise service_error(service_error.req,
     1940                    "Bad request format (no VisRequestBody)")
     1941        exp = req.get('experiment', None)
     1942        if exp:
     1943            if exp.has_key('fedid'):
     1944                key = exp['fedid']
     1945                keytype = "fedid"
     1946            elif exp.has_key('localname'):
     1947                key = exp['localname']
     1948                keytype = "localname"
     1949            else:
     1950                raise service_error(service_error.req, "Unknown lookup type")
     1951        else:
     1952            raise service_error(service_error.req, "No request?")
     1953
     1954        self.check_experiment_access(fid, key)
     1955
     1956        self.state_lock.acquire()
     1957        if self.state.has_key(key):
     1958            rv =  { 'experiment' : {keytype: key },\
     1959                    'vis': self.state[key]['vis'],\
     1960                    }
     1961        self.state_lock.release()
     1962
     1963        if rv: return rv
     1964        else: raise service_error(service_error.req, "No such experiment")
    19651965
    19661966    def get_info(self, req, fid):
    1967         """
    1968         Return all the stored info about this experiment
    1969         """
    1970         rv = None
    1971 
    1972         req = req.get('InfoRequestBody', None)
    1973         if not req:
    1974             raise service_error(service_error.req,
    1975                     "Bad request format (no VisRequestBody)")
    1976         exp = req.get('experiment', None)
    1977         if exp:
    1978             if exp.has_key('fedid'):
    1979                 key = exp['fedid']
    1980                 keytype = "fedid"
    1981             elif exp.has_key('localname'):
    1982                 key = exp['localname']
    1983                 keytype = "localname"
    1984             else:
    1985                 raise service_error(service_error.req, "Unknown lookup type")
    1986         else:
    1987             raise service_error(service_error.req, "No request?")
    1988 
    1989         self.check_experiment_access(fid, key)
    1990 
    1991         # The state may be massaged by the service function that called
    1992         # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
    1993         # state.
    1994         self.state_lock.acquire()
    1995         if self.state.has_key(key):
    1996             rv = copy.deepcopy(self.state[key])
    1997         self.state_lock.release()
    1998         # Remove the owner info
    1999         del rv['owner']
    2000         # remove the allocationID info from each federant
    2001         for f in rv['federant']:
    2002             if f.has_key('allocID'): del f['allocID']
    2003 
    2004         if rv: return rv
    2005         else: raise service_error(service_error.req, "No such experiment")
     1967        """
     1968        Return all the stored info about this experiment
     1969        """
     1970        rv = None
     1971
     1972        req = req.get('InfoRequestBody', None)
     1973        if not req:
     1974            raise service_error(service_error.req,
     1975                    "Bad request format (no VisRequestBody)")
     1976        exp = req.get('experiment', None)
     1977        if exp:
     1978            if exp.has_key('fedid'):
     1979                key = exp['fedid']
     1980                keytype = "fedid"
     1981            elif exp.has_key('localname'):
     1982                key = exp['localname']
     1983                keytype = "localname"
     1984            else:
     1985                raise service_error(service_error.req, "Unknown lookup type")
     1986        else:
     1987            raise service_error(service_error.req, "No request?")
     1988
     1989        self.check_experiment_access(fid, key)
     1990
     1991        # The state may be massaged by the service function that called
     1992        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
     1993        # state.
     1994        self.state_lock.acquire()
     1995        if self.state.has_key(key):
     1996            rv = copy.deepcopy(self.state[key])
     1997        self.state_lock.release()
     1998        # Remove the owner info
     1999        del rv['owner']
     2000        # remove the allocationID info from each federant
     2001        for f in rv['federant']:
     2002            if f.has_key('allocID'): del f['allocID']
     2003
     2004        if rv: return rv
     2005        else: raise service_error(service_error.req, "No such experiment")
    20062006
    20072007
    20082008    def terminate_experiment(self, req, fid):
    2009         """
    2010         Swap this experiment out on the federants and delete the shared
    2011         information
    2012         """
    2013         tbparams = { }
    2014         req = req.get('TerminateRequestBody', None)
    2015         if not req:
    2016             raise service_error(service_error.req,
    2017                     "Bad request format (no TerminateRequestBody)")
    2018         exp = req.get('experiment', None)
    2019         if exp:
    2020             if exp.has_key('fedid'):
    2021                 key = exp['fedid']
    2022                 keytype = "fedid"
    2023             elif exp.has_key('localname'):
    2024                 key = exp['localname']
    2025                 keytype = "localname"
    2026             else:
    2027                 raise service_error(service_error.req, "Unknown lookup type")
    2028         else:
    2029             raise service_error(service_error.req, "No request?")
    2030 
    2031         self.check_experiment_access(fid, key)
    2032 
    2033         self.state_lock.acquire()
    2034         fed_exp = self.state.get(key, None)
    2035 
    2036         if fed_exp:
    2037             # This branch of the conditional holds the lock to generate a
    2038             # consistent temporary tbparams variable to deallocate experiments.
    2039             # It releases the lock to do the deallocations and reacquires it to
    2040             # remove the experiment state when the termination is complete.
    2041             ids = []
    2042             #  experimentID is a list of dicts that are self-describing
    2043             #  identifiers.  This finds all the fedids and localnames - the
    2044             #  keys of self.state - and puts them into ids.
    2045             for id in fed_exp.get('experimentID', []):
    2046                 if id.has_key('fedid'): ids.append(id['fedid'])
    2047                 if id.has_key('localname'): ids.append(id['localname'])
    2048 
    2049             # Construct enough of the tbparams to make the stop_segment calls
    2050             # work
    2051             for fed in fed_exp['federant']:
    2052                 try:
    2053                     for e in fed['name']:
    2054                         eid = e.get('localname', None)
    2055                         if eid: break
    2056                     else:
    2057                         continue
    2058 
    2059                     p = fed['emulab']['project']
    2060 
    2061                     project = p['name']['localname']
    2062                     tb = p['testbed']['localname']
    2063                     user = p['user'][0]['userID']['localname']
    2064 
    2065                     domain = fed['emulab']['domain']
    2066                     host  = fed['emulab']['ops']
    2067                     aid = fed['allocID']
    2068                 except KeyError, e:
    2069                     continue
    2070                 tbparams[tb] = {\
    2071                         'user': user,\
    2072                         'domain': domain,\
    2073                         'project': project,\
    2074                         'host': host,\
    2075                         'eid': eid,\
    2076                         'aid': aid,\
    2077                     }
    2078             self.state_lock.release()
    2079 
    2080             # Stop everyone.
    2081             thread_pool = self.thread_pool(self.nthreads)
    2082             for tb in tbparams.keys():
    2083                 # Create and start a thread to stop the segment
    2084                 thread_pool.wait_for_slot()
    2085                 t  = self.pooled_thread(target=self.stop_segment,
    2086                         args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    2087                         pdata=thread_pool, trace_file=self.trace_file)
    2088                 t.start()
    2089             # Wait for completions
    2090             thread_pool.wait_for_all_done()
    2091 
    2092             # release the allocations
    2093             for tb in tbparams.keys():
    2094                 self.release_access(tb, tbparams[tb]['aid'])
    2095 
    2096             # Remove the terminated experiment
    2097             self.state_lock.acquire()
    2098             for id in ids:
    2099                 if self.state.has_key(id): del self.state[id]
    2100 
    2101             if self.state_filename: self.write_state()
    2102             self.state_lock.release()
    2103 
    2104             return { 'experiment': exp }
    2105         else:
    2106             # Don't forget to release the lock
    2107             self.state_lock.release()
    2108             raise service_error(service_error.req, "No saved state")
     2009        """
     2010        Swap this experiment out on the federants and delete the shared
     2011        information
     2012        """
     2013        tbparams = { }
     2014        req = req.get('TerminateRequestBody', None)
     2015        if not req:
     2016            raise service_error(service_error.req,
     2017                    "Bad request format (no TerminateRequestBody)")
     2018        exp = req.get('experiment', None)
     2019        if exp:
     2020            if exp.has_key('fedid'):
     2021                key = exp['fedid']
     2022                keytype = "fedid"
     2023            elif exp.has_key('localname'):
     2024                key = exp['localname']
     2025                keytype = "localname"
     2026            else:
     2027                raise service_error(service_error.req, "Unknown lookup type")
     2028        else:
     2029            raise service_error(service_error.req, "No request?")
     2030
     2031        self.check_experiment_access(fid, key)
     2032
     2033        self.state_lock.acquire()
     2034        fed_exp = self.state.get(key, None)
     2035
     2036        if fed_exp:
     2037            # This branch of the conditional holds the lock to generate a
     2038            # consistent temporary tbparams variable to deallocate experiments.
     2039            # It releases the lock to do the deallocations and reacquires it to
     2040            # remove the experiment state when the termination is complete.
     2041            ids = []
     2042            #  experimentID is a list of dicts that are self-describing
     2043            #  identifiers.  This finds all the fedids and localnames - the
     2044            #  keys of self.state - and puts them into ids.
     2045            for id in fed_exp.get('experimentID', []):
     2046                if id.has_key('fedid'): ids.append(id['fedid'])
     2047                if id.has_key('localname'): ids.append(id['localname'])
     2048
     2049            # Construct enough of the tbparams to make the stop_segment calls
     2050            # work
     2051            for fed in fed_exp['federant']:
     2052                try:
     2053                    for e in fed['name']:
     2054                        eid = e.get('localname', None)
     2055                        if eid: break
     2056                    else:
     2057                        continue
     2058
     2059                    p = fed['emulab']['project']
     2060
     2061                    project = p['name']['localname']
     2062                    tb = p['testbed']['localname']
     2063                    user = p['user'][0]['userID']['localname']
     2064
     2065                    domain = fed['emulab']['domain']
     2066                    host  = fed['emulab']['ops']
     2067                    aid = fed['allocID']
     2068                except KeyError, e:
     2069                    continue
     2070                tbparams[tb] = {\
     2071                        'user': user,\
     2072                        'domain': domain,\
     2073                        'project': project,\
     2074                        'host': host,\
     2075                        'eid': eid,\
     2076                        'aid': aid,\
     2077                    }
     2078            self.state_lock.release()
     2079
     2080            # Stop everyone.
     2081            thread_pool = self.thread_pool(self.nthreads)
     2082            for tb in tbparams.keys():
     2083                # Create and start a thread to stop the segment
     2084                thread_pool.wait_for_slot()
     2085                t  = self.pooled_thread(target=self.stop_segment,
     2086                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
     2087                        pdata=thread_pool, trace_file=self.trace_file)
     2088                t.start()
     2089            # Wait for completions
     2090            thread_pool.wait_for_all_done()
     2091
     2092            # release the allocations
     2093            for tb in tbparams.keys():
     2094                self.release_access(tb, tbparams[tb]['aid'])
     2095
     2096            # Remove the terminated experiment
     2097            self.state_lock.acquire()
     2098            for id in ids:
     2099                if self.state.has_key(id): del self.state[id]
     2100
     2101            if self.state_filename: self.write_state()
     2102            self.state_lock.release()
     2103
     2104            return { 'experiment': exp }
     2105        else:
     2106            # Don't forget to release the lock
     2107            self.state_lock.release()
     2108            raise service_error(service_error.req, "No saved state")
Note: See TracChangeset for help on using the changeset viewer.