source: fedd/federation/experiment_control.py @ 73e0a61

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 73e0a61 was 593e901, checked in by Ted Faber <faber@…>, 15 years ago

Checkpoint working federation w/PG (w/o routing yet...)

  • Property mode set to 100644
File size: 93.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Split = service_caller('Ns2Split')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_splitter(self, uri, desc, master):
925
926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
931            }
932
933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
945
946    class start_segment:
947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
956            self.testbed = testbed
957            self.log_collector = log_collector
958            self.response = None
959
960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
967                    'master': master,
968                }
969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
980            if attrs:
981                req['fedAttr'] = attrs
982
983            try:
984                self.log.debug("Calling StartSegment at %s " % uri)
985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
993                    self.response = r
994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
1002
1003
1004
1005    class terminate_segment:
1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
1028   
1029
1030    def allocate_resources(self, allocated, master, eid, expid, 
1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1032            attrs=None, connInfo={}, services=[]):
1033
1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
1072            threads.append(t)
1073            t.start()
1074
1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
1077        revoked = False
1078        while not thread_pool.wait_for_all_done(60.0):
1079            mins += 1
1080            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1081                    % mins)
1082            if not revoked and \
1083                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1084                # a testbed has failed.  Revoke this experiment's
1085                # synchronizarion values so that sub experiments will not
1086                # deadlock waiting for synchronization that will never happen
1087                self.log.info("A subexperiment has failed to swap in, " + \
1088                        "revoking synch keys")
1089                var_key = "fedid:%s" % expid
1090                for k in self.synch_store.all_keys():
1091                    if len(k) > 45 and k[0:46] == var_key:
1092                        self.synch_store.revoke_key(k)
1093                revoked = True
1094
1095        failed = [ t.getName() for t in threads if not t.rv ]
1096        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1097
1098        # If one failed clean up, unless fail_soft is set
1099        if failed:
1100            if not fail_soft:
1101                thread_pool.clear()
1102                for tb in succeeded:
1103                    # Create and start a thread to stop the segment
1104                    thread_pool.wait_for_slot()
1105                    uri = tbparams[tb]['uri']
1106                    t  = self.pooled_thread(\
1107                            target=self.terminate_segment(log=log,
1108                                testbed=tb,
1109                                cert_file=self.cert_file, 
1110                                cert_pwd=self.cert_pwd,
1111                                trusted_certs=self.trusted_certs,
1112                                caller=self.call_TerminateSegment),
1113                            args=(uri, tbparams[tb]['federant']['allocID']),
1114                            name=tb,
1115                            pdata=thread_pool, trace_file=self.trace_file)
1116                    t.start()
1117                # Wait until all finish (if any are being stopped)
1118                if succeeded:
1119                    thread_pool.wait_for_all_done()
1120
1121                # release the allocations
1122                for tb in tbparams.keys():
1123                    self.release_access(tb, tbparams[tb]['allocID'],
1124                            tbparams[tb].get('uri', None))
1125                # Remove the placeholder
1126                self.state_lock.acquire()
1127                self.state[eid]['experimentStatus'] = 'failed'
1128                if self.state_filename: self.write_state()
1129                self.state_lock.release()
1130
1131                log.error("Swap in failed on %s" % ",".join(failed))
1132                return
1133        else:
1134            log.info("[start_segment]: Experiment %s active" % eid)
1135
1136
1137        # Walk up tmpdir, deleting as we go
1138        if self.cleanup:
1139            log.debug("[start_experiment]: removing %s" % tmpdir)
1140            for path, dirs, files in os.walk(tmpdir, topdown=False):
1141                for f in files:
1142                    os.remove(os.path.join(path, f))
1143                for d in dirs:
1144                    os.rmdir(os.path.join(path, d))
1145            os.rmdir(tmpdir)
1146        else:
1147            log.debug("[start_experiment]: not removing %s" % tmpdir)
1148
1149        # Insert the experiment into our state and update the disk copy
1150        self.state_lock.acquire()
1151        self.state[expid]['experimentStatus'] = 'active'
1152        self.state[eid] = self.state[expid]
1153        if self.state_filename: self.write_state()
1154        self.state_lock.release()
1155        return
1156
1157
1158    def add_kit(self, e, kit):
1159        """
1160        Add a Software object created from the list of (install, location)
1161        tuples passed as kit  to the software attribute of an object e.  We
1162        do this enough to break out the code, but it's kind of a hack to
1163        avoid changing the old tuple rep.
1164        """
1165
1166        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1167
1168        if isinstance(e.software, list): e.software.extend(s)
1169        else: e.software = s
1170
1171
1172    def create_experiment_state(self, fid, req, expid, expcert, 
1173            state='starting'):
1174        """
1175        Create the initial entry in the experiment's state.  The expid and
1176        expcert are the experiment's fedid and certifacte that represents that
1177        ID, which are installed in the experiment state.  If the request
1178        includes a suggested local name that is used if possible.  If the local
1179        name is already taken by an experiment owned by this user that has
1180        failed, it is overwritten.  Otherwise new letters are added until a
1181        valid localname is found.  The generated local name is returned.
1182        """
1183
1184        if req.has_key('experimentID') and \
1185                req['experimentID'].has_key('localname'):
1186            overwrite = False
1187            eid = req['experimentID']['localname']
1188            # If there's an old failed experiment here with the same local name
1189            # and accessible by this user, we'll overwrite it, otherwise we'll
1190            # fall through and do the collision avoidance.
1191            old_expid = self.get_experiment_fedid(eid)
1192            if old_expid and self.check_experiment_access(fid, old_expid):
1193                self.state_lock.acquire()
1194                status = self.state[eid].get('experimentStatus', None)
1195                if status and status == 'failed':
1196                    # remove the old access attribute
1197                    self.auth.unset_attribute(fid, old_expid)
1198                    overwrite = True
1199                    del self.state[eid]
1200                    del self.state[old_expid]
1201                self.state_lock.release()
1202            self.state_lock.acquire()
1203            while (self.state.has_key(eid) and not overwrite):
1204                eid += random.choice(string.ascii_letters)
1205            # Initial state
1206            self.state[eid] = {
1207                    'experimentID' : \
1208                            [ { 'localname' : eid }, {'fedid': expid } ],
1209                    'experimentStatus': state,
1210                    'experimentAccess': { 'X509' : expcert },
1211                    'owner': fid,
1212                    'log' : [],
1213                }
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217        else:
1218            eid = self.exp_stem
1219            for i in range(0,5):
1220                eid += random.choice(string.ascii_letters)
1221            self.state_lock.acquire()
1222            while (self.state.has_key(eid)):
1223                eid = self.exp_stem
1224                for i in range(0,5):
1225                    eid += random.choice(string.ascii_letters)
1226            # Initial state
1227            self.state[eid] = {
1228                    'experimentID' : \
1229                            [ { 'localname' : eid }, {'fedid': expid } ],
1230                    'experimentStatus': state,
1231                    'experimentAccess': { 'X509' : expcert },
1232                    'owner': fid,
1233                    'log' : [],
1234                }
1235            self.state[expid] = self.state[eid]
1236            if self.state_filename: self.write_state()
1237            self.state_lock.release()
1238
1239        return eid
1240
1241
1242    def allocate_ips_to_topo(self, top):
1243        """
1244        Add an ip4_address attribute to all the hosts in the topology, based on
1245        the shared substrates on which they sit.  An /etc/hosts file is also
1246        created and returned as a list of hostfiles entries.  We also return
1247        the allocator, because we may need to allocate IPs to portals
1248        (specifically DRAGON portals).
1249        """
1250        subs = sorted(top.substrates, 
1251                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1252                reverse=True)
1253        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1254        ifs = { }
1255        hosts = [ ]
1256
1257        for idx, s in enumerate(subs):
1258            net_size = len(s.interfaces)+2
1259
1260            a = ips.allocate(net_size)
1261            if a :
1262                base, num = a
1263                if num < net_size: 
1264                    raise service_error(service_error.internal,
1265                            "Allocator returned wrong number of IPs??")
1266            else:
1267                raise service_error(service_error.req, 
1268                        "Cannot allocate IP addresses")
1269            mask = 2
1270            while 2 **mask < net_size:
1271                mask += 1
1272
1273            netmask = ((2**32-1) ^ (mask**2 -1))
1274            print "%d %x" % (mask , netmask)
1275
1276            base += 1
1277            for i in s.interfaces:
1278                i.attribute.append(
1279                        topdl.Attribute('ip4_address', 
1280                            "%s" % ip_addr(base)))
1281                i.attribute.append(
1282                        topdl.Attribute('ip4_netmask', 
1283                            "%s" % ip_addr(int(netmask))))
1284
1285                hname = i.element.name[0]
1286                if ifs.has_key(hname):
1287                    hosts.append("%s\t%s-%s %s-%d" % \
1288                            (ip_addr(base), hname, s.name, hname,
1289                                ifs[hname]))
1290                else:
1291                    ifs[hname] = 0
1292                    hosts.append("%s\t%s-%s %s-%d %s" % \
1293                            (ip_addr(base), hname, s.name, hname,
1294                                ifs[hname], hname))
1295
1296                ifs[hname] += 1
1297                base += 1
1298        return hosts, ips
1299
1300    def get_access_to_testbeds(self, testbeds, access_user, 
1301            export_project, master, allocated, tbparams, services):
1302        """
1303        Request access to the various testbeds required for this instantiation
1304        (passed in as testbeds).  User, access_user, expoert_project and master
1305        are used to construct the correct requests.  Per-testbed parameters are
1306        returned in tbparams.
1307        """
1308        for tb in testbeds:
1309            self.get_access(tb, None, tbparams, master,
1310                    export_project, access_user, services)
1311            allocated[tb] = 1
1312
1313    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1314        """
1315        Create the sub-topologies that are needed for experiment instantiation.
1316        """
1317        for tb in testbeds:
1318            topo[tb] = top.clone()
1319            to_delete = [ ]
1320            # XXX: copy in for loop to simplify
1321            for e in topo[tb].elements:
1322                etb = e.get_attribute('testbed')
1323                if etb and etb != tb:
1324                    for i in e.interface:
1325                        for s in i.subs:
1326                            try:
1327                                s.interfaces.remove(i)
1328                            except ValueError:
1329                                raise service_error(service_error.internal,
1330                                        "Can't remove interface??")
1331                    to_delete.append(e)
1332            for e in to_delete:
1333                topo[tb].elements.remove(e)
1334            topo[tb].make_indices()
1335
1336            for e in [ e for e in topo[tb].elements \
1337                    if isinstance(e,topdl.Computer)]:
1338                if self.fedkit: self.add_kit(e, self.fedkit)
1339
1340    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1341            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1342            expid=None):
1343        """
1344        Return a new internet portal node and a dict with the connectionInfo to
1345        be attached.
1346        """
1347        dproject = tbparams[dt].get('project', 'project')
1348        ddomain = tbparams[dt].get('domain', ".example.com")
1349        mdomain = tbparams[master].get('domain', '.example.com')
1350        mproject = tbparams[master].get('project', 'project')
1351        muser = tbparams[master].get('user', 'root')
1352        smbshare = tbparams[master].get('smbshare', 'USERS')
1353
1354        if st == master or dt == master:
1355            active = ("%s" % (st == master))
1356        else:
1357            active = ("%s" % (st > dt))
1358
1359        ifaces = [ ]
1360        for sub, attrs in iface_desc:
1361            inf = topdl.Interface(
1362                    name="inf%03d" % len(ifaces),
1363                    substrate=sub,
1364                    attribute=[
1365                        topdl.Attribute(
1366                            attribute=n,
1367                            value = v)
1368                        for n, v in attrs
1369                        ]
1370                    )
1371            ifaces.append(inf)
1372        if conn_type == "ssh":
1373            try:
1374                aid = tbparams[st]['allocID']['fedid']
1375            except:
1376                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1377                        % st)
1378                aid = None
1379            info = {
1380                    "type" : conn_type, 
1381                    "portal": myname,
1382                    'fedAttr': [ 
1383                            { 'attribute': 'masterdomain', 'value': mdomain},
1384                            { 'attribute': 'masterexperiment', 'value': 
1385                                "%s/%s" % (mproject, eid)},
1386                            { 'attribute': 'active', 'value': active},
1387                            # Move to SMB service description
1388                            { 'attribute': 'masteruser', 'value': muser},
1389                            { 'attribute': 'smbshare', 'value': smbshare},
1390                        ],
1391                    'parameter': [
1392                        {
1393                            'name': 'peer',
1394                            'key': 'fedid:%s/%s' % (expid, myname),
1395                            'store': self.store_url,
1396                            'type': 'output',
1397                        },
1398                        {
1399                            'name': 'ssh_port',
1400                            'key': 'fedid:%s/%s-port' % (expid, myname),
1401                            'store': self.store_url,
1402                            'type': 'output',
1403                        },
1404                        {
1405                            'name': 'peer',
1406                            'key': 'fedid:%s/%s' % (expid, desthost),
1407                            'store': self.store_url,
1408                            'type': 'input',
1409                        },
1410                        {
1411                            'name': 'ssh_port',
1412                            'key': 'fedid:%s/%s-port' % (expid, desthost),
1413                            'store': self.store_url,
1414                            'type': 'input',
1415                        },
1416                        ]
1417                    }
1418            # Give this allocation the rights to access the key of the
1419            # peers
1420            if aid:
1421                for h in (myname, desthost):
1422                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1423                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
1424                            (expid, h))
1425            else:
1426                self.log.error("No aid for %s in new_portal_node" % st)
1427        else:
1428            info = None
1429
1430        return (topdl.Computer(
1431                name=myname,
1432                attribute=[ 
1433                    topdl.Attribute(attribute=n,value=v)
1434                        for n, v in (\
1435                            ('portal', 'true'),
1436                            ('portal_type', portal_type), 
1437                        )
1438                    ],
1439                interface=ifaces,
1440                ), info)
1441
1442    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1443        ddomain = tbparams[dt].get('domain', ".example.com")
1444        dproject = tbparams[dt].get('project', 'project')
1445        tsubstrate = \
1446                topdl.Substrate(name='%s-%s' % (st, dt),
1447                        attribute= [
1448                            topdl.Attribute(
1449                                attribute='portal',
1450                                value='true')
1451                            ]
1452                        )
1453        segment_element = topdl.Segment(
1454                id= tbparams[dt]['allocID'],
1455                type='emulab',
1456                uri = self.tbmap.get(dt, None),
1457                interface=[ 
1458                    topdl.Interface(
1459                        substrate=tsubstrate.name),
1460                    ],
1461                attribute = [
1462                    topdl.Attribute(attribute=n, value=v)
1463                        for n, v in (\
1464                            ('domain', ddomain),
1465                            ('experiment', "%s/%s" % \
1466                                    (dproject, eid)),)
1467                    ],
1468                )
1469
1470        return (tsubstrate, segment_element)
1471
1472    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1473        if sub.capacity is None:
1474            raise service_error(service_error.internal,
1475                    "Cannot DRAGON split substrate w/o capacity")
1476        segs = [ ]
1477        substr = topdl.Substrate(name="dragon%d" % idx, 
1478                capacity=sub.capacity.clone(),
1479                attribute=[ topdl.Attribute(attribute=n, value=v)
1480                    for n, v, in (\
1481                            ('vlan', 'unassigned%d' % idx),)])
1482        name = "dragon%d" % idx
1483        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1484        for tb in tbs.keys():
1485            seg = topdl.Segment(
1486                    id = tbparams[tb]['allocID'],
1487                    type='emulab',
1488                    uri = self.tbmap.get(tb, None),
1489                    interface=[ 
1490                        topdl.Interface(
1491                            substrate=substr.name),
1492                        ],
1493                    attribute=[ topdl.Attribute(
1494                        attribute='dragon_endpoint', 
1495                        value=tbparams[tb]['dragon']),
1496                        ]
1497                    )
1498            if tbparams[tb].has_key('vlans'):
1499                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1500            segs.append(seg)
1501
1502            # Give this allocation the rights to access the key of the
1503            # vlan_id
1504            try:
1505                aid = tbparams[tb]['allocID']['fedid']
1506                self.auth.set_attribute(aid, store_key)
1507            except:
1508                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1509                        % tb)
1510
1511        connInfo[name] = [ { 
1512            'type': 'transit',
1513            'parameter': [ { 
1514                'name': 'vlan_id',
1515                'key': store_key,
1516                'store': self.store_url,
1517                'type': 'output'
1518                } ]
1519            } ]
1520
1521        topo[name] = \
1522                topdl.Topology(substrates=[substr], elements=segs,
1523                        attribute=[
1524                            topdl.Attribute(attribute="transit", value='true'),
1525                            topdl.Attribute(attribute="dynamic", value='true'),
1526                            topdl.Attribute(attribute="testbed", 
1527                                value='dragon'),
1528                            topdl.Attribute(attribute="store_keys", 
1529                                value=store_key),
1530                            ]
1531                        )
1532
1533    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1534            connInfo, expid=None):
1535        """
1536        Add attribiutes to the various elements indicating that they are to be
1537        dragon connected and create a dragon segment in topo to be
1538        instantiated.
1539        """
1540
1541        def get_substrate_from_topo(name, t):
1542            for s in t.substrates:
1543                if s.name == name: return s
1544            else: return None
1545
1546
1547        mdomain = tbparams[master].get('domain', '.example.com')
1548        mproject = tbparams[master].get('project', 'project')
1549        # dn is the number of previously created dragon nets.  This routine
1550        # creates a net numbered by dn
1551        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1552        # Count the number of interfaces on this substrate in each testbed from
1553        # the global topology
1554        count = { }
1555        node = { }
1556        for e in [ i.element for i in sub.interfaces ]:
1557            tb = e.get_attribute('testbed')
1558            count[tb] = count.get(tb, 0) + 1
1559            node[tb] = i.get_attribute('ip4_address')
1560
1561
1562        # Set the attributes in the copies that will allow setup of dragon
1563        # connections.
1564        for tb in tbs.keys():
1565            s = get_substrate_from_topo(sub.name, topo[tb])
1566            if s:
1567                if not connInfo.has_key(tb):
1568                    connInfo[tb] = [ ]
1569
1570                try:
1571                    aid = tbparams[tb]['allocID']['fedid']
1572                except:
1573                    self.log.debug("[creat_dragon_substrate] " + 
1574                            "Can't get alloc id for %s?" %tb)
1575                    aid = None
1576
1577                # This may need another look, but only a service gateway will
1578                # look at the active parameter, and these are only inserted to
1579                # connect to the master.
1580                active = "%s" % ( tb == master)
1581                info = {
1582                        'type': 'transit',
1583                        'member': [ {
1584                            'element': i.element.name[0], 
1585                            'interface': i.name
1586                            } for i in s.interfaces \
1587                                    if isinstance(i.element, topdl.Computer) ],
1588                        'fedAttr': [ 
1589                            { 'attribute': 'masterdomain', 'value': mdomain},
1590                            { 'attribute': 'masterexperiment', 'value': 
1591                                "%s/%s" % (mproject, eid)},
1592                            { 'attribute': 'active', 'value': active},
1593                            ],
1594                        'parameter': [ {
1595                            'name': 'vlan_id',
1596                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1597                            'store': self.store_url,
1598                            'type': 'input',
1599                            } ]
1600                        }
1601                if tbs.has_key(tb):
1602                    info['peer'] = tbs[tb]
1603                connInfo[tb].append(info)
1604
1605                # Give this allocation the rights to access the key of the
1606                # vlan_id
1607                if aid:
1608                    self.auth.set_attribute(aid, 
1609                            'fedid:%s/vlan%d' % (expid, dn))
1610            else:
1611                raise service_error(service_error.internal,
1612                        "No substrate %s in testbed %s" % (sub.name, tb))
1613
1614        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1615
1616    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1617            segment_substrate, portals, connInfo, expid):
1618        # More than one testbed is on this substrate.  Insert
1619        # some portals into the subtopologies.  st == source testbed,
1620        # dt == destination testbed.
1621        for st in tbs.keys():
1622            if not segment_substrate.has_key(st):
1623                segment_substrate[st] = { }
1624            if not portals.has_key(st): 
1625                portals[st] = { }
1626            if not connInfo.has_key(st):
1627                connInfo[st] = [ ]
1628            for dt in [ t for t in tbs.keys() if t != st]:
1629                sproject = tbparams[st].get('project', 'project')
1630                dproject = tbparams[dt].get('project', 'project')
1631                mproject = tbparams[master].get('project', 'project')
1632                sdomain = tbparams[st].get('domain', ".example.com")
1633                ddomain = tbparams[dt].get('domain', ".example.com")
1634                mdomain = tbparams[master].get('domain', '.example.com')
1635                muser = tbparams[master].get('user', 'root')
1636                smbshare = tbparams[master].get('smbshare', 'USERS')
1637                aid = tbparams[dt]['allocID']['fedid']
1638                if st == master or dt == master:
1639                    active = ("%s" % (st == master))
1640                else:
1641                    active = ("%s" %(st > dt))
1642                if not segment_substrate[st].has_key(dt):
1643                    # Put a substrate and a segment for the connected
1644                    # testbed in there.
1645                    tsubstrate, segment_element = \
1646                            self.new_portal_substrate(st, dt, eid, tbparams,
1647                                    expid)
1648                    segment_substrate[st][dt] = tsubstrate
1649                    topo[st].substrates.append(tsubstrate)
1650                    topo[st].elements.append(segment_element)
1651
1652                new_portal = False
1653                if portals[st].has_key(dt):
1654                    # There's a portal set up to go to this destination.
1655                    # See if there's room to multiplex this connection on
1656                    # it.  If so, add an interface to the portal; if not,
1657                    # set up to add a portal below.
1658                    # [This little festival of braces is just a pop of the
1659                    # last element in the list of portals between st and
1660                    # dt.]
1661                    portal = portals[st][dt][-1]
1662                    mux = len([ i for i in portal.interface \
1663                            if not i.get_attribute('portal')])
1664                    if mux == self.muxmax:
1665                        new_portal = True
1666                        portal_type = "experiment"
1667                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1668                        desthost = "%stunnel%d" % (st.lower(), 
1669                                len(portals[st][dt]))
1670                    else:
1671                        new_i = topdl.Interface(
1672                                substrate=sub.name,
1673                                attribute=[ 
1674                                    topdl.Attribute(
1675                                        attribute='ip4_address', 
1676                                        value=tbs[dt]
1677                                    )
1678                                ])
1679                        portal.interface.append(new_i)
1680                else:
1681                    # First connection to this testbed, make an empty list
1682                    # and set up to add the new portal below
1683                    new_portal = True
1684                    portals[st][dt] = [ ]
1685                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1686                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1687
1688                    if dt == master or st == master: portal_type = "both"
1689                    else: portal_type = "experiment"
1690
1691                if new_portal:
1692                    infs = (
1693                            (segment_substrate[st][dt].name, 
1694                                (('portal', 'true'),)),
1695                            (sub.name, 
1696                                (('ip4_address', tbs[dt]),))
1697                        )
1698                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1699                            master, eid, myname, desthost, portal_type,
1700                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1701                    if self.fedkit:
1702                        self.add_kit(portal, self.fedkit)
1703                    if self.gatewaykit: 
1704                        self.add_kit(portal, self.gatewaykit)
1705
1706                    topo[st].elements.append(portal)
1707                    portals[st][dt].append(portal)
1708                    connInfo[st].append(info)
1709
1710    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1711        # Add to the master testbed
1712        tsubstrate, segment_element = \
1713                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1714        myname = "%stunnel" % dt
1715        desthost = "%stunnel" % st
1716
1717        portal, info = self.new_portal_node(st, dt, tbparams, master,
1718                eid, myname, desthost, "control", 
1719                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1720                conn_attrs=[], expid=expid)
1721        if self.fedkit:
1722            self.add_kit(portal, self.fedkit)
1723        if self.gatewaykit: 
1724            self.add_kit(portal, self.gatewaykit)
1725
1726        topo[st].substrates.append(tsubstrate)
1727        topo[st].elements.append(segment_element)
1728        topo[st].elements.append(portal)
1729        if not connInfo.has_key(st):
1730            connInfo[st] = [ ]
1731        connInfo[st].append(info)
1732
1733    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1734            substrate, tbparams, expid):
1735        # Add to the master testbed
1736        myname = "%stunnel" % dt
1737        desthost = "%s" % ip_addr(dip)
1738
1739        portal, info = self.new_portal_node(st, dt, tbparams, master,
1740                eid, myname, desthost, "control", 
1741                ((substrate.name,(
1742                    ('portal','true'),
1743                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1744                conn_type="transit", conn_attrs=[], expid=expid)
1745        if self.fedkit:
1746            self.add_kit(portal, self.fedkit)
1747        if self.gatewaykit: 
1748            self.add_kit(portal, self.gatewaykit)
1749
1750        return portal
1751
1752    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1753            connInfo, expid):
1754        """
1755        For each substrate in the main topology, find those that
1756        have nodes on more than one testbed.  Insert portal nodes
1757        into the copies of those substrates on the sub topologies.
1758        """
1759        segment_substrate = { }
1760        portals = { }
1761        for s in top.substrates:
1762            # tbs will contain an ip address on this subsrate that is in
1763            # each testbed.
1764            tbs = { }
1765            for i in s.interfaces:
1766                e = i.element
1767                tb = e.get_attribute('testbed')
1768                if tb and not tbs.has_key(tb):
1769                    for i in e.interface:
1770                        if s in i.subs:
1771                            tbs[tb]= i.get_attribute('ip4_address')
1772            if len(tbs) < 2:
1773                continue
1774
1775            # DRAGON will not create multi-site vlans yet
1776            if len(tbs) == 2 and \
1777                    all([tbparams[x].has_key('dragon') for x in tbs]):
1778                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1779                        master, eid, connInfo, expid)
1780            else:
1781                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1782                        eid, segment_substrate, portals, connInfo, expid)
1783
1784        # Make sure that all the slaves have a control portal back to the
1785        # master.
1786        for tb in [ t for t in tbparams.keys() if t != master ]:
1787            if len([e for e in topo[tb].elements \
1788                    if isinstance(e, topdl.Computer) and \
1789                    e.get_attribute('portal') and \
1790                    e.get_attribute('portal_type') == 'both']) == 0:
1791
1792                if tbparams[master].has_key('dragon') \
1793                        and tbparams[tb].has_key('dragon'):
1794
1795                    idx = len([x for x in topo.keys() \
1796                            if x.startswith('dragon')])
1797                    dip, leng = ip_allocator.allocate(4)
1798                    dip += 1
1799                    mip = dip+1
1800                    csub = topdl.Substrate(
1801                            name="dragon-control-%s" % tb,
1802                            capacity=topdl.Capacity(100000.0, 'max'),
1803                            attribute=[
1804                                topdl.Attribute(
1805                                    attribute='portal',
1806                                    value='true'
1807                                    )
1808                                ]
1809                            )
1810                    seg = topdl.Segment(
1811                            id= tbparams[master]['allocID'],
1812                            type='emulab',
1813                            uri = self.tbmap.get(master, None),
1814                            interface=[ 
1815                                topdl.Interface(
1816                                    substrate=csub.name),
1817                                ],
1818                            attribute = [
1819                                topdl.Attribute(attribute=n, value=v)
1820                                    for n, v in (\
1821                                        ('domain', 
1822                                            tbparams[master].get('domain',
1823                                                ".example.com")),
1824                                        ('experiment', "%s/%s" % \
1825                                                (tbparams[master].get(
1826                                                    'project', 
1827                                                    'project'), 
1828                                                    eid)),)
1829                                ],
1830                            )
1831                    portal = self.new_dragon_portal(tb, master,
1832                            master, eid, dip, mip, idx, csub, tbparams, expid)
1833                    topo[tb].substrates.append(csub)
1834                    topo[tb].elements.append(portal)
1835                    topo[tb].elements.append(seg)
1836
1837                    mcsub = csub.clone()
1838                    seg = topdl.Segment(
1839                            id= tbparams[tb]['allocID'],
1840                            type='emulab',
1841                            uri = self.tbmap.get(tb, None),
1842                            interface=[ 
1843                                topdl.Interface(
1844                                    substrate=csub.name),
1845                                ],
1846                            attribute = [
1847                                topdl.Attribute(attribute=n, value=v)
1848                                    for n, v in (\
1849                                        ('domain', 
1850                                            tbparams[tb].get('domain',
1851                                                ".example.com")),
1852                                        ('experiment', "%s/%s" % \
1853                                                (tbparams[tb].get('project', 
1854                                                    'project'), 
1855                                                    eid)),)
1856                                ],
1857                            )
1858                    portal = self.new_dragon_portal(master, tb, master,
1859                            eid, mip, dip, idx, mcsub, tbparams, expid)
1860                    topo[master].substrates.append(mcsub)
1861                    topo[master].elements.append(portal)
1862                    topo[master].elements.append(seg)
1863                    for t in (master, tb):
1864                        topo[t].incorporate_elements()
1865
1866                    self.create_dragon_substrate(csub, topo, 
1867                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1868                            tbparams, master, eid, connInfo,
1869                            expid)
1870                else:
1871                    self.add_control_portal(master, tb, master, eid, topo, 
1872                            tbparams, connInfo, expid)
1873                    self.add_control_portal(tb, master, master, eid, topo, 
1874                            tbparams, connInfo, expid)
1875
1876        # Connect the portal nodes into the topologies and clear out
1877        # substrates that are not in the topologies
1878        for tb in tbparams.keys():
1879            topo[tb].incorporate_elements()
1880            topo[tb].substrates = \
1881                    [s for s in topo[tb].substrates \
1882                        if len(s.interfaces) >0]
1883
1884    def wrangle_software(self, expid, top, topo, tbparams):
1885        """
1886        Copy software out to the repository directory, allocate permissions and
1887        rewrite the segment topologies to look for the software in local
1888        places.
1889        """
1890
1891        # Copy the rpms and tarfiles to a distribution directory from
1892        # which the federants can retrieve them
1893        linkpath = "%s/software" %  expid
1894        softdir ="%s/%s" % ( self.repodir, linkpath)
1895        softmap = { }
1896        # These are in a list of tuples format (each kit).  This comprehension
1897        # unwraps them into a single list of tuples that initilaizes the set of
1898        # tuples.
1899        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1900                for p, t in l ])
1901        pkgs.update([x.location for e in top.elements \
1902                for x in e.software])
1903        try:
1904            os.makedirs(softdir)
1905        except IOError, e:
1906            raise service_error(
1907                    "Cannot create software directory: %s" % e)
1908        # The actual copying.  Everything's converted into a url for copying.
1909        for pkg in pkgs:
1910            loc = pkg
1911
1912            scheme, host, path = urlparse(loc)[0:3]
1913            dest = os.path.basename(path)
1914            if not scheme:
1915                if not loc.startswith('/'):
1916                    loc = "/%s" % loc
1917                loc = "file://%s" %loc
1918            try:
1919                u = urlopen(loc)
1920            except Exception, e:
1921                raise service_error(service_error.req, 
1922                        "Cannot open %s: %s" % (loc, e))
1923            try:
1924                f = open("%s/%s" % (softdir, dest) , "w")
1925                self.log.debug("Writing %s/%s" % (softdir,dest) )
1926                data = u.read(4096)
1927                while data:
1928                    f.write(data)
1929                    data = u.read(4096)
1930                f.close()
1931                u.close()
1932            except Exception, e:
1933                raise service_error(service_error.internal,
1934                        "Could not copy %s: %s" % (loc, e))
1935            path = re.sub("/tmp", "", linkpath)
1936            # XXX
1937            softmap[pkg] = \
1938                    "%s/%s/%s" %\
1939                    ( self.repo_url, path, dest)
1940
1941            # Allow the individual segments to access the software.
1942            for tb in tbparams.keys():
1943                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1944                        "/%s/%s" % ( path, dest))
1945
1946        # Convert the software locations in the segments into the local
1947        # copies on this host
1948        for soft in [ s for tb in topo.values() \
1949                for e in tb.elements \
1950                    if getattr(e, 'software', False) \
1951                        for s in e.software ]:
1952            if softmap.has_key(soft.location):
1953                soft.location = softmap[soft.location]
1954
1955
1956    def new_experiment(self, req, fid):
1957        """
1958        The external interface to empty initial experiment creation called from
1959        the dispatcher.
1960
1961        Creates a working directory, splits the incoming description using the
1962        splitter script and parses out the avrious subsections using the
1963        lcasses above.  Once each sub-experiment is created, use pooled threads
1964        to instantiate them and start it all up.
1965        """
1966        if not self.auth.check_attribute(fid, 'new'):
1967            raise service_error(service_error.access, "New access denied")
1968
1969        try:
1970            tmpdir = tempfile.mkdtemp(prefix="split-")
1971        except IOError:
1972            raise service_error(service_error.internal, "Cannot create tmp dir")
1973
1974        try:
1975            access_user = self.accessdb[fid]
1976        except KeyError:
1977            raise service_error(service_error.internal,
1978                    "Access map and authorizer out of sync in " + \
1979                            "new_experiment for fedid %s"  % fid)
1980
1981        pid = "dummy"
1982        gid = "dummy"
1983
1984        req = req.get('NewRequestBody', None)
1985        if not req:
1986            raise service_error(service_error.req,
1987                    "Bad request format (no NewRequestBody)")
1988
1989        # Generate an ID for the experiment (slice) and a certificate that the
1990        # allocator can use to prove they own it.  We'll ship it back through
1991        # the encrypted connection.
1992        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1993
1994        #now we're done with the tmpdir, and it should be empty
1995        if self.cleanup:
1996            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1997            os.rmdir(tmpdir)
1998        else:
1999            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
2000
2001        eid = self.create_experiment_state(fid, req, expid, expcert, 
2002                state='empty')
2003
2004        # Let users touch the state
2005        self.auth.set_attribute(fid, expid)
2006        self.auth.set_attribute(expid, expid)
2007        # Override fedids can manipulate state as well
2008        for o in self.overrides:
2009            self.auth.set_attribute(o, expid)
2010
2011        rv = {
2012                'experimentID': [
2013                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2014                ],
2015                'experimentStatus': 'empty',
2016                'experimentAccess': { 'X509' : expcert }
2017            }
2018
2019        return rv
2020
2021
2022    def create_experiment(self, req, fid):
2023        """
2024        The external interface to experiment creation called from the
2025        dispatcher.
2026
2027        Creates a working directory, splits the incoming description using the
2028        splitter script and parses out the avrious subsections using the
2029        lcasses above.  Once each sub-experiment is created, use pooled threads
2030        to instantiate them and start it all up.
2031        """
2032
2033        req = req.get('CreateRequestBody', None)
2034        if not req:
2035            raise service_error(service_error.req,
2036                    "Bad request format (no CreateRequestBody)")
2037
2038        # Get the experiment access
2039        exp = req.get('experimentID', None)
2040        if exp:
2041            if exp.has_key('fedid'):
2042                key = exp['fedid']
2043                expid = key
2044                eid = None
2045            elif exp.has_key('localname'):
2046                key = exp['localname']
2047                eid = key
2048                expid = None
2049            else:
2050                raise service_error(service_error.req, "Unknown lookup type")
2051        else:
2052            raise service_error(service_error.req, "No request?")
2053
2054        self.check_experiment_access(fid, key)
2055
2056        try:
2057            tmpdir = tempfile.mkdtemp(prefix="split-")
2058            os.mkdir(tmpdir+"/keys")
2059        except IOError:
2060            raise service_error(service_error.internal, "Cannot create tmp dir")
2061
2062        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2063        gw_secretkey_base = "fed.%s" % self.ssh_type
2064        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2065        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2066        tclfile = tmpdir + "/experiment.tcl"
2067        tbparams = { }
2068        try:
2069            access_user = self.accessdb[fid]
2070        except KeyError:
2071            raise service_error(service_error.internal,
2072                    "Access map and authorizer out of sync in " + \
2073                            "create_experiment for fedid %s"  % fid)
2074
2075        pid = "dummy"
2076        gid = "dummy"
2077
2078        # The tcl parser needs to read a file so put the content into that file
2079        descr=req.get('experimentdescription', None)
2080        if descr:
2081            file_content=descr.get('ns2description', None)
2082            if file_content:
2083                try:
2084                    f = open(tclfile, 'w')
2085                    f.write(file_content)
2086                    f.close()
2087                except IOError:
2088                    raise service_error(service_error.internal,
2089                            "Cannot write temp experiment description")
2090            else:
2091                raise service_error(service_error.req, 
2092                        "Only ns2descriptions supported")
2093        else:
2094            raise service_error(service_error.req, "No experiment description")
2095
2096        self.state_lock.acquire()
2097        if self.state.has_key(key):
2098            self.state[key]['experimentStatus'] = "starting"
2099            for e in self.state[key].get('experimentID',[]):
2100                if not expid and e.has_key('fedid'):
2101                    expid = e['fedid']
2102                elif not eid and e.has_key('localname'):
2103                    eid = e['localname']
2104        self.state_lock.release()
2105
2106        if not (eid and expid):
2107            raise service_error(service_error.internal, 
2108                    "Cannot find local experiment info!?")
2109
2110        try: 
2111            # This catches exceptions to clear the placeholder if necessary
2112            try:
2113                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2114            except ValueError:
2115                raise service_error(service_error.server_config, 
2116                        "Bad key type (%s)" % self.ssh_type)
2117
2118            master = req.get('master', None)
2119            if not master:
2120                raise service_error(service_error.req,
2121                        "No master testbed label")
2122            export_project = req.get('exportProject', None)
2123            if not export_project:
2124                raise service_error(service_error.req, "No export project")
2125           
2126            # Translate to topdl
2127            if self.splitter_url:
2128                # XXX: need remote topdl translator
2129                self.log.debug("Calling remote splitter at %s" % \
2130                        self.splitter_url)
2131                split_data = self.remote_splitter(self.splitter_url,
2132                        file_content, master)
2133            else:
2134                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2135                    str(self.muxmax), '-m', master]
2136
2137                if self.fedkit:
2138                    tclcmd.append('-k')
2139
2140                if self.gatewaykit:
2141                    tclcmd.append('-K')
2142
2143                tclcmd.extend([pid, gid, eid, tclfile])
2144
2145                self.log.debug("running local splitter %s", " ".join(tclcmd))
2146                # This is just fantastic.  As a side effect the parser copies
2147                # tb_compat.tcl into the current directory, so that directory
2148                # must be writable by the fedd user.  Doing this in the
2149                # temporary subdir ensures this is the case.
2150                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2151                        cwd=tmpdir)
2152                split_data = tclparser.stdout
2153
2154            top = topdl.topology_from_xml(file=split_data, top="experiment")
2155
2156            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2157             # Find the testbeds to look up
2158            testbeds = set([ a.value for e in top.elements \
2159                    for a in e.attribute \
2160                    if a.attribute == 'testbed'] )
2161
2162            allocated = { }         # Testbeds we can access
2163            topo ={ }               # Sub topologies
2164            connInfo = { }          # Connection information
2165            services = [ ]
2166            self.get_access_to_testbeds(testbeds, access_user, 
2167                    export_project, master, allocated, tbparams, services)
2168            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2169
2170            # Copy configuration files into the remote file store
2171            # The config urlpath
2172            configpath = "/%s/config" % expid
2173            # The config file system location
2174            configdir ="%s%s" % ( self.repodir, configpath)
2175            try:
2176                os.makedirs(configdir)
2177            except IOError, e:
2178                raise service_error(
2179                        "Cannot create config directory: %s" % e)
2180            try:
2181                f = open("%s/hosts" % configdir, "w")
2182                f.write('\n'.join(hosts))
2183                f.close()
2184            except IOError, e:
2185                raise service_error(service_error.internal, 
2186                        "Cannot write hosts file: %s" % e)
2187            try:
2188                copy_file("%s" % gw_pubkey, "%s/%s" % \
2189                        (configdir, gw_pubkey_base))
2190                copy_file("%s" % gw_secretkey, "%s/%s" % \
2191                        (configdir, gw_secretkey_base))
2192            except IOError, e:
2193                raise service_error(service_error.internal, 
2194                        "Cannot copy keyfiles: %s" % e)
2195
2196            # Allow the individual testbeds to access the configuration files.
2197            for tb in tbparams.keys():
2198                asignee = tbparams[tb]['allocID']['fedid']
2199                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2200                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2201
2202            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2203                    connInfo, expid)
2204            # Now get access to the dynamic testbeds
2205            for k, t in topo.items():
2206                if not t.get_attribute('dynamic'):
2207                    continue
2208                tb = t.get_attribute('testbed')
2209                if tb: 
2210                    self.get_access(tb, None, tbparams, master, 
2211                            export_project, access_user, services)
2212                    tbparams[k] = tbparams[tb]
2213                    del tbparams[tb]
2214                    allocated[k] = 1
2215                    store_keys = t.get_attribute('store_keys')
2216                    # Give the testbed access to keys it exports or imports
2217                    if store_keys:
2218                        for sk in store_keys.split(" "):
2219                            self.auth.set_attribute(\
2220                                    tbparams[k]['allocID']['fedid'], sk)
2221                else:
2222                    raise service_error(service_error.internal, 
2223                            "Dynamic allocation from no testbed!?")
2224
2225            self.wrangle_software(expid, top, topo, tbparams)
2226
2227            vtopo = topdl.topology_to_vtopo(top)
2228            vis = self.genviz(vtopo)
2229
2230            # save federant information
2231            for k in allocated.keys():
2232                tbparams[k]['federant'] = {
2233                        'name': [ { 'localname' : eid} ],
2234                        'allocID' : tbparams[k]['allocID'],
2235                        'master' : k == master,
2236                        'uri': tbparams[k]['uri'],
2237                    }
2238                if tbparams[k].has_key('emulab'):
2239                        tbparams[k]['federant']['emulab'] = \
2240                                tbparams[k]['emulab']
2241
2242            self.state_lock.acquire()
2243            self.state[eid]['vtopo'] = vtopo
2244            self.state[eid]['vis'] = vis
2245            self.state[expid]['federant'] = \
2246                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2247                        if tbparams[tb].has_key('federant') ]
2248            if self.state_filename: 
2249                self.write_state()
2250            self.state_lock.release()
2251        except service_error, e:
2252            # If something goes wrong in the parse (usually an access error)
2253            # clear the placeholder state.  From here on out the code delays
2254            # exceptions.  Failing at this point returns a fault to the remote
2255            # caller.
2256
2257            self.state_lock.acquire()
2258            del self.state[eid]
2259            del self.state[expid]
2260            if self.state_filename: self.write_state()
2261            self.state_lock.release()
2262            raise e
2263
2264
2265        # Start the background swapper and return the starting state.  From
2266        # here on out, the state will stick around a while.
2267
2268        # Let users touch the state
2269        self.auth.set_attribute(fid, expid)
2270        self.auth.set_attribute(expid, expid)
2271        # Override fedids can manipulate state as well
2272        for o in self.overrides:
2273            self.auth.set_attribute(o, expid)
2274
2275        # Create a logger that logs to the experiment's state object as well as
2276        # to the main log file.
2277        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2278        alloc_collector = self.list_log(self.state[eid]['log'])
2279        h = logging.StreamHandler(alloc_collector)
2280        # XXX: there should be a global one of these rather than repeating the
2281        # code.
2282        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2283                    '%d %b %y %H:%M:%S'))
2284        alloc_log.addHandler(h)
2285       
2286        attrs = [ 
2287                {
2288                    'attribute': 'ssh_pubkey', 
2289                    'value': '%s/%s/config/%s' % \
2290                            (self.repo_url, expid, gw_pubkey_base)
2291                },
2292                {
2293                    'attribute': 'ssh_secretkey', 
2294                    'value': '%s/%s/config/%s' % \
2295                            (self.repo_url, expid, gw_secretkey_base)
2296                },
2297                {
2298                    'attribute': 'hosts', 
2299                    'value': '%s/%s/config/hosts' % \
2300                            (self.repo_url, expid)
2301                },
2302                {
2303                    'attribute': 'experiment_name',
2304                    'value': eid,
2305                },
2306            ]
2307
2308        # transit and disconnected testbeds may not have a connInfo entry.
2309        # Fill in the blanks.
2310        for t in allocated.keys():
2311            if not connInfo.has_key(t):
2312                connInfo[t] = { }
2313
2314        # Start a thread to do the resource allocation
2315        t  = Thread(target=self.allocate_resources,
2316                args=(allocated, master, eid, expid, tbparams, 
2317                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2318                    services),
2319                name=eid)
2320        t.start()
2321
2322        rv = {
2323                'experimentID': [
2324                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2325                ],
2326                'experimentStatus': 'starting',
2327            }
2328
2329        return rv
2330   
2331    def get_experiment_fedid(self, key):
2332        """
2333        find the fedid associated with the localname key in the state database.
2334        """
2335
2336        rv = None
2337        self.state_lock.acquire()
2338        if self.state.has_key(key):
2339            if isinstance(self.state[key], dict):
2340                try:
2341                    kl = [ f['fedid'] for f in \
2342                            self.state[key]['experimentID']\
2343                                if f.has_key('fedid') ]
2344                except KeyError:
2345                    self.state_lock.release()
2346                    raise service_error(service_error.internal, 
2347                            "No fedid for experiment %s when getting "+\
2348                                    "fedid(!?)" % key)
2349                if len(kl) == 1:
2350                    rv = kl[0]
2351                else:
2352                    self.state_lock.release()
2353                    raise service_error(service_error.internal, 
2354                            "multiple fedids for experiment %s when " +\
2355                                    "getting fedid(!?)" % key)
2356            else:
2357                self.state_lock.release()
2358                raise service_error(service_error.internal, 
2359                        "Unexpected state for %s" % key)
2360        self.state_lock.release()
2361        return rv
2362
2363    def check_experiment_access(self, fid, key):
2364        """
2365        Confirm that the fid has access to the experiment.  Though a request
2366        may be made in terms of a local name, the access attribute is always
2367        the experiment's fedid.
2368        """
2369        if not isinstance(key, fedid):
2370            key = self.get_experiment_fedid(key)
2371
2372        if self.auth.check_attribute(fid, key):
2373            return True
2374        else:
2375            raise service_error(service_error.access, "Access Denied")
2376
2377
2378    def get_handler(self, path, fid):
2379        self.log.info("Get handler %s %s" % (path, fid))
2380        if self.auth.check_attribute(fid, path):
2381            return ("%s/%s" % (self.repodir, path), "application/binary")
2382        else:
2383            return (None, None)
2384
2385    def get_vtopo(self, req, fid):
2386        """
2387        Return the stored virtual topology for this experiment
2388        """
2389        rv = None
2390        state = None
2391
2392        req = req.get('VtopoRequestBody', None)
2393        if not req:
2394            raise service_error(service_error.req,
2395                    "Bad request format (no VtopoRequestBody)")
2396        exp = req.get('experiment', None)
2397        if exp:
2398            if exp.has_key('fedid'):
2399                key = exp['fedid']
2400                keytype = "fedid"
2401            elif exp.has_key('localname'):
2402                key = exp['localname']
2403                keytype = "localname"
2404            else:
2405                raise service_error(service_error.req, "Unknown lookup type")
2406        else:
2407            raise service_error(service_error.req, "No request?")
2408
2409        self.check_experiment_access(fid, key)
2410
2411        self.state_lock.acquire()
2412        if self.state.has_key(key):
2413            if self.state[key].has_key('vtopo'):
2414                rv = { 'experiment' : {keytype: key },\
2415                        'vtopo': self.state[key]['vtopo'],\
2416                    }
2417            else:
2418                state = self.state[key]['experimentStatus']
2419        self.state_lock.release()
2420
2421        if rv: return rv
2422        else: 
2423            if state:
2424                raise service_error(service_error.partial, 
2425                        "Not ready: %s" % state)
2426            else:
2427                raise service_error(service_error.req, "No such experiment")
2428
2429    def get_vis(self, req, fid):
2430        """
2431        Return the stored visualization for this experiment
2432        """
2433        rv = None
2434        state = None
2435
2436        req = req.get('VisRequestBody', None)
2437        if not req:
2438            raise service_error(service_error.req,
2439                    "Bad request format (no VisRequestBody)")
2440        exp = req.get('experiment', None)
2441        if exp:
2442            if exp.has_key('fedid'):
2443                key = exp['fedid']
2444                keytype = "fedid"
2445            elif exp.has_key('localname'):
2446                key = exp['localname']
2447                keytype = "localname"
2448            else:
2449                raise service_error(service_error.req, "Unknown lookup type")
2450        else:
2451            raise service_error(service_error.req, "No request?")
2452
2453        self.check_experiment_access(fid, key)
2454
2455        self.state_lock.acquire()
2456        if self.state.has_key(key):
2457            if self.state[key].has_key('vis'):
2458                rv =  { 'experiment' : {keytype: key },\
2459                        'vis': self.state[key]['vis'],\
2460                        }
2461            else:
2462                state = self.state[key]['experimentStatus']
2463        self.state_lock.release()
2464
2465        if rv: return rv
2466        else:
2467            if state:
2468                raise service_error(service_error.partial, 
2469                        "Not ready: %s" % state)
2470            else:
2471                raise service_error(service_error.req, "No such experiment")
2472
2473    def clean_info_response(self, rv):
2474        """
2475        Remove the information in the experiment's state object that is not in
2476        the info response.
2477        """
2478        # Remove the owner info (should always be there, but...)
2479        if rv.has_key('owner'): del rv['owner']
2480
2481        # Convert the log into the allocationLog parameter and remove the
2482        # log entry (with defensive programming)
2483        if rv.has_key('log'):
2484            rv['allocationLog'] = "".join(rv['log'])
2485            del rv['log']
2486        else:
2487            rv['allocationLog'] = ""
2488
2489        if rv['experimentStatus'] != 'active':
2490            if rv.has_key('federant'): del rv['federant']
2491        else:
2492            # remove the allocationID and uri info from each federant
2493            for f in rv.get('federant', []):
2494                if f.has_key('allocID'): del f['allocID']
2495                if f.has_key('uri'): del f['uri']
2496        return rv
2497
2498    def get_info(self, req, fid):
2499        """
2500        Return all the stored info about this experiment
2501        """
2502        rv = None
2503
2504        req = req.get('InfoRequestBody', None)
2505        if not req:
2506            raise service_error(service_error.req,
2507                    "Bad request format (no InfoRequestBody)")
2508        exp = req.get('experiment', None)
2509        if exp:
2510            if exp.has_key('fedid'):
2511                key = exp['fedid']
2512                keytype = "fedid"
2513            elif exp.has_key('localname'):
2514                key = exp['localname']
2515                keytype = "localname"
2516            else:
2517                raise service_error(service_error.req, "Unknown lookup type")
2518        else:
2519            raise service_error(service_error.req, "No request?")
2520
2521        self.check_experiment_access(fid, key)
2522
2523        # The state may be massaged by the service function that called
2524        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2525        # state.
2526        self.state_lock.acquire()
2527        if self.state.has_key(key):
2528            rv = copy.deepcopy(self.state[key])
2529        self.state_lock.release()
2530
2531        if rv:
2532            return self.clean_info_response(rv)
2533        else:
2534            raise service_error(service_error.req, "No such experiment")
2535
2536    def get_multi_info(self, req, fid):
2537        """
2538        Return all the stored info that this fedid can access
2539        """
2540        rv = { 'info': [ ] }
2541
2542        self.state_lock.acquire()
2543        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2544            try:
2545                self.check_experiment_access(fid, key)
2546            except service_error, e:
2547                if e.code == service_error.access:
2548                    continue
2549                else:
2550                    self.state_lock.release()
2551                    raise e
2552
2553            if self.state.has_key(key):
2554                e = copy.deepcopy(self.state[key])
2555                e = self.clean_info_response(e)
2556                rv['info'].append(e)
2557        self.state_lock.release()
2558        return rv
2559
2560    def terminate_experiment(self, req, fid):
2561        """
2562        Swap this experiment out on the federants and delete the shared
2563        information
2564        """
2565        tbparams = { }
2566        req = req.get('TerminateRequestBody', None)
2567        if not req:
2568            raise service_error(service_error.req,
2569                    "Bad request format (no TerminateRequestBody)")
2570        force = req.get('force', False)
2571        exp = req.get('experiment', None)
2572        if exp:
2573            if exp.has_key('fedid'):
2574                key = exp['fedid']
2575                keytype = "fedid"
2576            elif exp.has_key('localname'):
2577                key = exp['localname']
2578                keytype = "localname"
2579            else:
2580                raise service_error(service_error.req, "Unknown lookup type")
2581        else:
2582            raise service_error(service_error.req, "No request?")
2583
2584        self.check_experiment_access(fid, key)
2585
2586        dealloc_list = [ ]
2587
2588
2589        # Create a logger that logs to the dealloc_list as well as to the main
2590        # log file.
2591        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2592        h = logging.StreamHandler(self.list_log(dealloc_list))
2593        # XXX: there should be a global one of these rather than repeating the
2594        # code.
2595        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2596                    '%d %b %y %H:%M:%S'))
2597        dealloc_log.addHandler(h)
2598
2599        self.state_lock.acquire()
2600        fed_exp = self.state.get(key, None)
2601
2602        if fed_exp:
2603            # This branch of the conditional holds the lock to generate a
2604            # consistent temporary tbparams variable to deallocate experiments.
2605            # It releases the lock to do the deallocations and reacquires it to
2606            # remove the experiment state when the termination is complete.
2607
2608            # First make sure that the experiment creation is complete.
2609            status = fed_exp.get('experimentStatus', None)
2610
2611            if status:
2612                if status in ('starting', 'terminating'):
2613                    if not force:
2614                        self.state_lock.release()
2615                        raise service_error(service_error.partial, 
2616                                'Experiment still being created or destroyed')
2617                    else:
2618                        self.log.warning('Experiment in %s state ' % status + \
2619                                'being terminated by force.')
2620            else:
2621                # No status??? trouble
2622                self.state_lock.release()
2623                raise service_error(service_error.internal,
2624                        "Experiment has no status!?")
2625
2626            ids = []
2627            #  experimentID is a list of dicts that are self-describing
2628            #  identifiers.  This finds all the fedids and localnames - the
2629            #  keys of self.state - and puts them into ids.
2630            for id in fed_exp.get('experimentID', []):
2631                if id.has_key('fedid'): ids.append(id['fedid'])
2632                if id.has_key('localname'): ids.append(id['localname'])
2633
2634            # Collect the allocation/segment ids into a dict keyed by the fedid
2635            # of the allocation (or a monotonically increasing integer) that
2636            # contains a tuple of uri, aid (which is a dict...)
2637            for i, fed in enumerate(fed_exp.get('federant', [])):
2638                try:
2639                    uri = fed['uri']
2640                    aid = fed['allocID']
2641                    k = fed['allocID'].get('fedid', i)
2642                except KeyError, e:
2643                    continue
2644                tbparams[k] = (uri, aid)
2645            fed_exp['experimentStatus'] = 'terminating'
2646            if self.state_filename: self.write_state()
2647            self.state_lock.release()
2648
2649            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2650            # then completes, so we can't wait if nothing starts.  So, no
2651            # tbparams, no start.
2652            if len(tbparams) > 0:
2653                thread_pool = self.thread_pool(self.nthreads)
2654                for k in tbparams.keys():
2655                    # Create and start a thread to stop the segment
2656                    thread_pool.wait_for_slot()
2657                    uri, aid = tbparams[k]
2658                    t  = self.pooled_thread(\
2659                            target=self.terminate_segment(log=dealloc_log,
2660                                testbed=uri,
2661                                cert_file=self.cert_file, 
2662                                cert_pwd=self.cert_pwd,
2663                                trusted_certs=self.trusted_certs,
2664                                caller=self.call_TerminateSegment),
2665                            args=(uri, aid), name=k,
2666                            pdata=thread_pool, trace_file=self.trace_file)
2667                    t.start()
2668                # Wait for completions
2669                thread_pool.wait_for_all_done()
2670
2671            # release the allocations (failed experiments have done this
2672            # already, and starting experiments may be in odd states, so we
2673            # ignore errors releasing those allocations
2674            try: 
2675                for k in tbparams.keys():
2676                    # This releases access by uri
2677                    uri, aid = tbparams[k]
2678                    self.release_access(None, aid, uri=uri)
2679            except service_error, e:
2680                if status != 'failed' and not force:
2681                    raise e
2682
2683            # Remove the terminated experiment
2684            self.state_lock.acquire()
2685            for id in ids:
2686                if self.state.has_key(id): del self.state[id]
2687
2688            if self.state_filename: self.write_state()
2689            self.state_lock.release()
2690
2691            # Delete any synch points associated with this experiment.  All
2692            # synch points begin with the fedid of the experiment.
2693            fedid_keys = set(["fedid:%s" % f for f in ids \
2694                    if isinstance(f, fedid)])
2695            for k in self.synch_store.all_keys():
2696                try:
2697                    if len(k) > 45 and k[0:46] in fedid_keys:
2698                        self.synch_store.del_value(k)
2699                except synch_store.BadDeletionError:
2700                    pass
2701            self.write_store()
2702               
2703            return { 
2704                    'experiment': exp , 
2705                    'deallocationLog': "".join(dealloc_list),
2706                    }
2707        else:
2708            # Don't forget to release the lock
2709            self.state_lock.release()
2710            raise service_error(service_error.req, "No saved state")
2711
2712
2713    def GetValue(self, req, fid):
2714        """
2715        Get a value from the synchronized store
2716        """
2717        req = req.get('GetValueRequestBody', None)
2718        if not req:
2719            raise service_error(service_error.req,
2720                    "Bad request format (no GetValueRequestBody)")
2721       
2722        name = req['name']
2723        wait = req['wait']
2724        rv = { 'name': name }
2725
2726        if self.auth.check_attribute(fid, name):
2727            try:
2728                v = self.synch_store.get_value(name, wait)
2729            except synch_store.RevokedKeyError:
2730                # No more synch on this key
2731                raise service_error(service_error.federant, 
2732                        "Synch key %s revoked" % name)
2733            if v is not None:
2734                rv['value'] = v
2735            self.log.debug("[GetValue] got %s from %s" % (v, name))
2736            return rv
2737        else:
2738            raise service_error(service_error.access, "Access Denied")
2739       
2740
2741    def SetValue(self, req, fid):
2742        """
2743        Set a value in the synchronized store
2744        """
2745        req = req.get('SetValueRequestBody', None)
2746        if not req:
2747            raise service_error(service_error.req,
2748                    "Bad request format (no SetValueRequestBody)")
2749       
2750        name = req['name']
2751        v = req['value']
2752
2753        if self.auth.check_attribute(fid, name):
2754            try:
2755                self.synch_store.set_value(name, v)
2756                self.write_store()
2757                self.log.debug("[SetValue] set %s to %s" % (name, v))
2758            except synch_store.CollisionError:
2759                # Translate into a service_error
2760                raise service_error(service_error.req,
2761                        "Value already set: %s" %name)
2762            except synch_store.RevokedKeyError:
2763                # No more synch on this key
2764                raise service_error(service_error.federant, 
2765                        "Synch key %s revoked" % name)
2766            return { 'name': name, 'value': v }
2767        else:
2768            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.