source: fedd/federation/experiment_control.py @ 8846959

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 8846959 was 8846959, checked in by Ted Faber <faber@…>, 14 years ago

remove debugging

  • Property mode set to 100644
File size: 93.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Split = service_caller('Ns2Split')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_splitter(self, uri, desc, master):
925
926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
931            }
932
933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
945
946    class start_segment:
947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
956            self.testbed = testbed
957            self.log_collector = log_collector
958            self.response = None
959
960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
967                    'master': master,
968                }
969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
980            if attrs:
981                req['fedAttr'] = attrs
982
983            try:
984                self.log.debug("Calling StartSegment at %s " % uri)
985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
993                    self.response = r
994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
1002
1003
1004
1005    class terminate_segment:
1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
1028   
1029
1030    def allocate_resources(self, allocated, master, eid, expid, 
1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1032            attrs=None, connInfo={}, services=[]):
1033
1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
1072            threads.append(t)
1073            t.start()
1074
1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
1077        revoked = False
1078        while not thread_pool.wait_for_all_done(60.0):
1079            mins += 1
1080            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1081                    % mins)
1082            if not revoked and \
1083                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1084                # a testbed has failed.  Revoke this experiment's
1085                # synchronizarion values so that sub experiments will not
1086                # deadlock waiting for synchronization that will never happen
1087                self.log.info("A subexperiment has failed to swap in, " + \
1088                        "revoking synch keys")
1089                var_key = "fedid:%s" % expid
1090                for k in self.synch_store.all_keys():
1091                    if len(k) > 45 and k[0:46] == var_key:
1092                        self.synch_store.revoke_key(k)
1093                revoked = True
1094
1095        failed = [ t.getName() for t in threads if not t.rv ]
1096        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1097
1098        # If one failed clean up, unless fail_soft is set
1099        if failed:
1100            if not fail_soft:
1101                thread_pool.clear()
1102                for tb in succeeded:
1103                    # Create and start a thread to stop the segment
1104                    thread_pool.wait_for_slot()
1105                    uri = tbparams[tb]['uri']
1106                    t  = self.pooled_thread(\
1107                            target=self.terminate_segment(log=log,
1108                                testbed=tb,
1109                                cert_file=self.cert_file, 
1110                                cert_pwd=self.cert_pwd,
1111                                trusted_certs=self.trusted_certs,
1112                                caller=self.call_TerminateSegment),
1113                            args=(uri, tbparams[tb]['federant']['allocID']),
1114                            name=tb,
1115                            pdata=thread_pool, trace_file=self.trace_file)
1116                    t.start()
1117                # Wait until all finish (if any are being stopped)
1118                if succeeded:
1119                    thread_pool.wait_for_all_done()
1120
1121                # release the allocations
1122                for tb in tbparams.keys():
1123                    self.release_access(tb, tbparams[tb]['allocID'],
1124                            tbparams[tb].get('uri', None))
1125                # Remove the placeholder
1126                self.state_lock.acquire()
1127                self.state[eid]['experimentStatus'] = 'failed'
1128                if self.state_filename: self.write_state()
1129                self.state_lock.release()
1130
1131                log.error("Swap in failed on %s" % ",".join(failed))
1132                return
1133        else:
1134            log.info("[start_segment]: Experiment %s active" % eid)
1135
1136
1137        # Walk up tmpdir, deleting as we go
1138        if self.cleanup:
1139            log.debug("[start_experiment]: removing %s" % tmpdir)
1140            for path, dirs, files in os.walk(tmpdir, topdown=False):
1141                for f in files:
1142                    os.remove(os.path.join(path, f))
1143                for d in dirs:
1144                    os.rmdir(os.path.join(path, d))
1145            os.rmdir(tmpdir)
1146        else:
1147            log.debug("[start_experiment]: not removing %s" % tmpdir)
1148
1149        # Insert the experiment into our state and update the disk copy
1150        self.state_lock.acquire()
1151        self.state[expid]['experimentStatus'] = 'active'
1152        self.state[eid] = self.state[expid]
1153        if self.state_filename: self.write_state()
1154        self.state_lock.release()
1155        return
1156
1157
1158    def add_kit(self, e, kit):
1159        """
1160        Add a Software object created from the list of (install, location)
1161        tuples passed as kit  to the software attribute of an object e.  We
1162        do this enough to break out the code, but it's kind of a hack to
1163        avoid changing the old tuple rep.
1164        """
1165
1166        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1167
1168        if isinstance(e.software, list): e.software.extend(s)
1169        else: e.software = s
1170
1171
1172    def create_experiment_state(self, fid, req, expid, expcert, 
1173            state='starting'):
1174        """
1175        Create the initial entry in the experiment's state.  The expid and
1176        expcert are the experiment's fedid and certifacte that represents that
1177        ID, which are installed in the experiment state.  If the request
1178        includes a suggested local name that is used if possible.  If the local
1179        name is already taken by an experiment owned by this user that has
1180        failed, it is overwritten.  Otherwise new letters are added until a
1181        valid localname is found.  The generated local name is returned.
1182        """
1183
1184        if req.has_key('experimentID') and \
1185                req['experimentID'].has_key('localname'):
1186            overwrite = False
1187            eid = req['experimentID']['localname']
1188            # If there's an old failed experiment here with the same local name
1189            # and accessible by this user, we'll overwrite it, otherwise we'll
1190            # fall through and do the collision avoidance.
1191            old_expid = self.get_experiment_fedid(eid)
1192            if old_expid and self.check_experiment_access(fid, old_expid):
1193                self.state_lock.acquire()
1194                status = self.state[eid].get('experimentStatus', None)
1195                if status and status == 'failed':
1196                    # remove the old access attribute
1197                    self.auth.unset_attribute(fid, old_expid)
1198                    overwrite = True
1199                    del self.state[eid]
1200                    del self.state[old_expid]
1201                self.state_lock.release()
1202            self.state_lock.acquire()
1203            while (self.state.has_key(eid) and not overwrite):
1204                eid += random.choice(string.ascii_letters)
1205            # Initial state
1206            self.state[eid] = {
1207                    'experimentID' : \
1208                            [ { 'localname' : eid }, {'fedid': expid } ],
1209                    'experimentStatus': state,
1210                    'experimentAccess': { 'X509' : expcert },
1211                    'owner': fid,
1212                    'log' : [],
1213                }
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217        else:
1218            eid = self.exp_stem
1219            for i in range(0,5):
1220                eid += random.choice(string.ascii_letters)
1221            self.state_lock.acquire()
1222            while (self.state.has_key(eid)):
1223                eid = self.exp_stem
1224                for i in range(0,5):
1225                    eid += random.choice(string.ascii_letters)
1226            # Initial state
1227            self.state[eid] = {
1228                    'experimentID' : \
1229                            [ { 'localname' : eid }, {'fedid': expid } ],
1230                    'experimentStatus': state,
1231                    'experimentAccess': { 'X509' : expcert },
1232                    'owner': fid,
1233                    'log' : [],
1234                }
1235            self.state[expid] = self.state[eid]
1236            if self.state_filename: self.write_state()
1237            self.state_lock.release()
1238
1239        return eid
1240
1241
1242    def allocate_ips_to_topo(self, top):
1243        """
1244        Add an ip4_address attribute to all the hosts in the topology, based on
1245        the shared substrates on which they sit.  An /etc/hosts file is also
1246        created and returned as a list of hostfiles entries.  We also return
1247        the allocator, because we may need to allocate IPs to portals
1248        (specifically DRAGON portals).
1249        """
1250        subs = sorted(top.substrates, 
1251                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1252                reverse=True)
1253        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1254        ifs = { }
1255        hosts = [ ]
1256
1257        for idx, s in enumerate(subs):
1258            net_size = len(s.interfaces)+2
1259
1260            a = ips.allocate(net_size)
1261            if a :
1262                base, num = a
1263                if num < net_size: 
1264                    raise service_error(service_error.internal,
1265                            "Allocator returned wrong number of IPs??")
1266            else:
1267                raise service_error(service_error.req, 
1268                        "Cannot allocate IP addresses")
1269            mask = 2
1270            while 2 **mask < net_size:
1271                mask += 1
1272
1273            netmask = ((2**32-1) ^ (mask**2 -1))
1274
1275            base += 1
1276            for i in s.interfaces:
1277                i.attribute.append(
1278                        topdl.Attribute('ip4_address', 
1279                            "%s" % ip_addr(base)))
1280                i.attribute.append(
1281                        topdl.Attribute('ip4_netmask', 
1282                            "%s" % ip_addr(int(netmask))))
1283
1284                hname = i.element.name[0]
1285                if ifs.has_key(hname):
1286                    hosts.append("%s\t%s-%s %s-%d" % \
1287                            (ip_addr(base), hname, s.name, hname,
1288                                ifs[hname]))
1289                else:
1290                    ifs[hname] = 0
1291                    hosts.append("%s\t%s-%s %s-%d %s" % \
1292                            (ip_addr(base), hname, s.name, hname,
1293                                ifs[hname], hname))
1294
1295                ifs[hname] += 1
1296                base += 1
1297        return hosts, ips
1298
1299    def get_access_to_testbeds(self, testbeds, access_user, 
1300            export_project, master, allocated, tbparams, services):
1301        """
1302        Request access to the various testbeds required for this instantiation
1303        (passed in as testbeds).  User, access_user, expoert_project and master
1304        are used to construct the correct requests.  Per-testbed parameters are
1305        returned in tbparams.
1306        """
1307        for tb in testbeds:
1308            self.get_access(tb, None, tbparams, master,
1309                    export_project, access_user, services)
1310            allocated[tb] = 1
1311
1312    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1313        """
1314        Create the sub-topologies that are needed for experiment instantiation.
1315        """
1316        for tb in testbeds:
1317            topo[tb] = top.clone()
1318            to_delete = [ ]
1319            # XXX: copy in for loop to simplify
1320            for e in topo[tb].elements:
1321                etb = e.get_attribute('testbed')
1322                if etb and etb != tb:
1323                    for i in e.interface:
1324                        for s in i.subs:
1325                            try:
1326                                s.interfaces.remove(i)
1327                            except ValueError:
1328                                raise service_error(service_error.internal,
1329                                        "Can't remove interface??")
1330                    to_delete.append(e)
1331            for e in to_delete:
1332                topo[tb].elements.remove(e)
1333            topo[tb].make_indices()
1334
1335            for e in [ e for e in topo[tb].elements \
1336                    if isinstance(e,topdl.Computer)]:
1337                if self.fedkit: self.add_kit(e, self.fedkit)
1338
1339    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1340            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1341            expid=None):
1342        """
1343        Return a new internet portal node and a dict with the connectionInfo to
1344        be attached.
1345        """
1346        dproject = tbparams[dt].get('project', 'project')
1347        ddomain = tbparams[dt].get('domain', ".example.com")
1348        mdomain = tbparams[master].get('domain', '.example.com')
1349        mproject = tbparams[master].get('project', 'project')
1350        muser = tbparams[master].get('user', 'root')
1351        smbshare = tbparams[master].get('smbshare', 'USERS')
1352
1353        if st == master or dt == master:
1354            active = ("%s" % (st == master))
1355        else:
1356            active = ("%s" % (st > dt))
1357
1358        ifaces = [ ]
1359        for sub, attrs in iface_desc:
1360            inf = topdl.Interface(
1361                    name="inf%03d" % len(ifaces),
1362                    substrate=sub,
1363                    attribute=[
1364                        topdl.Attribute(
1365                            attribute=n,
1366                            value = v)
1367                        for n, v in attrs
1368                        ]
1369                    )
1370            ifaces.append(inf)
1371        if conn_type == "ssh":
1372            try:
1373                aid = tbparams[st]['allocID']['fedid']
1374            except:
1375                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1376                        % st)
1377                aid = None
1378            info = {
1379                    "type" : conn_type, 
1380                    "portal": myname,
1381                    'fedAttr': [ 
1382                            { 'attribute': 'masterdomain', 'value': mdomain},
1383                            { 'attribute': 'masterexperiment', 'value': 
1384                                "%s/%s" % (mproject, eid)},
1385                            { 'attribute': 'active', 'value': active},
1386                            # Move to SMB service description
1387                            { 'attribute': 'masteruser', 'value': muser},
1388                            { 'attribute': 'smbshare', 'value': smbshare},
1389                        ],
1390                    'parameter': [
1391                        {
1392                            'name': 'peer',
1393                            'key': 'fedid:%s/%s' % (expid, myname),
1394                            'store': self.store_url,
1395                            'type': 'output',
1396                        },
1397                        {
1398                            'name': 'ssh_port',
1399                            'key': 'fedid:%s/%s-port' % (expid, myname),
1400                            'store': self.store_url,
1401                            'type': 'output',
1402                        },
1403                        {
1404                            'name': 'peer',
1405                            'key': 'fedid:%s/%s' % (expid, desthost),
1406                            'store': self.store_url,
1407                            'type': 'input',
1408                        },
1409                        {
1410                            'name': 'ssh_port',
1411                            'key': 'fedid:%s/%s-port' % (expid, desthost),
1412                            'store': self.store_url,
1413                            'type': 'input',
1414                        },
1415                        ]
1416                    }
1417            # Give this allocation the rights to access the key of the
1418            # peers
1419            if aid:
1420                for h in (myname, desthost):
1421                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1422                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
1423                            (expid, h))
1424            else:
1425                self.log.error("No aid for %s in new_portal_node" % st)
1426        else:
1427            info = None
1428
1429        return (topdl.Computer(
1430                name=myname,
1431                attribute=[ 
1432                    topdl.Attribute(attribute=n,value=v)
1433                        for n, v in (\
1434                            ('portal', 'true'),
1435                            ('portal_type', portal_type), 
1436                        )
1437                    ],
1438                interface=ifaces,
1439                ), info)
1440
1441    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1442        ddomain = tbparams[dt].get('domain', ".example.com")
1443        dproject = tbparams[dt].get('project', 'project')
1444        tsubstrate = \
1445                topdl.Substrate(name='%s-%s' % (st, dt),
1446                        attribute= [
1447                            topdl.Attribute(
1448                                attribute='portal',
1449                                value='true')
1450                            ]
1451                        )
1452        segment_element = topdl.Segment(
1453                id= tbparams[dt]['allocID'],
1454                type='emulab',
1455                uri = self.tbmap.get(dt, None),
1456                interface=[ 
1457                    topdl.Interface(
1458                        substrate=tsubstrate.name),
1459                    ],
1460                attribute = [
1461                    topdl.Attribute(attribute=n, value=v)
1462                        for n, v in (\
1463                            ('domain', ddomain),
1464                            ('experiment', "%s/%s" % \
1465                                    (dproject, eid)),)
1466                    ],
1467                )
1468
1469        return (tsubstrate, segment_element)
1470
1471    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1472        if sub.capacity is None:
1473            raise service_error(service_error.internal,
1474                    "Cannot DRAGON split substrate w/o capacity")
1475        segs = [ ]
1476        substr = topdl.Substrate(name="dragon%d" % idx, 
1477                capacity=sub.capacity.clone(),
1478                attribute=[ topdl.Attribute(attribute=n, value=v)
1479                    for n, v, in (\
1480                            ('vlan', 'unassigned%d' % idx),)])
1481        name = "dragon%d" % idx
1482        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1483        for tb in tbs.keys():
1484            seg = topdl.Segment(
1485                    id = tbparams[tb]['allocID'],
1486                    type='emulab',
1487                    uri = self.tbmap.get(tb, None),
1488                    interface=[ 
1489                        topdl.Interface(
1490                            substrate=substr.name),
1491                        ],
1492                    attribute=[ topdl.Attribute(
1493                        attribute='dragon_endpoint', 
1494                        value=tbparams[tb]['dragon']),
1495                        ]
1496                    )
1497            if tbparams[tb].has_key('vlans'):
1498                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1499            segs.append(seg)
1500
1501            # Give this allocation the rights to access the key of the
1502            # vlan_id
1503            try:
1504                aid = tbparams[tb]['allocID']['fedid']
1505                self.auth.set_attribute(aid, store_key)
1506            except:
1507                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1508                        % tb)
1509
1510        connInfo[name] = [ { 
1511            'type': 'transit',
1512            'parameter': [ { 
1513                'name': 'vlan_id',
1514                'key': store_key,
1515                'store': self.store_url,
1516                'type': 'output'
1517                } ]
1518            } ]
1519
1520        topo[name] = \
1521                topdl.Topology(substrates=[substr], elements=segs,
1522                        attribute=[
1523                            topdl.Attribute(attribute="transit", value='true'),
1524                            topdl.Attribute(attribute="dynamic", value='true'),
1525                            topdl.Attribute(attribute="testbed", 
1526                                value='dragon'),
1527                            topdl.Attribute(attribute="store_keys", 
1528                                value=store_key),
1529                            ]
1530                        )
1531
1532    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1533            connInfo, expid=None):
1534        """
1535        Add attribiutes to the various elements indicating that they are to be
1536        dragon connected and create a dragon segment in topo to be
1537        instantiated.
1538        """
1539
1540        def get_substrate_from_topo(name, t):
1541            for s in t.substrates:
1542                if s.name == name: return s
1543            else: return None
1544
1545
1546        mdomain = tbparams[master].get('domain', '.example.com')
1547        mproject = tbparams[master].get('project', 'project')
1548        # dn is the number of previously created dragon nets.  This routine
1549        # creates a net numbered by dn
1550        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1551        # Count the number of interfaces on this substrate in each testbed from
1552        # the global topology
1553        count = { }
1554        node = { }
1555        for e in [ i.element for i in sub.interfaces ]:
1556            tb = e.get_attribute('testbed')
1557            count[tb] = count.get(tb, 0) + 1
1558            node[tb] = i.get_attribute('ip4_address')
1559
1560
1561        # Set the attributes in the copies that will allow setup of dragon
1562        # connections.
1563        for tb in tbs.keys():
1564            s = get_substrate_from_topo(sub.name, topo[tb])
1565            if s:
1566                if not connInfo.has_key(tb):
1567                    connInfo[tb] = [ ]
1568
1569                try:
1570                    aid = tbparams[tb]['allocID']['fedid']
1571                except:
1572                    self.log.debug("[creat_dragon_substrate] " + 
1573                            "Can't get alloc id for %s?" %tb)
1574                    aid = None
1575
1576                # This may need another look, but only a service gateway will
1577                # look at the active parameter, and these are only inserted to
1578                # connect to the master.
1579                active = "%s" % ( tb == master)
1580                info = {
1581                        'type': 'transit',
1582                        'member': [ {
1583                            'element': i.element.name[0], 
1584                            'interface': i.name
1585                            } for i in s.interfaces \
1586                                    if isinstance(i.element, topdl.Computer) ],
1587                        'fedAttr': [ 
1588                            { 'attribute': 'masterdomain', 'value': mdomain},
1589                            { 'attribute': 'masterexperiment', 'value': 
1590                                "%s/%s" % (mproject, eid)},
1591                            { 'attribute': 'active', 'value': active},
1592                            ],
1593                        'parameter': [ {
1594                            'name': 'vlan_id',
1595                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1596                            'store': self.store_url,
1597                            'type': 'input',
1598                            } ]
1599                        }
1600                if tbs.has_key(tb):
1601                    info['peer'] = tbs[tb]
1602                connInfo[tb].append(info)
1603
1604                # Give this allocation the rights to access the key of the
1605                # vlan_id
1606                if aid:
1607                    self.auth.set_attribute(aid, 
1608                            'fedid:%s/vlan%d' % (expid, dn))
1609            else:
1610                raise service_error(service_error.internal,
1611                        "No substrate %s in testbed %s" % (sub.name, tb))
1612
1613        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1614
1615    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1616            segment_substrate, portals, connInfo, expid):
1617        # More than one testbed is on this substrate.  Insert
1618        # some portals into the subtopologies.  st == source testbed,
1619        # dt == destination testbed.
1620        for st in tbs.keys():
1621            if not segment_substrate.has_key(st):
1622                segment_substrate[st] = { }
1623            if not portals.has_key(st): 
1624                portals[st] = { }
1625            if not connInfo.has_key(st):
1626                connInfo[st] = [ ]
1627            for dt in [ t for t in tbs.keys() if t != st]:
1628                sproject = tbparams[st].get('project', 'project')
1629                dproject = tbparams[dt].get('project', 'project')
1630                mproject = tbparams[master].get('project', 'project')
1631                sdomain = tbparams[st].get('domain', ".example.com")
1632                ddomain = tbparams[dt].get('domain', ".example.com")
1633                mdomain = tbparams[master].get('domain', '.example.com')
1634                muser = tbparams[master].get('user', 'root')
1635                smbshare = tbparams[master].get('smbshare', 'USERS')
1636                aid = tbparams[dt]['allocID']['fedid']
1637                if st == master or dt == master:
1638                    active = ("%s" % (st == master))
1639                else:
1640                    active = ("%s" %(st > dt))
1641                if not segment_substrate[st].has_key(dt):
1642                    # Put a substrate and a segment for the connected
1643                    # testbed in there.
1644                    tsubstrate, segment_element = \
1645                            self.new_portal_substrate(st, dt, eid, tbparams,
1646                                    expid)
1647                    segment_substrate[st][dt] = tsubstrate
1648                    topo[st].substrates.append(tsubstrate)
1649                    topo[st].elements.append(segment_element)
1650
1651                new_portal = False
1652                if portals[st].has_key(dt):
1653                    # There's a portal set up to go to this destination.
1654                    # See if there's room to multiplex this connection on
1655                    # it.  If so, add an interface to the portal; if not,
1656                    # set up to add a portal below.
1657                    # [This little festival of braces is just a pop of the
1658                    # last element in the list of portals between st and
1659                    # dt.]
1660                    portal = portals[st][dt][-1]
1661                    mux = len([ i for i in portal.interface \
1662                            if not i.get_attribute('portal')])
1663                    if mux == self.muxmax:
1664                        new_portal = True
1665                        portal_type = "experiment"
1666                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1667                        desthost = "%stunnel%d" % (st.lower(), 
1668                                len(portals[st][dt]))
1669                    else:
1670                        new_i = topdl.Interface(
1671                                substrate=sub.name,
1672                                attribute=[ 
1673                                    topdl.Attribute(
1674                                        attribute='ip4_address', 
1675                                        value=tbs[dt]
1676                                    )
1677                                ])
1678                        portal.interface.append(new_i)
1679                else:
1680                    # First connection to this testbed, make an empty list
1681                    # and set up to add the new portal below
1682                    new_portal = True
1683                    portals[st][dt] = [ ]
1684                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1685                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1686
1687                    if dt == master or st == master: portal_type = "both"
1688                    else: portal_type = "experiment"
1689
1690                if new_portal:
1691                    infs = (
1692                            (segment_substrate[st][dt].name, 
1693                                (('portal', 'true'),)),
1694                            (sub.name, 
1695                                (('ip4_address', tbs[dt]),))
1696                        )
1697                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1698                            master, eid, myname, desthost, portal_type,
1699                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1700                    if self.fedkit:
1701                        self.add_kit(portal, self.fedkit)
1702                    if self.gatewaykit: 
1703                        self.add_kit(portal, self.gatewaykit)
1704
1705                    topo[st].elements.append(portal)
1706                    portals[st][dt].append(portal)
1707                    connInfo[st].append(info)
1708
1709    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1710        # Add to the master testbed
1711        tsubstrate, segment_element = \
1712                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1713        myname = "%stunnel" % dt
1714        desthost = "%stunnel" % st
1715
1716        portal, info = self.new_portal_node(st, dt, tbparams, master,
1717                eid, myname, desthost, "control", 
1718                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1719                conn_attrs=[], expid=expid)
1720        if self.fedkit:
1721            self.add_kit(portal, self.fedkit)
1722        if self.gatewaykit: 
1723            self.add_kit(portal, self.gatewaykit)
1724
1725        topo[st].substrates.append(tsubstrate)
1726        topo[st].elements.append(segment_element)
1727        topo[st].elements.append(portal)
1728        if not connInfo.has_key(st):
1729            connInfo[st] = [ ]
1730        connInfo[st].append(info)
1731
1732    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1733            substrate, tbparams, expid):
1734        # Add to the master testbed
1735        myname = "%stunnel" % dt
1736        desthost = "%s" % ip_addr(dip)
1737
1738        portal, info = self.new_portal_node(st, dt, tbparams, master,
1739                eid, myname, desthost, "control", 
1740                ((substrate.name,(
1741                    ('portal','true'),
1742                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1743                conn_type="transit", conn_attrs=[], expid=expid)
1744        if self.fedkit:
1745            self.add_kit(portal, self.fedkit)
1746        if self.gatewaykit: 
1747            self.add_kit(portal, self.gatewaykit)
1748
1749        return portal
1750
1751    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1752            connInfo, expid):
1753        """
1754        For each substrate in the main topology, find those that
1755        have nodes on more than one testbed.  Insert portal nodes
1756        into the copies of those substrates on the sub topologies.
1757        """
1758        segment_substrate = { }
1759        portals = { }
1760        for s in top.substrates:
1761            # tbs will contain an ip address on this subsrate that is in
1762            # each testbed.
1763            tbs = { }
1764            for i in s.interfaces:
1765                e = i.element
1766                tb = e.get_attribute('testbed')
1767                if tb and not tbs.has_key(tb):
1768                    for i in e.interface:
1769                        if s in i.subs:
1770                            tbs[tb]= i.get_attribute('ip4_address')
1771            if len(tbs) < 2:
1772                continue
1773
1774            # DRAGON will not create multi-site vlans yet
1775            if len(tbs) == 2 and \
1776                    all([tbparams[x].has_key('dragon') for x in tbs]):
1777                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1778                        master, eid, connInfo, expid)
1779            else:
1780                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1781                        eid, segment_substrate, portals, connInfo, expid)
1782
1783        # Make sure that all the slaves have a control portal back to the
1784        # master.
1785        for tb in [ t for t in tbparams.keys() if t != master ]:
1786            if len([e for e in topo[tb].elements \
1787                    if isinstance(e, topdl.Computer) and \
1788                    e.get_attribute('portal') and \
1789                    e.get_attribute('portal_type') == 'both']) == 0:
1790
1791                if tbparams[master].has_key('dragon') \
1792                        and tbparams[tb].has_key('dragon'):
1793
1794                    idx = len([x for x in topo.keys() \
1795                            if x.startswith('dragon')])
1796                    dip, leng = ip_allocator.allocate(4)
1797                    dip += 1
1798                    mip = dip+1
1799                    csub = topdl.Substrate(
1800                            name="dragon-control-%s" % tb,
1801                            capacity=topdl.Capacity(100000.0, 'max'),
1802                            attribute=[
1803                                topdl.Attribute(
1804                                    attribute='portal',
1805                                    value='true'
1806                                    )
1807                                ]
1808                            )
1809                    seg = topdl.Segment(
1810                            id= tbparams[master]['allocID'],
1811                            type='emulab',
1812                            uri = self.tbmap.get(master, None),
1813                            interface=[ 
1814                                topdl.Interface(
1815                                    substrate=csub.name),
1816                                ],
1817                            attribute = [
1818                                topdl.Attribute(attribute=n, value=v)
1819                                    for n, v in (\
1820                                        ('domain', 
1821                                            tbparams[master].get('domain',
1822                                                ".example.com")),
1823                                        ('experiment', "%s/%s" % \
1824                                                (tbparams[master].get(
1825                                                    'project', 
1826                                                    'project'), 
1827                                                    eid)),)
1828                                ],
1829                            )
1830                    portal = self.new_dragon_portal(tb, master,
1831                            master, eid, dip, mip, idx, csub, tbparams, expid)
1832                    topo[tb].substrates.append(csub)
1833                    topo[tb].elements.append(portal)
1834                    topo[tb].elements.append(seg)
1835
1836                    mcsub = csub.clone()
1837                    seg = topdl.Segment(
1838                            id= tbparams[tb]['allocID'],
1839                            type='emulab',
1840                            uri = self.tbmap.get(tb, None),
1841                            interface=[ 
1842                                topdl.Interface(
1843                                    substrate=csub.name),
1844                                ],
1845                            attribute = [
1846                                topdl.Attribute(attribute=n, value=v)
1847                                    for n, v in (\
1848                                        ('domain', 
1849                                            tbparams[tb].get('domain',
1850                                                ".example.com")),
1851                                        ('experiment', "%s/%s" % \
1852                                                (tbparams[tb].get('project', 
1853                                                    'project'), 
1854                                                    eid)),)
1855                                ],
1856                            )
1857                    portal = self.new_dragon_portal(master, tb, master,
1858                            eid, mip, dip, idx, mcsub, tbparams, expid)
1859                    topo[master].substrates.append(mcsub)
1860                    topo[master].elements.append(portal)
1861                    topo[master].elements.append(seg)
1862                    for t in (master, tb):
1863                        topo[t].incorporate_elements()
1864
1865                    self.create_dragon_substrate(csub, topo, 
1866                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1867                            tbparams, master, eid, connInfo,
1868                            expid)
1869                else:
1870                    self.add_control_portal(master, tb, master, eid, topo, 
1871                            tbparams, connInfo, expid)
1872                    self.add_control_portal(tb, master, master, eid, topo, 
1873                            tbparams, connInfo, expid)
1874
1875        # Connect the portal nodes into the topologies and clear out
1876        # substrates that are not in the topologies
1877        for tb in tbparams.keys():
1878            topo[tb].incorporate_elements()
1879            topo[tb].substrates = \
1880                    [s for s in topo[tb].substrates \
1881                        if len(s.interfaces) >0]
1882
1883    def wrangle_software(self, expid, top, topo, tbparams):
1884        """
1885        Copy software out to the repository directory, allocate permissions and
1886        rewrite the segment topologies to look for the software in local
1887        places.
1888        """
1889
1890        # Copy the rpms and tarfiles to a distribution directory from
1891        # which the federants can retrieve them
1892        linkpath = "%s/software" %  expid
1893        softdir ="%s/%s" % ( self.repodir, linkpath)
1894        softmap = { }
1895        # These are in a list of tuples format (each kit).  This comprehension
1896        # unwraps them into a single list of tuples that initilaizes the set of
1897        # tuples.
1898        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1899                for p, t in l ])
1900        pkgs.update([x.location for e in top.elements \
1901                for x in e.software])
1902        try:
1903            os.makedirs(softdir)
1904        except IOError, e:
1905            raise service_error(
1906                    "Cannot create software directory: %s" % e)
1907        # The actual copying.  Everything's converted into a url for copying.
1908        for pkg in pkgs:
1909            loc = pkg
1910
1911            scheme, host, path = urlparse(loc)[0:3]
1912            dest = os.path.basename(path)
1913            if not scheme:
1914                if not loc.startswith('/'):
1915                    loc = "/%s" % loc
1916                loc = "file://%s" %loc
1917            try:
1918                u = urlopen(loc)
1919            except Exception, e:
1920                raise service_error(service_error.req, 
1921                        "Cannot open %s: %s" % (loc, e))
1922            try:
1923                f = open("%s/%s" % (softdir, dest) , "w")
1924                self.log.debug("Writing %s/%s" % (softdir,dest) )
1925                data = u.read(4096)
1926                while data:
1927                    f.write(data)
1928                    data = u.read(4096)
1929                f.close()
1930                u.close()
1931            except Exception, e:
1932                raise service_error(service_error.internal,
1933                        "Could not copy %s: %s" % (loc, e))
1934            path = re.sub("/tmp", "", linkpath)
1935            # XXX
1936            softmap[pkg] = \
1937                    "%s/%s/%s" %\
1938                    ( self.repo_url, path, dest)
1939
1940            # Allow the individual segments to access the software.
1941            for tb in tbparams.keys():
1942                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1943                        "/%s/%s" % ( path, dest))
1944
1945        # Convert the software locations in the segments into the local
1946        # copies on this host
1947        for soft in [ s for tb in topo.values() \
1948                for e in tb.elements \
1949                    if getattr(e, 'software', False) \
1950                        for s in e.software ]:
1951            if softmap.has_key(soft.location):
1952                soft.location = softmap[soft.location]
1953
1954
1955    def new_experiment(self, req, fid):
1956        """
1957        The external interface to empty initial experiment creation called from
1958        the dispatcher.
1959
1960        Creates a working directory, splits the incoming description using the
1961        splitter script and parses out the avrious subsections using the
1962        lcasses above.  Once each sub-experiment is created, use pooled threads
1963        to instantiate them and start it all up.
1964        """
1965        if not self.auth.check_attribute(fid, 'new'):
1966            raise service_error(service_error.access, "New access denied")
1967
1968        try:
1969            tmpdir = tempfile.mkdtemp(prefix="split-")
1970        except IOError:
1971            raise service_error(service_error.internal, "Cannot create tmp dir")
1972
1973        try:
1974            access_user = self.accessdb[fid]
1975        except KeyError:
1976            raise service_error(service_error.internal,
1977                    "Access map and authorizer out of sync in " + \
1978                            "new_experiment for fedid %s"  % fid)
1979
1980        pid = "dummy"
1981        gid = "dummy"
1982
1983        req = req.get('NewRequestBody', None)
1984        if not req:
1985            raise service_error(service_error.req,
1986                    "Bad request format (no NewRequestBody)")
1987
1988        # Generate an ID for the experiment (slice) and a certificate that the
1989        # allocator can use to prove they own it.  We'll ship it back through
1990        # the encrypted connection.
1991        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1992
1993        #now we're done with the tmpdir, and it should be empty
1994        if self.cleanup:
1995            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1996            os.rmdir(tmpdir)
1997        else:
1998            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1999
2000        eid = self.create_experiment_state(fid, req, expid, expcert, 
2001                state='empty')
2002
2003        # Let users touch the state
2004        self.auth.set_attribute(fid, expid)
2005        self.auth.set_attribute(expid, expid)
2006        # Override fedids can manipulate state as well
2007        for o in self.overrides:
2008            self.auth.set_attribute(o, expid)
2009
2010        rv = {
2011                'experimentID': [
2012                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2013                ],
2014                'experimentStatus': 'empty',
2015                'experimentAccess': { 'X509' : expcert }
2016            }
2017
2018        return rv
2019
2020
2021    def create_experiment(self, req, fid):
2022        """
2023        The external interface to experiment creation called from the
2024        dispatcher.
2025
2026        Creates a working directory, splits the incoming description using the
2027        splitter script and parses out the avrious subsections using the
2028        lcasses above.  Once each sub-experiment is created, use pooled threads
2029        to instantiate them and start it all up.
2030        """
2031
2032        req = req.get('CreateRequestBody', None)
2033        if not req:
2034            raise service_error(service_error.req,
2035                    "Bad request format (no CreateRequestBody)")
2036
2037        # Get the experiment access
2038        exp = req.get('experimentID', None)
2039        if exp:
2040            if exp.has_key('fedid'):
2041                key = exp['fedid']
2042                expid = key
2043                eid = None
2044            elif exp.has_key('localname'):
2045                key = exp['localname']
2046                eid = key
2047                expid = None
2048            else:
2049                raise service_error(service_error.req, "Unknown lookup type")
2050        else:
2051            raise service_error(service_error.req, "No request?")
2052
2053        self.check_experiment_access(fid, key)
2054
2055        try:
2056            tmpdir = tempfile.mkdtemp(prefix="split-")
2057            os.mkdir(tmpdir+"/keys")
2058        except IOError:
2059            raise service_error(service_error.internal, "Cannot create tmp dir")
2060
2061        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2062        gw_secretkey_base = "fed.%s" % self.ssh_type
2063        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2064        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2065        tclfile = tmpdir + "/experiment.tcl"
2066        tbparams = { }
2067        try:
2068            access_user = self.accessdb[fid]
2069        except KeyError:
2070            raise service_error(service_error.internal,
2071                    "Access map and authorizer out of sync in " + \
2072                            "create_experiment for fedid %s"  % fid)
2073
2074        pid = "dummy"
2075        gid = "dummy"
2076
2077        # The tcl parser needs to read a file so put the content into that file
2078        descr=req.get('experimentdescription', None)
2079        if descr:
2080            file_content=descr.get('ns2description', None)
2081            if file_content:
2082                try:
2083                    f = open(tclfile, 'w')
2084                    f.write(file_content)
2085                    f.close()
2086                except IOError:
2087                    raise service_error(service_error.internal,
2088                            "Cannot write temp experiment description")
2089            else:
2090                raise service_error(service_error.req, 
2091                        "Only ns2descriptions supported")
2092        else:
2093            raise service_error(service_error.req, "No experiment description")
2094
2095        self.state_lock.acquire()
2096        if self.state.has_key(key):
2097            self.state[key]['experimentStatus'] = "starting"
2098            for e in self.state[key].get('experimentID',[]):
2099                if not expid and e.has_key('fedid'):
2100                    expid = e['fedid']
2101                elif not eid and e.has_key('localname'):
2102                    eid = e['localname']
2103        self.state_lock.release()
2104
2105        if not (eid and expid):
2106            raise service_error(service_error.internal, 
2107                    "Cannot find local experiment info!?")
2108
2109        try: 
2110            # This catches exceptions to clear the placeholder if necessary
2111            try:
2112                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2113            except ValueError:
2114                raise service_error(service_error.server_config, 
2115                        "Bad key type (%s)" % self.ssh_type)
2116
2117            master = req.get('master', None)
2118            if not master:
2119                raise service_error(service_error.req,
2120                        "No master testbed label")
2121            export_project = req.get('exportProject', None)
2122            if not export_project:
2123                raise service_error(service_error.req, "No export project")
2124           
2125            # Translate to topdl
2126            if self.splitter_url:
2127                # XXX: need remote topdl translator
2128                self.log.debug("Calling remote splitter at %s" % \
2129                        self.splitter_url)
2130                split_data = self.remote_splitter(self.splitter_url,
2131                        file_content, master)
2132            else:
2133                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2134                    str(self.muxmax), '-m', master]
2135
2136                if self.fedkit:
2137                    tclcmd.append('-k')
2138
2139                if self.gatewaykit:
2140                    tclcmd.append('-K')
2141
2142                tclcmd.extend([pid, gid, eid, tclfile])
2143
2144                self.log.debug("running local splitter %s", " ".join(tclcmd))
2145                # This is just fantastic.  As a side effect the parser copies
2146                # tb_compat.tcl into the current directory, so that directory
2147                # must be writable by the fedd user.  Doing this in the
2148                # temporary subdir ensures this is the case.
2149                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2150                        cwd=tmpdir)
2151                split_data = tclparser.stdout
2152
2153            top = topdl.topology_from_xml(file=split_data, top="experiment")
2154
2155            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2156             # Find the testbeds to look up
2157            testbeds = set([ a.value for e in top.elements \
2158                    for a in e.attribute \
2159                    if a.attribute == 'testbed'] )
2160
2161            allocated = { }         # Testbeds we can access
2162            topo ={ }               # Sub topologies
2163            connInfo = { }          # Connection information
2164            services = [ ]
2165            self.get_access_to_testbeds(testbeds, access_user, 
2166                    export_project, master, allocated, tbparams, services)
2167            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2168
2169            # Copy configuration files into the remote file store
2170            # The config urlpath
2171            configpath = "/%s/config" % expid
2172            # The config file system location
2173            configdir ="%s%s" % ( self.repodir, configpath)
2174            try:
2175                os.makedirs(configdir)
2176            except IOError, e:
2177                raise service_error(
2178                        "Cannot create config directory: %s" % e)
2179            try:
2180                f = open("%s/hosts" % configdir, "w")
2181                f.write('\n'.join(hosts))
2182                f.close()
2183            except IOError, e:
2184                raise service_error(service_error.internal, 
2185                        "Cannot write hosts file: %s" % e)
2186            try:
2187                copy_file("%s" % gw_pubkey, "%s/%s" % \
2188                        (configdir, gw_pubkey_base))
2189                copy_file("%s" % gw_secretkey, "%s/%s" % \
2190                        (configdir, gw_secretkey_base))
2191            except IOError, e:
2192                raise service_error(service_error.internal, 
2193                        "Cannot copy keyfiles: %s" % e)
2194
2195            # Allow the individual testbeds to access the configuration files.
2196            for tb in tbparams.keys():
2197                asignee = tbparams[tb]['allocID']['fedid']
2198                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2199                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2200
2201            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2202                    connInfo, expid)
2203            # Now get access to the dynamic testbeds
2204            for k, t in topo.items():
2205                if not t.get_attribute('dynamic'):
2206                    continue
2207                tb = t.get_attribute('testbed')
2208                if tb: 
2209                    self.get_access(tb, None, tbparams, master, 
2210                            export_project, access_user, services)
2211                    tbparams[k] = tbparams[tb]
2212                    del tbparams[tb]
2213                    allocated[k] = 1
2214                    store_keys = t.get_attribute('store_keys')
2215                    # Give the testbed access to keys it exports or imports
2216                    if store_keys:
2217                        for sk in store_keys.split(" "):
2218                            self.auth.set_attribute(\
2219                                    tbparams[k]['allocID']['fedid'], sk)
2220                else:
2221                    raise service_error(service_error.internal, 
2222                            "Dynamic allocation from no testbed!?")
2223
2224            self.wrangle_software(expid, top, topo, tbparams)
2225
2226            vtopo = topdl.topology_to_vtopo(top)
2227            vis = self.genviz(vtopo)
2228
2229            # save federant information
2230            for k in allocated.keys():
2231                tbparams[k]['federant'] = {
2232                        'name': [ { 'localname' : eid} ],
2233                        'allocID' : tbparams[k]['allocID'],
2234                        'master' : k == master,
2235                        'uri': tbparams[k]['uri'],
2236                    }
2237                if tbparams[k].has_key('emulab'):
2238                        tbparams[k]['federant']['emulab'] = \
2239                                tbparams[k]['emulab']
2240
2241            self.state_lock.acquire()
2242            self.state[eid]['vtopo'] = vtopo
2243            self.state[eid]['vis'] = vis
2244            self.state[expid]['federant'] = \
2245                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2246                        if tbparams[tb].has_key('federant') ]
2247            if self.state_filename: 
2248                self.write_state()
2249            self.state_lock.release()
2250        except service_error, e:
2251            # If something goes wrong in the parse (usually an access error)
2252            # clear the placeholder state.  From here on out the code delays
2253            # exceptions.  Failing at this point returns a fault to the remote
2254            # caller.
2255
2256            self.state_lock.acquire()
2257            del self.state[eid]
2258            del self.state[expid]
2259            if self.state_filename: self.write_state()
2260            self.state_lock.release()
2261            raise e
2262
2263
2264        # Start the background swapper and return the starting state.  From
2265        # here on out, the state will stick around a while.
2266
2267        # Let users touch the state
2268        self.auth.set_attribute(fid, expid)
2269        self.auth.set_attribute(expid, expid)
2270        # Override fedids can manipulate state as well
2271        for o in self.overrides:
2272            self.auth.set_attribute(o, expid)
2273
2274        # Create a logger that logs to the experiment's state object as well as
2275        # to the main log file.
2276        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2277        alloc_collector = self.list_log(self.state[eid]['log'])
2278        h = logging.StreamHandler(alloc_collector)
2279        # XXX: there should be a global one of these rather than repeating the
2280        # code.
2281        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2282                    '%d %b %y %H:%M:%S'))
2283        alloc_log.addHandler(h)
2284       
2285        attrs = [ 
2286                {
2287                    'attribute': 'ssh_pubkey', 
2288                    'value': '%s/%s/config/%s' % \
2289                            (self.repo_url, expid, gw_pubkey_base)
2290                },
2291                {
2292                    'attribute': 'ssh_secretkey', 
2293                    'value': '%s/%s/config/%s' % \
2294                            (self.repo_url, expid, gw_secretkey_base)
2295                },
2296                {
2297                    'attribute': 'hosts', 
2298                    'value': '%s/%s/config/hosts' % \
2299                            (self.repo_url, expid)
2300                },
2301                {
2302                    'attribute': 'experiment_name',
2303                    'value': eid,
2304                },
2305            ]
2306
2307        # transit and disconnected testbeds may not have a connInfo entry.
2308        # Fill in the blanks.
2309        for t in allocated.keys():
2310            if not connInfo.has_key(t):
2311                connInfo[t] = { }
2312
2313        # Start a thread to do the resource allocation
2314        t  = Thread(target=self.allocate_resources,
2315                args=(allocated, master, eid, expid, tbparams, 
2316                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2317                    services),
2318                name=eid)
2319        t.start()
2320
2321        rv = {
2322                'experimentID': [
2323                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2324                ],
2325                'experimentStatus': 'starting',
2326            }
2327
2328        return rv
2329   
2330    def get_experiment_fedid(self, key):
2331        """
2332        find the fedid associated with the localname key in the state database.
2333        """
2334
2335        rv = None
2336        self.state_lock.acquire()
2337        if self.state.has_key(key):
2338            if isinstance(self.state[key], dict):
2339                try:
2340                    kl = [ f['fedid'] for f in \
2341                            self.state[key]['experimentID']\
2342                                if f.has_key('fedid') ]
2343                except KeyError:
2344                    self.state_lock.release()
2345                    raise service_error(service_error.internal, 
2346                            "No fedid for experiment %s when getting "+\
2347                                    "fedid(!?)" % key)
2348                if len(kl) == 1:
2349                    rv = kl[0]
2350                else:
2351                    self.state_lock.release()
2352                    raise service_error(service_error.internal, 
2353                            "multiple fedids for experiment %s when " +\
2354                                    "getting fedid(!?)" % key)
2355            else:
2356                self.state_lock.release()
2357                raise service_error(service_error.internal, 
2358                        "Unexpected state for %s" % key)
2359        self.state_lock.release()
2360        return rv
2361
2362    def check_experiment_access(self, fid, key):
2363        """
2364        Confirm that the fid has access to the experiment.  Though a request
2365        may be made in terms of a local name, the access attribute is always
2366        the experiment's fedid.
2367        """
2368        if not isinstance(key, fedid):
2369            key = self.get_experiment_fedid(key)
2370
2371        if self.auth.check_attribute(fid, key):
2372            return True
2373        else:
2374            raise service_error(service_error.access, "Access Denied")
2375
2376
2377    def get_handler(self, path, fid):
2378        self.log.info("Get handler %s %s" % (path, fid))
2379        if self.auth.check_attribute(fid, path):
2380            return ("%s/%s" % (self.repodir, path), "application/binary")
2381        else:
2382            return (None, None)
2383
2384    def get_vtopo(self, req, fid):
2385        """
2386        Return the stored virtual topology for this experiment
2387        """
2388        rv = None
2389        state = None
2390
2391        req = req.get('VtopoRequestBody', None)
2392        if not req:
2393            raise service_error(service_error.req,
2394                    "Bad request format (no VtopoRequestBody)")
2395        exp = req.get('experiment', None)
2396        if exp:
2397            if exp.has_key('fedid'):
2398                key = exp['fedid']
2399                keytype = "fedid"
2400            elif exp.has_key('localname'):
2401                key = exp['localname']
2402                keytype = "localname"
2403            else:
2404                raise service_error(service_error.req, "Unknown lookup type")
2405        else:
2406            raise service_error(service_error.req, "No request?")
2407
2408        self.check_experiment_access(fid, key)
2409
2410        self.state_lock.acquire()
2411        if self.state.has_key(key):
2412            if self.state[key].has_key('vtopo'):
2413                rv = { 'experiment' : {keytype: key },\
2414                        'vtopo': self.state[key]['vtopo'],\
2415                    }
2416            else:
2417                state = self.state[key]['experimentStatus']
2418        self.state_lock.release()
2419
2420        if rv: return rv
2421        else: 
2422            if state:
2423                raise service_error(service_error.partial, 
2424                        "Not ready: %s" % state)
2425            else:
2426                raise service_error(service_error.req, "No such experiment")
2427
2428    def get_vis(self, req, fid):
2429        """
2430        Return the stored visualization for this experiment
2431        """
2432        rv = None
2433        state = None
2434
2435        req = req.get('VisRequestBody', None)
2436        if not req:
2437            raise service_error(service_error.req,
2438                    "Bad request format (no VisRequestBody)")
2439        exp = req.get('experiment', None)
2440        if exp:
2441            if exp.has_key('fedid'):
2442                key = exp['fedid']
2443                keytype = "fedid"
2444            elif exp.has_key('localname'):
2445                key = exp['localname']
2446                keytype = "localname"
2447            else:
2448                raise service_error(service_error.req, "Unknown lookup type")
2449        else:
2450            raise service_error(service_error.req, "No request?")
2451
2452        self.check_experiment_access(fid, key)
2453
2454        self.state_lock.acquire()
2455        if self.state.has_key(key):
2456            if self.state[key].has_key('vis'):
2457                rv =  { 'experiment' : {keytype: key },\
2458                        'vis': self.state[key]['vis'],\
2459                        }
2460            else:
2461                state = self.state[key]['experimentStatus']
2462        self.state_lock.release()
2463
2464        if rv: return rv
2465        else:
2466            if state:
2467                raise service_error(service_error.partial, 
2468                        "Not ready: %s" % state)
2469            else:
2470                raise service_error(service_error.req, "No such experiment")
2471
2472    def clean_info_response(self, rv):
2473        """
2474        Remove the information in the experiment's state object that is not in
2475        the info response.
2476        """
2477        # Remove the owner info (should always be there, but...)
2478        if rv.has_key('owner'): del rv['owner']
2479
2480        # Convert the log into the allocationLog parameter and remove the
2481        # log entry (with defensive programming)
2482        if rv.has_key('log'):
2483            rv['allocationLog'] = "".join(rv['log'])
2484            del rv['log']
2485        else:
2486            rv['allocationLog'] = ""
2487
2488        if rv['experimentStatus'] != 'active':
2489            if rv.has_key('federant'): del rv['federant']
2490        else:
2491            # remove the allocationID and uri info from each federant
2492            for f in rv.get('federant', []):
2493                if f.has_key('allocID'): del f['allocID']
2494                if f.has_key('uri'): del f['uri']
2495        return rv
2496
2497    def get_info(self, req, fid):
2498        """
2499        Return all the stored info about this experiment
2500        """
2501        rv = None
2502
2503        req = req.get('InfoRequestBody', None)
2504        if not req:
2505            raise service_error(service_error.req,
2506                    "Bad request format (no InfoRequestBody)")
2507        exp = req.get('experiment', None)
2508        if exp:
2509            if exp.has_key('fedid'):
2510                key = exp['fedid']
2511                keytype = "fedid"
2512            elif exp.has_key('localname'):
2513                key = exp['localname']
2514                keytype = "localname"
2515            else:
2516                raise service_error(service_error.req, "Unknown lookup type")
2517        else:
2518            raise service_error(service_error.req, "No request?")
2519
2520        self.check_experiment_access(fid, key)
2521
2522        # The state may be massaged by the service function that called
2523        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2524        # state.
2525        self.state_lock.acquire()
2526        if self.state.has_key(key):
2527            rv = copy.deepcopy(self.state[key])
2528        self.state_lock.release()
2529
2530        if rv:
2531            return self.clean_info_response(rv)
2532        else:
2533            raise service_error(service_error.req, "No such experiment")
2534
2535    def get_multi_info(self, req, fid):
2536        """
2537        Return all the stored info that this fedid can access
2538        """
2539        rv = { 'info': [ ] }
2540
2541        self.state_lock.acquire()
2542        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2543            try:
2544                self.check_experiment_access(fid, key)
2545            except service_error, e:
2546                if e.code == service_error.access:
2547                    continue
2548                else:
2549                    self.state_lock.release()
2550                    raise e
2551
2552            if self.state.has_key(key):
2553                e = copy.deepcopy(self.state[key])
2554                e = self.clean_info_response(e)
2555                rv['info'].append(e)
2556        self.state_lock.release()
2557        return rv
2558
2559    def terminate_experiment(self, req, fid):
2560        """
2561        Swap this experiment out on the federants and delete the shared
2562        information
2563        """
2564        tbparams = { }
2565        req = req.get('TerminateRequestBody', None)
2566        if not req:
2567            raise service_error(service_error.req,
2568                    "Bad request format (no TerminateRequestBody)")
2569        force = req.get('force', False)
2570        exp = req.get('experiment', None)
2571        if exp:
2572            if exp.has_key('fedid'):
2573                key = exp['fedid']
2574                keytype = "fedid"
2575            elif exp.has_key('localname'):
2576                key = exp['localname']
2577                keytype = "localname"
2578            else:
2579                raise service_error(service_error.req, "Unknown lookup type")
2580        else:
2581            raise service_error(service_error.req, "No request?")
2582
2583        self.check_experiment_access(fid, key)
2584
2585        dealloc_list = [ ]
2586
2587
2588        # Create a logger that logs to the dealloc_list as well as to the main
2589        # log file.
2590        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2591        h = logging.StreamHandler(self.list_log(dealloc_list))
2592        # XXX: there should be a global one of these rather than repeating the
2593        # code.
2594        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2595                    '%d %b %y %H:%M:%S'))
2596        dealloc_log.addHandler(h)
2597
2598        self.state_lock.acquire()
2599        fed_exp = self.state.get(key, None)
2600
2601        if fed_exp:
2602            # This branch of the conditional holds the lock to generate a
2603            # consistent temporary tbparams variable to deallocate experiments.
2604            # It releases the lock to do the deallocations and reacquires it to
2605            # remove the experiment state when the termination is complete.
2606
2607            # First make sure that the experiment creation is complete.
2608            status = fed_exp.get('experimentStatus', None)
2609
2610            if status:
2611                if status in ('starting', 'terminating'):
2612                    if not force:
2613                        self.state_lock.release()
2614                        raise service_error(service_error.partial, 
2615                                'Experiment still being created or destroyed')
2616                    else:
2617                        self.log.warning('Experiment in %s state ' % status + \
2618                                'being terminated by force.')
2619            else:
2620                # No status??? trouble
2621                self.state_lock.release()
2622                raise service_error(service_error.internal,
2623                        "Experiment has no status!?")
2624
2625            ids = []
2626            #  experimentID is a list of dicts that are self-describing
2627            #  identifiers.  This finds all the fedids and localnames - the
2628            #  keys of self.state - and puts them into ids.
2629            for id in fed_exp.get('experimentID', []):
2630                if id.has_key('fedid'): ids.append(id['fedid'])
2631                if id.has_key('localname'): ids.append(id['localname'])
2632
2633            # Collect the allocation/segment ids into a dict keyed by the fedid
2634            # of the allocation (or a monotonically increasing integer) that
2635            # contains a tuple of uri, aid (which is a dict...)
2636            for i, fed in enumerate(fed_exp.get('federant', [])):
2637                try:
2638                    uri = fed['uri']
2639                    aid = fed['allocID']
2640                    k = fed['allocID'].get('fedid', i)
2641                except KeyError, e:
2642                    continue
2643                tbparams[k] = (uri, aid)
2644            fed_exp['experimentStatus'] = 'terminating'
2645            if self.state_filename: self.write_state()
2646            self.state_lock.release()
2647
2648            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2649            # then completes, so we can't wait if nothing starts.  So, no
2650            # tbparams, no start.
2651            if len(tbparams) > 0:
2652                thread_pool = self.thread_pool(self.nthreads)
2653                for k in tbparams.keys():
2654                    # Create and start a thread to stop the segment
2655                    thread_pool.wait_for_slot()
2656                    uri, aid = tbparams[k]
2657                    t  = self.pooled_thread(\
2658                            target=self.terminate_segment(log=dealloc_log,
2659                                testbed=uri,
2660                                cert_file=self.cert_file, 
2661                                cert_pwd=self.cert_pwd,
2662                                trusted_certs=self.trusted_certs,
2663                                caller=self.call_TerminateSegment),
2664                            args=(uri, aid), name=k,
2665                            pdata=thread_pool, trace_file=self.trace_file)
2666                    t.start()
2667                # Wait for completions
2668                thread_pool.wait_for_all_done()
2669
2670            # release the allocations (failed experiments have done this
2671            # already, and starting experiments may be in odd states, so we
2672            # ignore errors releasing those allocations
2673            try: 
2674                for k in tbparams.keys():
2675                    # This releases access by uri
2676                    uri, aid = tbparams[k]
2677                    self.release_access(None, aid, uri=uri)
2678            except service_error, e:
2679                if status != 'failed' and not force:
2680                    raise e
2681
2682            # Remove the terminated experiment
2683            self.state_lock.acquire()
2684            for id in ids:
2685                if self.state.has_key(id): del self.state[id]
2686
2687            if self.state_filename: self.write_state()
2688            self.state_lock.release()
2689
2690            # Delete any synch points associated with this experiment.  All
2691            # synch points begin with the fedid of the experiment.
2692            fedid_keys = set(["fedid:%s" % f for f in ids \
2693                    if isinstance(f, fedid)])
2694            for k in self.synch_store.all_keys():
2695                try:
2696                    if len(k) > 45 and k[0:46] in fedid_keys:
2697                        self.synch_store.del_value(k)
2698                except synch_store.BadDeletionError:
2699                    pass
2700            self.write_store()
2701               
2702            return { 
2703                    'experiment': exp , 
2704                    'deallocationLog': "".join(dealloc_list),
2705                    }
2706        else:
2707            # Don't forget to release the lock
2708            self.state_lock.release()
2709            raise service_error(service_error.req, "No saved state")
2710
2711
2712    def GetValue(self, req, fid):
2713        """
2714        Get a value from the synchronized store
2715        """
2716        req = req.get('GetValueRequestBody', None)
2717        if not req:
2718            raise service_error(service_error.req,
2719                    "Bad request format (no GetValueRequestBody)")
2720       
2721        name = req['name']
2722        wait = req['wait']
2723        rv = { 'name': name }
2724
2725        if self.auth.check_attribute(fid, name):
2726            try:
2727                v = self.synch_store.get_value(name, wait)
2728            except synch_store.RevokedKeyError:
2729                # No more synch on this key
2730                raise service_error(service_error.federant, 
2731                        "Synch key %s revoked" % name)
2732            if v is not None:
2733                rv['value'] = v
2734            self.log.debug("[GetValue] got %s from %s" % (v, name))
2735            return rv
2736        else:
2737            raise service_error(service_error.access, "Access Denied")
2738       
2739
2740    def SetValue(self, req, fid):
2741        """
2742        Set a value in the synchronized store
2743        """
2744        req = req.get('SetValueRequestBody', None)
2745        if not req:
2746            raise service_error(service_error.req,
2747                    "Bad request format (no SetValueRequestBody)")
2748       
2749        name = req['name']
2750        v = req['value']
2751
2752        if self.auth.check_attribute(fid, name):
2753            try:
2754                self.synch_store.set_value(name, v)
2755                self.write_store()
2756                self.log.debug("[SetValue] set %s to %s" % (name, v))
2757            except synch_store.CollisionError:
2758                # Translate into a service_error
2759                raise service_error(service_error.req,
2760                        "Value already set: %s" %name)
2761            except synch_store.RevokedKeyError:
2762                # No more synch on this key
2763                raise service_error(service_error.federant, 
2764                        "Synch key %s revoked" % name)
2765            return { 'name': name, 'value': v }
2766        else:
2767            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.