source: fedd/federation/experiment_control.py @ 2761484

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 2761484 was 2761484, checked in by Ted Faber <faber@…>, 14 years ago

Inital parameterization and synchronization. Tested for Emulabs, but not DRAGON.
Add get and set value synchronization.

  • Property mode set to 100644
File size: 93.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Split = service_caller('Ns2Split')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(serice_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_splitter(self, uri, desc, master):
925
926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
931            }
932
933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
945
946    class start_segment:
947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
956            self.testbed = testbed
957            self.log_collector = log_collector
958            self.response = None
959
960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
967                    'master': master,
968                }
969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
980            if attrs:
981                req['fedAttr'] = attrs
982
983            try:
984                self.log.debug("Calling StartSegment at %s " % uri)
985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
993                    self.response = r
994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
1002
1003
1004
1005    class terminate_segment:
1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
1028   
1029
1030    def allocate_resources(self, allocated, master, eid, expid, 
1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1032            attrs=None, connInfo={}, services=[]):
1033        def get_vlan(r):
1034            if r.has_key('StartSegmentResponseBody'):
1035                srb = r['StartSegmentResponseBody']
1036                if srb.has_key('fedAttr'):
1037                    for k, v in [ (a['attribute'], a['value']) \
1038                            for a in srb['fedAttr']]:
1039                        if k == 'vlan': return v
1040            return None
1041
1042        started = { }           # Testbeds where a sub-experiment started
1043                                # successfully
1044
1045        # XXX
1046        fail_soft = False
1047
1048        non_transit = [ k for k in allocated.keys() \
1049                if not topo[k].get_attribute('transit')]
1050        transit = [ k for k in allocated.keys() \
1051                if topo[k].get_attribute('transit')]
1052
1053        log = alloc_log or self.log
1054
1055        thread_pool = self.thread_pool(self.nthreads)
1056        threads = [ ]
1057
1058        for tb in transit:
1059            uri = tbparams[tb]['uri']
1060            if tbparams[tb].has_key('allocID') and \
1061                    tbparams[tb]['allocID'].has_key('fedid'):
1062                aid = tbparams[tb]['allocID']['fedid']
1063            else:
1064                raise service_error(service_error.internal, 
1065                        "No alloc id for testbed %s !?" % tb)
1066
1067            m = re.search('(\d+)', tb)
1068            if m:
1069                to_repl = "unassigned%s" % m.group(1)
1070            else:
1071                raise service_error(service_error.internal, 
1072                        "Bad dynamic allocation name")
1073                break
1074
1075            ss = self.start_segment(log=log, debug=self.debug, 
1076                testbed=tb, cert_file=self.cert_file, 
1077                cert_pwd=self.cert_pwd, 
1078                trusted_certs=self.trusted_certs,
1079                caller=self.call_StartSegment,
1080                log_collector=log_collector)
1081            t = self.pooled_thread(
1082                    target=ss,
1083                    args =(uri, aid, topo[tb], False, attrs, connInfo[tb],
1084                        services),
1085                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1086            threads.append(t)
1087            t.start()
1088            # Wait until the this transit node finishes (keep pinging the log,
1089            # though)
1090
1091            mins = 0
1092            while not thread_pool.wait_for_all_done(60.0):
1093                mins += 1
1094                alloc_log.info("Waiting for transit (it has been %d mins)" \
1095                        % mins)
1096
1097            if t.rv:
1098                vlan = get_vlan(ss.response)
1099                if vlan is not None:
1100                    for v in connInfo.values():
1101                        for i in v:
1102                            for a in i.get('fedAttr', []):
1103                                if a.get('attribute', "") == 'vlan_id' and \
1104                                        a.get('value', "") == to_repl:
1105                                    a['value'] = vlan
1106            else:
1107                break
1108            thread_pool.clear()
1109
1110
1111        failed = [ t.getName() for t in threads if not t.rv ]
1112
1113        if len(failed) == 0:
1114            for tb in non_transit:
1115                # Create and start a thread to start the segment, and save it
1116                # to get the return value later
1117                thread_pool.wait_for_slot()
1118                uri = self.tbmap.get(tb, None)
1119                if not uri:
1120                    raise service_error(service_error.internal, 
1121                            "Unknown testbed %s !?" % tb)
1122
1123                if tbparams[tb].has_key('allocID') and \
1124                        tbparams[tb]['allocID'].has_key('fedid'):
1125                    aid = tbparams[tb]['allocID']['fedid']
1126                else:
1127                    raise service_error(service_error.internal, 
1128                            "No alloc id for testbed %s !?" % tb)
1129
1130                t  = self.pooled_thread(\
1131                        target=self.start_segment(log=log, debug=self.debug,
1132                            testbed=tb, cert_file=self.cert_file,
1133                            cert_pwd=self.cert_pwd,
1134                            trusted_certs=self.trusted_certs,
1135                            caller=self.call_StartSegment,
1136                            log_collector=log_collector), 
1137                        args=(uri, aid, topo[tb], tb == master, 
1138                            attrs, connInfo[tb], services),
1139                        name=tb,
1140                        pdata=thread_pool, trace_file=self.trace_file)
1141                threads.append(t)
1142                t.start()
1143
1144            # Wait until all finish (keep pinging the log, though)
1145            mins = 0
1146            while not thread_pool.wait_for_all_done(60.0):
1147                mins += 1
1148                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1149                        % mins)
1150
1151            thread_pool.clear()
1152
1153        failed = [ t.getName() for t in threads if not t.rv ]
1154        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1155
1156        # If one failed clean up, unless fail_soft is set
1157        if failed:
1158            if not fail_soft:
1159                thread_pool.clear()
1160                for tb in succeeded:
1161                    # Create and start a thread to stop the segment
1162                    thread_pool.wait_for_slot()
1163                    uri = tbparams[tb]['uri']
1164                    t  = self.pooled_thread(\
1165                            target=self.terminate_segment(log=log,
1166                                testbed=tb,
1167                                cert_file=self.cert_file, 
1168                                cert_pwd=self.cert_pwd,
1169                                trusted_certs=self.trusted_certs,
1170                                caller=self.call_TerminateSegment),
1171                            args=(uri, tbparams[tb]['federant']['allocID']),
1172                            name=tb,
1173                            pdata=thread_pool, trace_file=self.trace_file)
1174                    t.start()
1175                # Wait until all finish
1176                thread_pool.wait_for_all_done()
1177
1178                # release the allocations
1179                for tb in tbparams.keys():
1180                    self.release_access(tb, tbparams[tb]['allocID'],
1181                            tbparams[tb].get('uri', None))
1182                # Remove the placeholder
1183                self.state_lock.acquire()
1184                self.state[eid]['experimentStatus'] = 'failed'
1185                if self.state_filename: self.write_state()
1186                self.state_lock.release()
1187
1188                log.error("Swap in failed on %s" % ",".join(failed))
1189                return
1190        else:
1191            log.info("[start_segment]: Experiment %s active" % eid)
1192
1193
1194        # Walk up tmpdir, deleting as we go
1195        if self.cleanup:
1196            log.debug("[start_experiment]: removing %s" % tmpdir)
1197            for path, dirs, files in os.walk(tmpdir, topdown=False):
1198                for f in files:
1199                    os.remove(os.path.join(path, f))
1200                for d in dirs:
1201                    os.rmdir(os.path.join(path, d))
1202            os.rmdir(tmpdir)
1203        else:
1204            log.debug("[start_experiment]: not removing %s" % tmpdir)
1205
1206        # Insert the experiment into our state and update the disk copy
1207        self.state_lock.acquire()
1208        self.state[expid]['experimentStatus'] = 'active'
1209        self.state[eid] = self.state[expid]
1210        if self.state_filename: self.write_state()
1211        self.state_lock.release()
1212        return
1213
1214
1215    def add_kit(self, e, kit):
1216        """
1217        Add a Software object created from the list of (install, location)
1218        tuples passed as kit  to the software attribute of an object e.  We
1219        do this enough to break out the code, but it's kind of a hack to
1220        avoid changing the old tuple rep.
1221        """
1222
1223        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1224
1225        if isinstance(e.software, list): e.software.extend(s)
1226        else: e.software = s
1227
1228
1229    def create_experiment_state(self, fid, req, expid, expcert, 
1230            state='starting'):
1231        """
1232        Create the initial entry in the experiment's state.  The expid and
1233        expcert are the experiment's fedid and certifacte that represents that
1234        ID, which are installed in the experiment state.  If the request
1235        includes a suggested local name that is used if possible.  If the local
1236        name is already taken by an experiment owned by this user that has
1237        failed, it is overwritten.  Otherwise new letters are added until a
1238        valid localname is found.  The generated local name is returned.
1239        """
1240
1241        if req.has_key('experimentID') and \
1242                req['experimentID'].has_key('localname'):
1243            overwrite = False
1244            eid = req['experimentID']['localname']
1245            # If there's an old failed experiment here with the same local name
1246            # and accessible by this user, we'll overwrite it, otherwise we'll
1247            # fall through and do the collision avoidance.
1248            old_expid = self.get_experiment_fedid(eid)
1249            if old_expid and self.check_experiment_access(fid, old_expid):
1250                self.state_lock.acquire()
1251                status = self.state[eid].get('experimentStatus', None)
1252                if status and status == 'failed':
1253                    # remove the old access attribute
1254                    self.auth.unset_attribute(fid, old_expid)
1255                    overwrite = True
1256                    del self.state[eid]
1257                    del self.state[old_expid]
1258                self.state_lock.release()
1259            self.state_lock.acquire()
1260            while (self.state.has_key(eid) and not overwrite):
1261                eid += random.choice(string.ascii_letters)
1262            # Initial state
1263            self.state[eid] = {
1264                    'experimentID' : \
1265                            [ { 'localname' : eid }, {'fedid': expid } ],
1266                    'experimentStatus': state,
1267                    'experimentAccess': { 'X509' : expcert },
1268                    'owner': fid,
1269                    'log' : [],
1270                }
1271            self.state[expid] = self.state[eid]
1272            if self.state_filename: self.write_state()
1273            self.state_lock.release()
1274        else:
1275            eid = self.exp_stem
1276            for i in range(0,5):
1277                eid += random.choice(string.ascii_letters)
1278            self.state_lock.acquire()
1279            while (self.state.has_key(eid)):
1280                eid = self.exp_stem
1281                for i in range(0,5):
1282                    eid += random.choice(string.ascii_letters)
1283            # Initial state
1284            self.state[eid] = {
1285                    'experimentID' : \
1286                            [ { 'localname' : eid }, {'fedid': expid } ],
1287                    'experimentStatus': state,
1288                    'experimentAccess': { 'X509' : expcert },
1289                    'owner': fid,
1290                    'log' : [],
1291                }
1292            self.state[expid] = self.state[eid]
1293            if self.state_filename: self.write_state()
1294            self.state_lock.release()
1295
1296        return eid
1297
1298
1299    def allocate_ips_to_topo(self, top):
1300        """
1301        Add an ip4_address attribute to all the hosts in the topology, based on
1302        the shared substrates on which they sit.  An /etc/hosts file is also
1303        created and returned as a list of hostfiles entries.  We also return
1304        the allocator, because we may need to allocate IPs to portals
1305        (specifically DRAGON portals).
1306        """
1307        subs = sorted(top.substrates, 
1308                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1309                reverse=True)
1310        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1311        ifs = { }
1312        hosts = [ ]
1313
1314        for idx, s in enumerate(subs):
1315            a = ips.allocate(len(s.interfaces)+2)
1316            if a :
1317                base, num = a
1318                if num < len(s.interfaces) +2 : 
1319                    raise service_error(service_error.internal,
1320                            "Allocator returned wrong number of IPs??")
1321            else:
1322                raise service_error(service_error.req, 
1323                        "Cannot allocate IP addresses")
1324
1325            base += 1
1326            for i in s.interfaces:
1327                i.attribute.append(
1328                        topdl.Attribute('ip4_address', 
1329                            "%s" % ip_addr(base)))
1330                hname = i.element.name[0]
1331                if ifs.has_key(hname):
1332                    hosts.append("%s\t%s-%s %s-%d" % \
1333                            (ip_addr(base), hname, s.name, hname,
1334                                ifs[hname]))
1335                else:
1336                    ifs[hname] = 0
1337                    hosts.append("%s\t%s-%s %s-%d %s" % \
1338                            (ip_addr(base), hname, s.name, hname,
1339                                ifs[hname], hname))
1340
1341                ifs[hname] += 1
1342                base += 1
1343        return hosts, ips
1344
1345    def get_access_to_testbeds(self, testbeds, access_user, 
1346            export_project, master, allocated, tbparams, services):
1347        """
1348        Request access to the various testbeds required for this instantiation
1349        (passed in as testbeds).  User, access_user, expoert_project and master
1350        are used to construct the correct requests.  Per-testbed parameters are
1351        returned in tbparams.
1352        """
1353        for tb in testbeds:
1354            self.get_access(tb, None, tbparams, master,
1355                    export_project, access_user, services)
1356            allocated[tb] = 1
1357
1358    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1359        """
1360        Create the sub-topologies that are needed for experiment instantiation.
1361        """
1362        for tb in testbeds:
1363            topo[tb] = top.clone()
1364            to_delete = [ ]
1365            # XXX: copy in for loop to simplify
1366            for e in topo[tb].elements:
1367                etb = e.get_attribute('testbed')
1368                if etb and etb != tb:
1369                    for i in e.interface:
1370                        for s in i.subs:
1371                            try:
1372                                s.interfaces.remove(i)
1373                            except ValueError:
1374                                raise service_error(service_error.internal,
1375                                        "Can't remove interface??")
1376                    to_delete.append(e)
1377            for e in to_delete:
1378                topo[tb].elements.remove(e)
1379            topo[tb].make_indices()
1380
1381            for e in [ e for e in topo[tb].elements \
1382                    if isinstance(e,topdl.Computer)]:
1383                if self.fedkit: self.add_kit(e, self.fedkit)
1384
1385    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1386            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1387            expid=None):
1388        """
1389        Return a new internet portal node and a dict with the connectionInfo to
1390        be attached.
1391        """
1392        dproject = tbparams[dt].get('project', 'project')
1393        ddomain = tbparams[dt].get('domain', ".example.com")
1394        mdomain = tbparams[master].get('domain', '.example.com')
1395        mproject = tbparams[master].get('project', 'project')
1396        muser = tbparams[master].get('user', 'root')
1397        smbshare = tbparams[master].get('smbshare', 'USERS')
1398
1399        if st == master or dt == master:
1400            active = ("%s" % (st == master))
1401        else:
1402            active = ("%s" % (st > dt))
1403
1404        ifaces = [ ]
1405        for sub, attrs in iface_desc:
1406            inf = topdl.Interface(
1407                    name="inf%03d" % len(ifaces),
1408                    substrate=sub,
1409                    attribute=[
1410                        topdl.Attribute(
1411                            attribute=n,
1412                            value = v)
1413                        for n, v in attrs
1414                        ]
1415                    )
1416            ifaces.append(inf)
1417        if conn_type == "ssh":
1418            try:
1419                aid = tbparams[st]['allocID']['fedid']
1420            except:
1421                self.log.debug("Can't get allocation id?")
1422                aid = None
1423            info = {
1424                    "type" : conn_type, 
1425                    "portal": myname,
1426                    'fedAttr': [ 
1427                            { 'attribute': 'masterdomain', 'value': mdomain},
1428                            { 'attribute': 'masterexperiment', 'value': 
1429                                "%s/%s" % (mproject, eid)},
1430                            { 'attribute': 'active', 'value': active},
1431                            # Move to SMB service description
1432                            { 'attribute': 'masteruser', 'value': muser},
1433                            { 'attribute': 'smbshare', 'value': smbshare},
1434                        ],
1435                    'parameter': [
1436                        {
1437                            'name': 'peer',
1438                            'key': 'fedid:%s/%s' % (expid, myname),
1439                            'store': self.store_url,
1440                            'type': 'output',
1441                        },
1442                        {
1443                            'name': 'peer',
1444                            'key': 'fedid:%s/%s' % (expid, desthost),
1445                            'store': self.store_url,
1446                            'type': 'input',
1447                        },
1448                        ]
1449                    }
1450            # Give this allocation the rights to access the key of the
1451            # peers
1452            if aid:
1453                for h in (myname, desthost):
1454                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1455            else:
1456                self.log.error("No aid for %s in new_portal_node" % st)
1457        else:
1458            info = None
1459
1460        return (topdl.Computer(
1461                name=myname,
1462                attribute=[ 
1463                    topdl.Attribute(attribute=n,value=v)
1464                        for n, v in (\
1465                            ('portal', 'true'),
1466                            ('portal_type', portal_type), 
1467                        )
1468                    ],
1469                interface=ifaces,
1470                ), info)
1471
1472    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1473        ddomain = tbparams[dt].get('domain', ".example.com")
1474        dproject = tbparams[dt].get('project', 'project')
1475        tsubstrate = \
1476                topdl.Substrate(name='%s-%s' % (st, dt),
1477                        attribute= [
1478                            topdl.Attribute(
1479                                attribute='portal',
1480                                value='true')
1481                            ]
1482                        )
1483        segment_element = topdl.Segment(
1484                id= tbparams[dt]['allocID'],
1485                type='emulab',
1486                uri = self.tbmap.get(dt, None),
1487                interface=[ 
1488                    topdl.Interface(
1489                        substrate=tsubstrate.name),
1490                    ],
1491                attribute = [
1492                    topdl.Attribute(attribute=n, value=v)
1493                        for n, v in (\
1494                            ('domain', ddomain),
1495                            ('experiment', "%s/%s" % \
1496                                    (dproject, eid)),)
1497                    ],
1498                )
1499
1500        return (tsubstrate, segment_element)
1501
1502    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1503        if sub.capacity is None:
1504            raise service_error(service_error.internal,
1505                    "Cannot DRAGON split substrate w/o capacity")
1506        segs = [ ]
1507        substr = topdl.Substrate(name="dragon%d" % idx, 
1508                capacity=sub.capacity.clone(),
1509                attribute=[ topdl.Attribute(attribute=n, value=v)
1510                    for n, v, in (\
1511                            ('vlan', 'unassigned%d' % idx),)])
1512        name = "dragon%d" % idx
1513        for tb in tbs.keys():
1514            seg = topdl.Segment(
1515                    id = tbparams[tb]['allocID'],
1516                    type='emulab',
1517                    uri = self.tbmap.get(tb, None),
1518                    interface=[ 
1519                        topdl.Interface(
1520                            substrate=substr.name),
1521                        ],
1522                    attribute=[ topdl.Attribute(
1523                        attribute='dragon_endpoint', 
1524                        value=tbparams[tb]['dragon']),
1525                        ]
1526                    )
1527            if tbparams[tb].has_key('vlans'):
1528                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1529            segs.append(seg)
1530
1531
1532        try:
1533            aid = tbparams[tb]['allocID']['fedid']
1534        except:
1535            self.log.debug("Can't get allocation id?")
1536            aid = None
1537        connInfo[name] = [ { 
1538            'parameter': [ { 
1539                'name': 'vlan_id',
1540                'key': "fedid:%s/vlan%d" % (expid, idx),
1541                'store': self.store_url,
1542                'type': 'output'
1543                } ]
1544            } ]
1545
1546        # Give this allocation the rights to access the key of the
1547        # vlan_id
1548        if aid:
1549            self.auth.set_attribute(aid, 'fedid:%s/vlan%d' % (expid, idx))
1550
1551        topo[name] = \
1552                topdl.Topology(substrates=[substr], elements=segs,
1553                        attribute=[
1554                            topdl.Attribute(attribute="transit", value='true'),
1555                            topdl.Attribute(attribute="dynamic", value='true'),
1556                            topdl.Attribute(attribute="testbed", value='dragon'),
1557                            ]
1558                        )
1559
1560    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1561            connInfo, peer={ }, expid=None):
1562        """
1563        Add attribiutes to the various elements indicating that they are to be
1564        dragon connected and create a dragon segment in topo to be
1565        instantiated.
1566        """
1567
1568        def get_substrate_from_topo(name, t):
1569            for s in t.substrates:
1570                if s.name == name: return s
1571            else: return None
1572
1573        mdomain = tbparams[master].get('domain', '.example.com')
1574        mproject = tbparams[master].get('project', 'project')
1575        # dn is the number of previously created dragon nets.  This routine
1576        # creates a net numbered by dn
1577        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1578        # Count the number of interfaces on this substrate in each testbed from
1579        # the global topology
1580        count = { }
1581        node = { }
1582        for e in [ i.element for i in sub.interfaces ]:
1583            tb = e.get_attribute('testbed')
1584            count[tb] = count.get(tb, 0) + 1
1585            node[tb] = i.get_attribute('ip4_address')
1586
1587
1588        # Set the attributes in the copies that will allow setup of dragon
1589        # connections.
1590        for tb in tbs.keys():
1591            s = get_substrate_from_topo(sub.name, topo[tb])
1592            if s:
1593                if not connInfo.has_key(tb):
1594                    connInfo[tb] = [ ]
1595
1596                try:
1597                    aid = tbparams[tb]['allocID']['fedid']
1598                except:
1599                    self.log.debug("Can't get allocation id?")
1600                    aid = None
1601
1602                # This may need another look, but only a service gateway will
1603                # look at the active parameter, and these are only inserted to
1604                # connect to the master.
1605                active = "%s" % ( tb == master)
1606                info = {
1607                        'type': 'transit',
1608                        'member': [ {
1609                            'element': i.element.name[0], 
1610                            'interface': i.name
1611                            } for i in s.interfaces \
1612                                    if isinstance(i.element, topdl.Computer) ],
1613                        'fedAttr': [ 
1614                            { 'attribute': 'vlan_id', 
1615                                'value': 'unassigned%d' % dn },
1616                            { 'attribute': 'masterdomain', 'value': mdomain},
1617                            { 'attribute': 'masterexperiment', 'value': 
1618                                "%s/%s" % (mproject, eid)},
1619                            { 'attribute': 'active', 'value': active},
1620                            ],
1621                        'parameter': [ {
1622                            'name': 'vlan_id',
1623                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1624                            'store': self.store_url,
1625                            'type': 'input',
1626                            } ]
1627                        }
1628                if peer.has_key(tb):
1629                    info['peer'] = peer[tb]
1630                connInfo[tb].append(info)
1631
1632                # Give this allocation the rights to access the key of the
1633                # vlan_id
1634                if aid:
1635                    self.auth.set_attribute(aid, 
1636                            'fedid:%s/vlan%d' % (expid, dn))
1637            else:
1638                raise service_error(service_error.internal,
1639                        "No substrate %s in testbed %s" % (sub.name, tb))
1640
1641        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1642
1643    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1644            segment_substrate, portals, connInfo, expid):
1645        # More than one testbed is on this substrate.  Insert
1646        # some portals into the subtopologies.  st == source testbed,
1647        # dt == destination testbed.
1648        for st in tbs.keys():
1649            if not segment_substrate.has_key(st):
1650                segment_substrate[st] = { }
1651            if not portals.has_key(st): 
1652                portals[st] = { }
1653            if not connInfo.has_key(st):
1654                connInfo[st] = [ ]
1655            for dt in [ t for t in tbs.keys() if t != st]:
1656                sproject = tbparams[st].get('project', 'project')
1657                dproject = tbparams[dt].get('project', 'project')
1658                mproject = tbparams[master].get('project', 'project')
1659                sdomain = tbparams[st].get('domain', ".example.com")
1660                ddomain = tbparams[dt].get('domain', ".example.com")
1661                mdomain = tbparams[master].get('domain', '.example.com')
1662                muser = tbparams[master].get('user', 'root')
1663                smbshare = tbparams[master].get('smbshare', 'USERS')
1664                aid = tbparams[dt]['allocID']['fedid']
1665                if st == master or dt == master:
1666                    active = ("%s" % (st == master))
1667                else:
1668                    active = ("%s" %(st > dt))
1669                if not segment_substrate[st].has_key(dt):
1670                    # Put a substrate and a segment for the connected
1671                    # testbed in there.
1672                    tsubstrate, segment_element = \
1673                            self.new_portal_substrate(st, dt, eid, tbparams,
1674                                    expid)
1675                    segment_substrate[st][dt] = tsubstrate
1676                    topo[st].substrates.append(tsubstrate)
1677                    topo[st].elements.append(segment_element)
1678
1679                new_portal = False
1680                if portals[st].has_key(dt):
1681                    # There's a portal set up to go to this destination.
1682                    # See if there's room to multiplex this connection on
1683                    # it.  If so, add an interface to the portal; if not,
1684                    # set up to add a portal below.
1685                    # [This little festival of braces is just a pop of the
1686                    # last element in the list of portals between st and
1687                    # dt.]
1688                    portal = portals[st][dt][-1]
1689                    mux = len([ i for i in portal.interface \
1690                            if not i.get_attribute('portal')])
1691                    if mux == self.muxmax:
1692                        new_portal = True
1693                        portal_type = "experiment"
1694                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1695                        desthost = "%stunnel%d" % (st.lower(), 
1696                                len(portals[st][dt]))
1697                    else:
1698                        new_i = topdl.Interface(
1699                                substrate=sub.name,
1700                                attribute=[ 
1701                                    topdl.Attribute(
1702                                        attribute='ip4_address', 
1703                                        value=tbs[dt]
1704                                    )
1705                                ])
1706                        portal.interface.append(new_i)
1707                else:
1708                    # First connection to this testbed, make an empty list
1709                    # and set up to add the new portal below
1710                    new_portal = True
1711                    portals[st][dt] = [ ]
1712                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1713                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1714
1715                    if dt == master or st == master: portal_type = "both"
1716                    else: portal_type = "experiment"
1717
1718                if new_portal:
1719                    infs = (
1720                            (segment_substrate[st][dt].name, 
1721                                (('portal', 'true'),)),
1722                            (sub.name, 
1723                                (('ip4_address', tbs[dt]),))
1724                        )
1725                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1726                            master, eid, myname, desthost, portal_type,
1727                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1728                    if self.fedkit:
1729                        self.add_kit(portal, self.fedkit)
1730                    if self.gatewaykit: 
1731                        self.add_kit(portal, self.gatewaykit)
1732
1733                    topo[st].elements.append(portal)
1734                    portals[st][dt].append(portal)
1735                    connInfo[st].append(info)
1736
1737    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1738        # Add to the master testbed
1739        tsubstrate, segment_element = \
1740                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1741        myname = "%stunnel" % dt
1742        desthost = "%stunnel" % st
1743
1744        portal, info = self.new_portal_node(st, dt, tbparams, master,
1745                eid, myname, desthost, "control", 
1746                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1747                conn_attrs=[], expid=expid)
1748        if self.fedkit:
1749            self.add_kit(portal, self.fedkit)
1750        if self.gatewaykit: 
1751            self.add_kit(portal, self.gatewaykit)
1752
1753        topo[st].substrates.append(tsubstrate)
1754        topo[st].elements.append(segment_element)
1755        topo[st].elements.append(portal)
1756        if not connInfo.has_key(st):
1757            connInfo[st] = [ ]
1758        connInfo[st].append(info)
1759
1760    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1761            substrate, tbparams, expid):
1762        # Add to the master testbed
1763        myname = "%stunnel" % dt
1764        desthost = "%s" % ip_addr(dip)
1765
1766        portal, info = self.new_portal_node(st, dt, tbparams, master,
1767                eid, myname, desthost, "control", 
1768                ((substrate.name,(
1769                    ('portal','true'),
1770                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1771                conn_type="transit", conn_attrs=[], expid=expid)
1772        if self.fedkit:
1773            self.add_kit(portal, self.fedkit)
1774        if self.gatewaykit: 
1775            self.add_kit(portal, self.gatewaykit)
1776
1777        return portal
1778
1779    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1780            connInfo, expid):
1781        """
1782        For each substrate in the main topology, find those that
1783        have nodes on more than one testbed.  Insert portal nodes
1784        into the copies of those substrates on the sub topologies.
1785        """
1786        segment_substrate = { }
1787        portals = { }
1788        for s in top.substrates:
1789            # tbs will contain an ip address on this subsrate that is in
1790            # each testbed.
1791            tbs = { }
1792            for i in s.interfaces:
1793                e = i.element
1794                tb = e.get_attribute('testbed')
1795                if tb and not tbs.has_key(tb):
1796                    for i in e.interface:
1797                        if s in i.subs:
1798                            tbs[tb]= i.get_attribute('ip4_address')
1799            if len(tbs) < 2:
1800                continue
1801
1802            # DRAGON will not create multi-site vlans yet
1803            if len(tbs) == 2 and \
1804                    all([tbparams[x].has_key('dragon') for x in tbs]):
1805                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1806                        master, eid, connInfo, expid)
1807            else:
1808                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1809                        eid, segment_substrate, portals, connInfo, expid)
1810
1811        # Make sure that all the slaves have a control portal back to the
1812        # master.
1813        for tb in [ t for t in tbparams.keys() if t != master ]:
1814            if len([e for e in topo[tb].elements \
1815                    if isinstance(e, topdl.Computer) and \
1816                    e.get_attribute('portal') and \
1817                    e.get_attribute('portal_type') == 'both']) == 0:
1818
1819                if tbparams[master].has_key('dragon') \
1820                        and tbparams[tb].has_key('dragon'):
1821
1822                    idx = len([x for x in topo.keys() \
1823                            if x.startswith('dragon')])
1824                    dip, leng = ip_allocator.allocate(4)
1825                    dip += 1
1826                    mip = dip+1
1827                    csub = topdl.Substrate(
1828                            name="dragon-control-%s" % tb,
1829                            capacity=topdl.Capacity(100000.0, 'max'),
1830                            attribute=[
1831                                topdl.Attribute(
1832                                    attribute='portal',
1833                                    value='true'
1834                                    )
1835                                ]
1836                            )
1837                    seg = topdl.Segment(
1838                            id= tbparams[master]['allocID'],
1839                            type='emulab',
1840                            uri = self.tbmap.get(master, None),
1841                            interface=[ 
1842                                topdl.Interface(
1843                                    substrate=csub.name),
1844                                ],
1845                            attribute = [
1846                                topdl.Attribute(attribute=n, value=v)
1847                                    for n, v in (\
1848                                        ('domain', 
1849                                            tbparams[master].get('domain',
1850                                                ".example.com")),
1851                                        ('experiment', "%s/%s" % \
1852                                                (tbparams[master].get(
1853                                                    'project', 
1854                                                    'project'), 
1855                                                    eid)),)
1856                                ],
1857                            )
1858                    portal = self.new_dragon_portal(tb, master,
1859                            master, eid, dip, mip, idx, csub, tbparams, expid)
1860                    topo[tb].substrates.append(csub)
1861                    topo[tb].elements.append(portal)
1862                    topo[tb].elements.append(seg)
1863
1864                    mcsub = csub.clone()
1865                    seg = topdl.Segment(
1866                            id= tbparams[tb]['allocID'],
1867                            type='emulab',
1868                            uri = self.tbmap.get(tb, None),
1869                            interface=[ 
1870                                topdl.Interface(
1871                                    substrate=csub.name),
1872                                ],
1873                            attribute = [
1874                                topdl.Attribute(attribute=n, value=v)
1875                                    for n, v in (\
1876                                        ('domain', 
1877                                            tbparams[tb].get('domain',
1878                                                ".example.com")),
1879                                        ('experiment', "%s/%s" % \
1880                                                (tbparams[tb].get('project', 
1881                                                    'project'), 
1882                                                    eid)),)
1883                                ],
1884                            )
1885                    portal = self.new_dragon_portal(master, tb, master,
1886                            eid, mip, dip, idx, mcsub, tbparams, expid)
1887                    topo[master].substrates.append(mcsub)
1888                    topo[master].elements.append(portal)
1889                    topo[master].elements.append(seg)
1890                    for t in (master, tb):
1891                        topo[t].incorporate_elements()
1892
1893                    self.create_dragon_substrate(csub, topo, 
1894                            {tb: 1, master:1}, tbparams, master, eid, connInfo,
1895                            {tb: ip_addr(mip), master: ip_addr(dip)}, expid)
1896                else:
1897                    self.add_control_portal(master, tb, master, eid, topo, 
1898                            tbparams, connInfo, expid)
1899                    self.add_control_portal(tb, master, master, eid, topo, 
1900                            tbparams, connInfo, expid)
1901
1902        # Connect the portal nodes into the topologies and clear out
1903        # substrates that are not in the topologies
1904        for tb in tbparams.keys():
1905            topo[tb].incorporate_elements()
1906            topo[tb].substrates = \
1907                    [s for s in topo[tb].substrates \
1908                        if len(s.interfaces) >0]
1909
1910    def wrangle_software(self, expid, top, topo, tbparams):
1911        """
1912        Copy software out to the repository directory, allocate permissions and
1913        rewrite the segment topologies to look for the software in local
1914        places.
1915        """
1916
1917        # Copy the rpms and tarfiles to a distribution directory from
1918        # which the federants can retrieve them
1919        linkpath = "%s/software" %  expid
1920        softdir ="%s/%s" % ( self.repodir, linkpath)
1921        softmap = { }
1922        # These are in a list of tuples format (each kit).  This comprehension
1923        # unwraps them into a single list of tuples that initilaizes the set of
1924        # tuples.
1925        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1926                for p, t in l ])
1927        pkgs.update([x.location for e in top.elements \
1928                for x in e.software])
1929        try:
1930            os.makedirs(softdir)
1931        except IOError, e:
1932            raise service_error(
1933                    "Cannot create software directory: %s" % e)
1934        # The actual copying.  Everything's converted into a url for copying.
1935        for pkg in pkgs:
1936            loc = pkg
1937
1938            scheme, host, path = urlparse(loc)[0:3]
1939            dest = os.path.basename(path)
1940            if not scheme:
1941                if not loc.startswith('/'):
1942                    loc = "/%s" % loc
1943                loc = "file://%s" %loc
1944            try:
1945                u = urlopen(loc)
1946            except Exception, e:
1947                raise service_error(service_error.req, 
1948                        "Cannot open %s: %s" % (loc, e))
1949            try:
1950                f = open("%s/%s" % (softdir, dest) , "w")
1951                self.log.debug("Writing %s/%s" % (softdir,dest) )
1952                data = u.read(4096)
1953                while data:
1954                    f.write(data)
1955                    data = u.read(4096)
1956                f.close()
1957                u.close()
1958            except Exception, e:
1959                raise service_error(service_error.internal,
1960                        "Could not copy %s: %s" % (loc, e))
1961            path = re.sub("/tmp", "", linkpath)
1962            # XXX
1963            softmap[pkg] = \
1964                    "%s/%s/%s" %\
1965                    ( self.repo_url, path, dest)
1966
1967            # Allow the individual segments to access the software.
1968            for tb in tbparams.keys():
1969                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1970                        "/%s/%s" % ( path, dest))
1971
1972        # Convert the software locations in the segments into the local
1973        # copies on this host
1974        for soft in [ s for tb in topo.values() \
1975                for e in tb.elements \
1976                    if getattr(e, 'software', False) \
1977                        for s in e.software ]:
1978            if softmap.has_key(soft.location):
1979                soft.location = softmap[soft.location]
1980
1981
1982    def new_experiment(self, req, fid):
1983        """
1984        The external interface to empty initial experiment creation called from
1985        the dispatcher.
1986
1987        Creates a working directory, splits the incoming description using the
1988        splitter script and parses out the avrious subsections using the
1989        lcasses above.  Once each sub-experiment is created, use pooled threads
1990        to instantiate them and start it all up.
1991        """
1992        if not self.auth.check_attribute(fid, 'new'):
1993            raise service_error(service_error.access, "New access denied")
1994
1995        try:
1996            tmpdir = tempfile.mkdtemp(prefix="split-")
1997        except IOError:
1998            raise service_error(service_error.internal, "Cannot create tmp dir")
1999
2000        try:
2001            access_user = self.accessdb[fid]
2002        except KeyError:
2003            raise service_error(service_error.internal,
2004                    "Access map and authorizer out of sync in " + \
2005                            "new_experiment for fedid %s"  % fid)
2006
2007        pid = "dummy"
2008        gid = "dummy"
2009
2010        req = req.get('NewRequestBody', None)
2011        if not req:
2012            raise service_error(service_error.req,
2013                    "Bad request format (no NewRequestBody)")
2014
2015        # Generate an ID for the experiment (slice) and a certificate that the
2016        # allocator can use to prove they own it.  We'll ship it back through
2017        # the encrypted connection.
2018        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2019
2020        #now we're done with the tmpdir, and it should be empty
2021        if self.cleanup:
2022            self.log.debug("[new_experiment]: removing %s" % tmpdir)
2023            os.rmdir(tmpdir)
2024        else:
2025            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
2026
2027        eid = self.create_experiment_state(fid, req, expid, expcert, 
2028                state='empty')
2029
2030        # Let users touch the state
2031        self.auth.set_attribute(fid, expid)
2032        self.auth.set_attribute(expid, expid)
2033        # Override fedids can manipulate state as well
2034        for o in self.overrides:
2035            self.auth.set_attribute(o, expid)
2036
2037        rv = {
2038                'experimentID': [
2039                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2040                ],
2041                'experimentStatus': 'empty',
2042                'experimentAccess': { 'X509' : expcert }
2043            }
2044
2045        return rv
2046
2047
2048    def create_experiment(self, req, fid):
2049        """
2050        The external interface to experiment creation called from the
2051        dispatcher.
2052
2053        Creates a working directory, splits the incoming description using the
2054        splitter script and parses out the avrious subsections using the
2055        lcasses above.  Once each sub-experiment is created, use pooled threads
2056        to instantiate them and start it all up.
2057        """
2058
2059        req = req.get('CreateRequestBody', None)
2060        if not req:
2061            raise service_error(service_error.req,
2062                    "Bad request format (no CreateRequestBody)")
2063
2064        # Get the experiment access
2065        exp = req.get('experimentID', None)
2066        if exp:
2067            if exp.has_key('fedid'):
2068                key = exp['fedid']
2069                expid = key
2070                eid = None
2071            elif exp.has_key('localname'):
2072                key = exp['localname']
2073                eid = key
2074                expid = None
2075            else:
2076                raise service_error(service_error.req, "Unknown lookup type")
2077        else:
2078            raise service_error(service_error.req, "No request?")
2079
2080        self.check_experiment_access(fid, key)
2081
2082        try:
2083            tmpdir = tempfile.mkdtemp(prefix="split-")
2084            os.mkdir(tmpdir+"/keys")
2085        except IOError:
2086            raise service_error(service_error.internal, "Cannot create tmp dir")
2087
2088        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2089        gw_secretkey_base = "fed.%s" % self.ssh_type
2090        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2091        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2092        tclfile = tmpdir + "/experiment.tcl"
2093        tbparams = { }
2094        try:
2095            access_user = self.accessdb[fid]
2096        except KeyError:
2097            raise service_error(service_error.internal,
2098                    "Access map and authorizer out of sync in " + \
2099                            "create_experiment for fedid %s"  % fid)
2100
2101        pid = "dummy"
2102        gid = "dummy"
2103
2104        # The tcl parser needs to read a file so put the content into that file
2105        descr=req.get('experimentdescription', None)
2106        if descr:
2107            file_content=descr.get('ns2description', None)
2108            if file_content:
2109                try:
2110                    f = open(tclfile, 'w')
2111                    f.write(file_content)
2112                    f.close()
2113                except IOError:
2114                    raise service_error(service_error.internal,
2115                            "Cannot write temp experiment description")
2116            else:
2117                raise service_error(service_error.req, 
2118                        "Only ns2descriptions supported")
2119        else:
2120            raise service_error(service_error.req, "No experiment description")
2121
2122        self.state_lock.acquire()
2123        if self.state.has_key(key):
2124            self.state[key]['experimentStatus'] = "starting"
2125            for e in self.state[key].get('experimentID',[]):
2126                if not expid and e.has_key('fedid'):
2127                    expid = e['fedid']
2128                elif not eid and e.has_key('localname'):
2129                    eid = e['localname']
2130        self.state_lock.release()
2131
2132        if not (eid and expid):
2133            raise service_error(service_error.internal, 
2134                    "Cannot find local experiment info!?")
2135
2136        try: 
2137            # This catches exceptions to clear the placeholder if necessary
2138            try:
2139                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2140            except ValueError:
2141                raise service_error(service_error.server_config, 
2142                        "Bad key type (%s)" % self.ssh_type)
2143
2144            master = req.get('master', None)
2145            if not master:
2146                raise service_error(service_error.req,
2147                        "No master testbed label")
2148            export_project = req.get('exportProject', None)
2149            if not export_project:
2150                raise service_error(service_error.req, "No export project")
2151           
2152            # Translate to topdl
2153            if self.splitter_url:
2154                # XXX: need remote topdl translator
2155                self.log.debug("Calling remote splitter at %s" % \
2156                        self.splitter_url)
2157                split_data = self.remote_splitter(self.splitter_url,
2158                        file_content, master)
2159            else:
2160                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2161                    str(self.muxmax), '-m', master]
2162
2163                if self.fedkit:
2164                    tclcmd.append('-k')
2165
2166                if self.gatewaykit:
2167                    tclcmd.append('-K')
2168
2169                tclcmd.extend([pid, gid, eid, tclfile])
2170
2171                self.log.debug("running local splitter %s", " ".join(tclcmd))
2172                # This is just fantastic.  As a side effect the parser copies
2173                # tb_compat.tcl into the current directory, so that directory
2174                # must be writable by the fedd user.  Doing this in the
2175                # temporary subdir ensures this is the case.
2176                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2177                        cwd=tmpdir)
2178                split_data = tclparser.stdout
2179
2180            top = topdl.topology_from_xml(file=split_data, top="experiment")
2181
2182            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2183             # Find the testbeds to look up
2184            testbeds = set([ a.value for e in top.elements \
2185                    for a in e.attribute \
2186                    if a.attribute == 'testbed'] )
2187
2188            allocated = { }         # Testbeds we can access
2189            topo ={ }               # Sub topologies
2190            connInfo = { }          # Connection information
2191            services = [ ]
2192            self.get_access_to_testbeds(testbeds, access_user, 
2193                    export_project, master, allocated, tbparams, services)
2194            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2195
2196            # Copy configuration files into the remote file store
2197            # The config urlpath
2198            configpath = "/%s/config" % expid
2199            # The config file system location
2200            configdir ="%s%s" % ( self.repodir, configpath)
2201            try:
2202                os.makedirs(configdir)
2203            except IOError, e:
2204                raise service_error(
2205                        "Cannot create config directory: %s" % e)
2206            try:
2207                f = open("%s/hosts" % configdir, "w")
2208                f.write('\n'.join(hosts))
2209                f.close()
2210            except IOError, e:
2211                raise service_error(service_error.internal, 
2212                        "Cannot write hosts file: %s" % e)
2213            try:
2214                copy_file("%s" % gw_pubkey, "%s/%s" % \
2215                        (configdir, gw_pubkey_base))
2216                copy_file("%s" % gw_secretkey, "%s/%s" % \
2217                        (configdir, gw_secretkey_base))
2218            except IOError, e:
2219                raise service_error(service_error.internal, 
2220                        "Cannot copy keyfiles: %s" % e)
2221
2222            # Allow the individual testbeds to access the configuration files.
2223            for tb in tbparams.keys():
2224                asignee = tbparams[tb]['allocID']['fedid']
2225                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2226                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2227
2228            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2229                    connInfo, expid)
2230            # Now get access to the dynamic testbeds
2231            for k, t in topo.items():
2232                if not t.get_attribute('dynamic'):
2233                    continue
2234                tb = t.get_attribute('testbed')
2235                if tb: 
2236                    self.get_access(tb, None, tbparams, master, 
2237                            export_project, access_user, services)
2238                    tbparams[k] = tbparams[tb]
2239                    del tbparams[tb]
2240                    allocated[k] = 1
2241                else:
2242                    raise service_error(service_error.internal, 
2243                            "Dynamic allocation from no testbed!?")
2244
2245            self.wrangle_software(expid, top, topo, tbparams)
2246
2247            vtopo = topdl.topology_to_vtopo(top)
2248            vis = self.genviz(vtopo)
2249
2250            # save federant information
2251            for k in allocated.keys():
2252                tbparams[k]['federant'] = {
2253                        'name': [ { 'localname' : eid} ],
2254                        'allocID' : tbparams[k]['allocID'],
2255                        'master' : k == master,
2256                        'uri': tbparams[k]['uri'],
2257                    }
2258                if tbparams[k].has_key('emulab'):
2259                        tbparams[k]['federant']['emulab'] = \
2260                                tbparams[k]['emulab']
2261
2262            self.state_lock.acquire()
2263            self.state[eid]['vtopo'] = vtopo
2264            self.state[eid]['vis'] = vis
2265            self.state[expid]['federant'] = \
2266                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2267                        if tbparams[tb].has_key('federant') ]
2268            if self.state_filename: 
2269                self.write_state()
2270            self.state_lock.release()
2271        except service_error, e:
2272            # If something goes wrong in the parse (usually an access error)
2273            # clear the placeholder state.  From here on out the code delays
2274            # exceptions.  Failing at this point returns a fault to the remote
2275            # caller.
2276
2277            self.state_lock.acquire()
2278            del self.state[eid]
2279            del self.state[expid]
2280            if self.state_filename: self.write_state()
2281            self.state_lock.release()
2282            raise e
2283
2284
2285        # Start the background swapper and return the starting state.  From
2286        # here on out, the state will stick around a while.
2287
2288        # Let users touch the state
2289        self.auth.set_attribute(fid, expid)
2290        self.auth.set_attribute(expid, expid)
2291        # Override fedids can manipulate state as well
2292        for o in self.overrides:
2293            self.auth.set_attribute(o, expid)
2294
2295        # Create a logger that logs to the experiment's state object as well as
2296        # to the main log file.
2297        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2298        alloc_collector = self.list_log(self.state[eid]['log'])
2299        h = logging.StreamHandler(alloc_collector)
2300        # XXX: there should be a global one of these rather than repeating the
2301        # code.
2302        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2303                    '%d %b %y %H:%M:%S'))
2304        alloc_log.addHandler(h)
2305       
2306        attrs = [ 
2307                {
2308                    'attribute': 'ssh_pubkey', 
2309                    'value': '%s/%s/config/%s' % \
2310                            (self.repo_url, expid, gw_pubkey_base)
2311                },
2312                {
2313                    'attribute': 'ssh_secretkey', 
2314                    'value': '%s/%s/config/%s' % \
2315                            (self.repo_url, expid, gw_secretkey_base)
2316                },
2317                {
2318                    'attribute': 'hosts', 
2319                    'value': '%s/%s/config/hosts' % \
2320                            (self.repo_url, expid)
2321                },
2322                {
2323                    'attribute': 'experiment_name',
2324                    'value': eid,
2325                },
2326            ]
2327
2328        # transit and disconnected testbeds may not have a connInfo entry.
2329        # Fill in the blanks.
2330        for t in allocated.keys():
2331            if not connInfo.has_key(t):
2332                connInfo[t] = { }
2333
2334        # Start a thread to do the resource allocation
2335        t  = Thread(target=self.allocate_resources,
2336                args=(allocated, master, eid, expid, tbparams, 
2337                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2338                    services),
2339                name=eid)
2340        t.start()
2341
2342        rv = {
2343                'experimentID': [
2344                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2345                ],
2346                'experimentStatus': 'starting',
2347            }
2348
2349        return rv
2350   
2351    def get_experiment_fedid(self, key):
2352        """
2353        find the fedid associated with the localname key in the state database.
2354        """
2355
2356        rv = None
2357        self.state_lock.acquire()
2358        if self.state.has_key(key):
2359            if isinstance(self.state[key], dict):
2360                try:
2361                    kl = [ f['fedid'] for f in \
2362                            self.state[key]['experimentID']\
2363                                if f.has_key('fedid') ]
2364                except KeyError:
2365                    self.state_lock.release()
2366                    raise service_error(service_error.internal, 
2367                            "No fedid for experiment %s when getting "+\
2368                                    "fedid(!?)" % key)
2369                if len(kl) == 1:
2370                    rv = kl[0]
2371                else:
2372                    self.state_lock.release()
2373                    raise service_error(service_error.internal, 
2374                            "multiple fedids for experiment %s when " +\
2375                                    "getting fedid(!?)" % key)
2376            else:
2377                self.state_lock.release()
2378                raise service_error(service_error.internal, 
2379                        "Unexpected state for %s" % key)
2380        self.state_lock.release()
2381        return rv
2382
2383    def check_experiment_access(self, fid, key):
2384        """
2385        Confirm that the fid has access to the experiment.  Though a request
2386        may be made in terms of a local name, the access attribute is always
2387        the experiment's fedid.
2388        """
2389        if not isinstance(key, fedid):
2390            key = self.get_experiment_fedid(key)
2391
2392        if self.auth.check_attribute(fid, key):
2393            return True
2394        else:
2395            raise service_error(service_error.access, "Access Denied")
2396
2397
2398    def get_handler(self, path, fid):
2399        self.log.info("Get handler %s %s" % (path, fid))
2400        if self.auth.check_attribute(fid, path):
2401            return ("%s/%s" % (self.repodir, path), "application/binary")
2402        else:
2403            return (None, None)
2404
2405    def get_vtopo(self, req, fid):
2406        """
2407        Return the stored virtual topology for this experiment
2408        """
2409        rv = None
2410        state = None
2411
2412        req = req.get('VtopoRequestBody', None)
2413        if not req:
2414            raise service_error(service_error.req,
2415                    "Bad request format (no VtopoRequestBody)")
2416        exp = req.get('experiment', None)
2417        if exp:
2418            if exp.has_key('fedid'):
2419                key = exp['fedid']
2420                keytype = "fedid"
2421            elif exp.has_key('localname'):
2422                key = exp['localname']
2423                keytype = "localname"
2424            else:
2425                raise service_error(service_error.req, "Unknown lookup type")
2426        else:
2427            raise service_error(service_error.req, "No request?")
2428
2429        self.check_experiment_access(fid, key)
2430
2431        self.state_lock.acquire()
2432        if self.state.has_key(key):
2433            if self.state[key].has_key('vtopo'):
2434                rv = { 'experiment' : {keytype: key },\
2435                        'vtopo': self.state[key]['vtopo'],\
2436                    }
2437            else:
2438                state = self.state[key]['experimentStatus']
2439        self.state_lock.release()
2440
2441        if rv: return rv
2442        else: 
2443            if state:
2444                raise service_error(service_error.partial, 
2445                        "Not ready: %s" % state)
2446            else:
2447                raise service_error(service_error.req, "No such experiment")
2448
2449    def get_vis(self, req, fid):
2450        """
2451        Return the stored visualization for this experiment
2452        """
2453        rv = None
2454        state = None
2455
2456        req = req.get('VisRequestBody', None)
2457        if not req:
2458            raise service_error(service_error.req,
2459                    "Bad request format (no VisRequestBody)")
2460        exp = req.get('experiment', None)
2461        if exp:
2462            if exp.has_key('fedid'):
2463                key = exp['fedid']
2464                keytype = "fedid"
2465            elif exp.has_key('localname'):
2466                key = exp['localname']
2467                keytype = "localname"
2468            else:
2469                raise service_error(service_error.req, "Unknown lookup type")
2470        else:
2471            raise service_error(service_error.req, "No request?")
2472
2473        self.check_experiment_access(fid, key)
2474
2475        self.state_lock.acquire()
2476        if self.state.has_key(key):
2477            if self.state[key].has_key('vis'):
2478                rv =  { 'experiment' : {keytype: key },\
2479                        'vis': self.state[key]['vis'],\
2480                        }
2481            else:
2482                state = self.state[key]['experimentStatus']
2483        self.state_lock.release()
2484
2485        if rv: return rv
2486        else:
2487            if state:
2488                raise service_error(service_error.partial, 
2489                        "Not ready: %s" % state)
2490            else:
2491                raise service_error(service_error.req, "No such experiment")
2492
2493    def clean_info_response(self, rv):
2494        """
2495        Remove the information in the experiment's state object that is not in
2496        the info response.
2497        """
2498        # Remove the owner info (should always be there, but...)
2499        if rv.has_key('owner'): del rv['owner']
2500
2501        # Convert the log into the allocationLog parameter and remove the
2502        # log entry (with defensive programming)
2503        if rv.has_key('log'):
2504            rv['allocationLog'] = "".join(rv['log'])
2505            del rv['log']
2506        else:
2507            rv['allocationLog'] = ""
2508
2509        if rv['experimentStatus'] != 'active':
2510            if rv.has_key('federant'): del rv['federant']
2511        else:
2512            # remove the allocationID and uri info from each federant
2513            for f in rv.get('federant', []):
2514                if f.has_key('allocID'): del f['allocID']
2515                if f.has_key('uri'): del f['uri']
2516        return rv
2517
2518    def get_info(self, req, fid):
2519        """
2520        Return all the stored info about this experiment
2521        """
2522        rv = None
2523
2524        req = req.get('InfoRequestBody', None)
2525        if not req:
2526            raise service_error(service_error.req,
2527                    "Bad request format (no InfoRequestBody)")
2528        exp = req.get('experiment', None)
2529        if exp:
2530            if exp.has_key('fedid'):
2531                key = exp['fedid']
2532                keytype = "fedid"
2533            elif exp.has_key('localname'):
2534                key = exp['localname']
2535                keytype = "localname"
2536            else:
2537                raise service_error(service_error.req, "Unknown lookup type")
2538        else:
2539            raise service_error(service_error.req, "No request?")
2540
2541        self.check_experiment_access(fid, key)
2542
2543        # The state may be massaged by the service function that called
2544        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2545        # state.
2546        self.state_lock.acquire()
2547        if self.state.has_key(key):
2548            rv = copy.deepcopy(self.state[key])
2549        self.state_lock.release()
2550
2551        if rv:
2552            return self.clean_info_response(rv)
2553        else:
2554            raise service_error(service_error.req, "No such experiment")
2555
2556    def get_multi_info(self, req, fid):
2557        """
2558        Return all the stored info that this fedid can access
2559        """
2560        rv = { 'info': [ ] }
2561
2562        self.state_lock.acquire()
2563        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2564            try:
2565                self.check_experiment_access(fid, key)
2566            except service_error, e:
2567                if e.code == service_error.access:
2568                    continue
2569                else:
2570                    self.state_lock.release()
2571                    raise e
2572
2573            if self.state.has_key(key):
2574                e = copy.deepcopy(self.state[key])
2575                e = self.clean_info_response(e)
2576                rv['info'].append(e)
2577        self.state_lock.release()
2578        return rv
2579
2580    def terminate_experiment(self, req, fid):
2581        """
2582        Swap this experiment out on the federants and delete the shared
2583        information
2584        """
2585        tbparams = { }
2586        req = req.get('TerminateRequestBody', None)
2587        if not req:
2588            raise service_error(service_error.req,
2589                    "Bad request format (no TerminateRequestBody)")
2590        force = req.get('force', False)
2591        exp = req.get('experiment', None)
2592        if exp:
2593            if exp.has_key('fedid'):
2594                key = exp['fedid']
2595                keytype = "fedid"
2596            elif exp.has_key('localname'):
2597                key = exp['localname']
2598                keytype = "localname"
2599            else:
2600                raise service_error(service_error.req, "Unknown lookup type")
2601        else:
2602            raise service_error(service_error.req, "No request?")
2603
2604        self.check_experiment_access(fid, key)
2605
2606        dealloc_list = [ ]
2607
2608
2609        # Create a logger that logs to the dealloc_list as well as to the main
2610        # log file.
2611        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2612        h = logging.StreamHandler(self.list_log(dealloc_list))
2613        # XXX: there should be a global one of these rather than repeating the
2614        # code.
2615        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2616                    '%d %b %y %H:%M:%S'))
2617        dealloc_log.addHandler(h)
2618
2619        self.state_lock.acquire()
2620        fed_exp = self.state.get(key, None)
2621
2622        if fed_exp:
2623            # This branch of the conditional holds the lock to generate a
2624            # consistent temporary tbparams variable to deallocate experiments.
2625            # It releases the lock to do the deallocations and reacquires it to
2626            # remove the experiment state when the termination is complete.
2627
2628            # First make sure that the experiment creation is complete.
2629            status = fed_exp.get('experimentStatus', None)
2630
2631            if status:
2632                if status in ('starting', 'terminating'):
2633                    if not force:
2634                        self.state_lock.release()
2635                        raise service_error(service_error.partial, 
2636                                'Experiment still being created or destroyed')
2637                    else:
2638                        self.log.warning('Experiment in %s state ' % status + \
2639                                'being terminated by force.')
2640            else:
2641                # No status??? trouble
2642                self.state_lock.release()
2643                raise service_error(service_error.internal,
2644                        "Experiment has no status!?")
2645
2646            ids = []
2647            #  experimentID is a list of dicts that are self-describing
2648            #  identifiers.  This finds all the fedids and localnames - the
2649            #  keys of self.state - and puts them into ids.
2650            for id in fed_exp.get('experimentID', []):
2651                if id.has_key('fedid'): ids.append(id['fedid'])
2652                if id.has_key('localname'): ids.append(id['localname'])
2653
2654            # Collect the allocation/segment ids into a dict keyed by the fedid
2655            # of the allocation (or a monotonically increasing integer) that
2656            # contains a tuple of uri, aid (which is a dict...)
2657            for i, fed in enumerate(fed_exp.get('federant', [])):
2658                try:
2659                    uri = fed['uri']
2660                    aid = fed['allocID']
2661                    k = fed['allocID'].get('fedid', i)
2662                except KeyError, e:
2663                    continue
2664                tbparams[k] = (uri, aid)
2665            fed_exp['experimentStatus'] = 'terminating'
2666            if self.state_filename: self.write_state()
2667            self.state_lock.release()
2668
2669            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2670            # then completes, so we can't wait if nothing starts.  So, no
2671            # tbparams, no start.
2672            if len(tbparams) > 0:
2673                thread_pool = self.thread_pool(self.nthreads)
2674                for k in tbparams.keys():
2675                    # Create and start a thread to stop the segment
2676                    thread_pool.wait_for_slot()
2677                    uri, aid = tbparams[k]
2678                    t  = self.pooled_thread(\
2679                            target=self.terminate_segment(log=dealloc_log,
2680                                testbed=uri,
2681                                cert_file=self.cert_file, 
2682                                cert_pwd=self.cert_pwd,
2683                                trusted_certs=self.trusted_certs,
2684                                caller=self.call_TerminateSegment),
2685                            args=(uri, aid), name=k,
2686                            pdata=thread_pool, trace_file=self.trace_file)
2687                    t.start()
2688                # Wait for completions
2689                thread_pool.wait_for_all_done()
2690
2691            # release the allocations (failed experiments have done this
2692            # already, and starting experiments may be in odd states, so we
2693            # ignore errors releasing those allocations
2694            try: 
2695                for k in tbparams.keys():
2696                    # This releases access by uri
2697                    uri, aid = tbparams[k]
2698                    self.release_access(None, aid, uri=uri)
2699            except service_error, e:
2700                if status != 'failed' and not force:
2701                    raise e
2702
2703            # Remove the terminated experiment
2704            self.state_lock.acquire()
2705            for id in ids:
2706                if self.state.has_key(id): del self.state[id]
2707
2708            if self.state_filename: self.write_state()
2709            self.state_lock.release()
2710
2711            # Delete any synch points associated with this experiment.  All
2712            # synch points begin with the fedid of the experiment.
2713            fedid_keys = set(["fedid:%s" % f for f in ids \
2714                    if isinstance(f, fedid)])
2715            for k in self.synch_store.all_keys():
2716                try:
2717                    if len(k) > 45 and k[0:46] in fedid_keys:
2718                        self.synch_store.del_value(k)
2719                except synch_store.BadDeleteionError:
2720                    pass
2721            self.write_store()
2722               
2723            return { 
2724                    'experiment': exp , 
2725                    'deallocationLog': "".join(dealloc_list),
2726                    }
2727        else:
2728            # Don't forget to release the lock
2729            self.state_lock.release()
2730            raise service_error(service_error.req, "No saved state")
2731
2732
2733    def GetValue(self, req, fid):
2734        """
2735        Get a value from the synchronized store
2736        """
2737        req = req.get('GetValueRequestBody', None)
2738        if not req:
2739            raise service_error(service_error.req,
2740                    "Bad request format (no GetValueRequestBody)")
2741       
2742        name = req['name']
2743        wait = req['wait']
2744        rv = { 'name': name }
2745
2746        if self.auth.check_attribute(fid, name):
2747            v = self.synch_store.get_value(name, wait)
2748            if v is not None:
2749                rv['value'] = v
2750            return rv
2751        else:
2752            raise service_error(service_error.access, "Access Denied")
2753       
2754
2755    def SetValue(self, req, fid):
2756        """
2757        Set a value in the synchronized store
2758        """
2759        req = req.get('SetValueRequestBody', None)
2760        if not req:
2761            raise service_error(service_error.req,
2762                    "Bad request format (no SetValueRequestBody)")
2763       
2764        name = req['name']
2765        v = req['value']
2766
2767        if self.auth.check_attribute(fid, name):
2768            try:
2769                self.synch_store.set_value(name, v)
2770                self.write_store()
2771            except synch_store.CollisionError:
2772                # Translate into a service_error
2773                raise service_error(service_error.req,
2774                        "Value already set: %s" %name)
2775            return { 'name': name, 'value': v }
2776        else:
2777            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.