source: fedd/federation/experiment_control.py @ 5f6929a

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5f6929a was 5f6929a, checked in by Ted Faber <faber@…>, 14 years ago

Two changes at once

Remove master and export project from the create request and rename the splitter external interface into a translation interface. The export_project pseudo service is staring.

Also start removeing some deprecated fields.

  • Property mode set to 100644
File size: 93.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Topdl = service_caller('Ns2Topdl')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 10
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project: 
800            access_sequence = [ (p, u) for p, u in access_user \
801                    if p == export_project] 
802            access_sequence.extend([(p, u) for p, u in access_user \
803                    if p != export_project]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            # If there is a master, and this is it, ask it to export services
828            # XXX move this to export pseudo-service
829            if tb == master:
830                req['service'] = [
831                        { 'name': 'userconfig', 'visibility': 'export'},
832                        { 'name': 'SMB', 'visibility': 'export'},
833                        { 'name': 'seer', 'visibility': 'export'},
834                        { 'name': 'tmcd', 'visibility': 'export'},
835                    ]
836
837            # node resources if any
838            if nodes != None and len(nodes) > 0:
839                rnodes = [ ]
840                for n in nodes:
841                    rn = { }
842                    image, hw, count = n.split(":")
843                    if image: rn['image'] = [ image ]
844                    if hw: rn['hardware'] = [ hw ]
845                    if count and int(count) >0 : rn['count'] = int(count)
846                    rnodes.append(rn)
847                req['resources']= { }
848                req['resources']['node'] = rnodes
849
850            try:
851                if self.local_access.has_key(uri):
852                    # Local access call
853                    req = { 'RequestAccessRequestBody' : req }
854                    r = self.local_access[uri].RequestAccess(req, 
855                            fedid(file=self.cert_file))
856                    r = { 'RequestAccessResponseBody' : r }
857                else:
858                    r = self.call_RequestAccess(uri, req, 
859                            self.cert_file, self.cert_pwd, self.trusted_certs)
860            except service_error, e:
861                if e.code == service_error.access:
862                    self.log.debug("[get_access] Access denied")
863                    r = None
864                    continue
865                else:
866                    raise e
867
868            if r.has_key('RequestAccessResponseBody'):
869                # Through to here we have a valid response, not a fault.
870                # Access denied is a fault, so something better or worse than
871                # access denied has happened.
872                r = r['RequestAccessResponseBody']
873                self.log.debug("[get_access] Access granted")
874                break
875            else:
876                raise service_error(service_error.protocol,
877                        "Bad proxy response")
878       
879        if not r:
880            raise service_error(service_error.access, 
881                    "Access denied by %s (%s)" % (tb, uri))
882
883        tbparam[tb] = { 
884                "allocID" : r['allocID'],
885                "uri": uri,
886                }
887        if 'service' in r: 
888            services.extend(r['service'])
889
890        # Add attributes to parameter space.  We don't allow attributes to
891        # overlay any parameters already installed.
892        for a in r.get('fedAttr', []):
893            try:
894                if a['attribute'] and \
895                        isinstance(a['attribute'], basestring)\
896                        and not tbparam[tb].has_key(a['attribute'].lower()):
897                    tbparam[tb][a['attribute'].lower()] = a['value']
898            except KeyError:
899                self.log.error("Bad attribute in response: %s" % a)
900
901    def release_access(self, tb, aid, uri=None):
902        """
903        Release access to testbed through fedd
904        """
905
906        if not uri:
907            uri = self.tbmap.get(tb, None)
908        if not uri:
909            raise service_error(service_error.server_config, 
910                    "Unknown testbed: %s" % tb)
911
912        if self.local_access.has_key(uri):
913            resp = self.local_access[uri].ReleaseAccess(\
914                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
915                    fedid(file=self.cert_file))
916            resp = { 'ReleaseAccessResponseBody': resp } 
917        else:
918            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
919                    self.cert_file, self.cert_pwd, self.trusted_certs)
920
921        # better error coding
922
923    def remote_ns2topdl(self, uri, desc):
924
925        req = {
926                'description' : { 'ns2description': desc },
927            }
928
929        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
930                self.trusted_certs)
931
932        if r.has_key('Ns2TopdlResponseBody'):
933            r = r['Ns2TopdlResponseBody']
934            ed = r.get('experimentdescription', None)
935            if ed.has_key('topdldescription'):
936                return topdl.Topology(**ed['topdldescription'])
937            else:
938                raise service_error(service_error.protocol, 
939                        "Bad splitter response (no output)")
940        else:
941            raise service_error(service_error.protocol, "Bad splitter response")
942
943    class start_segment:
944        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
945                cert_pwd=None, trusted_certs=None, caller=None,
946                log_collector=None):
947            self.log = log
948            self.debug = debug
949            self.cert_file = cert_file
950            self.cert_pwd = cert_pwd
951            self.trusted_certs = None
952            self.caller = caller
953            self.testbed = testbed
954            self.log_collector = log_collector
955            self.response = None
956
957        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
958                services=None):
959            req = {
960                    'allocID': { 'fedid' : aid }, 
961                    'segmentdescription': { 
962                        'topdldescription': topo.to_dict(),
963                    },
964                    'master': master,
965                }
966
967            if connInfo:
968                req['connection'] = connInfo
969            # Add services to request.  The master exports, everyone else
970            # imports.
971            if services:
972                svcs = [ x.copy() for x in services]
973                for s in svcs:
974                    if master: s['visibility'] = 'export'
975                    else: s['visibility'] = 'import'
976                req['service'] = svcs
977            if attrs:
978                req['fedAttr'] = attrs
979
980            try:
981                self.log.debug("Calling StartSegment at %s " % uri)
982                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
983                        self.trusted_certs)
984                if r.has_key('StartSegmentResponseBody'):
985                    lval = r['StartSegmentResponseBody'].get('allocationLog',
986                            None)
987                    if lval and self.log_collector:
988                        for line in  lval.splitlines(True):
989                            self.log_collector.write(line)
990                    self.response = r
991                else:
992                    raise service_error(service_error.internal, 
993                            "Bad response!?: %s" %r)
994                return True
995            except service_error, e:
996                self.log.error("Start segment failed on %s: %s" % \
997                        (self.testbed, e))
998                return False
999
1000
1001
1002    class terminate_segment:
1003        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1004                cert_pwd=None, trusted_certs=None, caller=None):
1005            self.log = log
1006            self.debug = debug
1007            self.cert_file = cert_file
1008            self.cert_pwd = cert_pwd
1009            self.trusted_certs = None
1010            self.caller = caller
1011            self.testbed = testbed
1012
1013        def __call__(self, uri, aid ):
1014            req = {
1015                    'allocID': aid , 
1016                }
1017            try:
1018                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1019                        self.trusted_certs)
1020                return True
1021            except service_error, e:
1022                self.log.error("Terminate segment failed on %s: %s" % \
1023                        (self.testbed, e))
1024                return False
1025   
1026
1027    def allocate_resources(self, allocated, master, eid, expid, 
1028            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1029            attrs=None, connInfo={}, services=[]):
1030
1031        started = { }           # Testbeds where a sub-experiment started
1032                                # successfully
1033
1034        # XXX
1035        fail_soft = False
1036
1037        log = alloc_log or self.log
1038
1039        thread_pool = self.thread_pool(self.nthreads)
1040        threads = [ ]
1041
1042        for tb in allocated.keys():
1043            # Create and start a thread to start the segment, and save it
1044            # to get the return value later
1045            thread_pool.wait_for_slot()
1046            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1047            if not uri:
1048                raise service_error(service_error.internal, 
1049                        "Unknown testbed %s !?" % tb)
1050
1051            if tbparams[tb].has_key('allocID') and \
1052                    tbparams[tb]['allocID'].has_key('fedid'):
1053                aid = tbparams[tb]['allocID']['fedid']
1054            else:
1055                raise service_error(service_error.internal, 
1056                        "No alloc id for testbed %s !?" % tb)
1057
1058            t  = self.pooled_thread(\
1059                    target=self.start_segment(log=log, debug=self.debug,
1060                        testbed=tb, cert_file=self.cert_file,
1061                        cert_pwd=self.cert_pwd,
1062                        trusted_certs=self.trusted_certs,
1063                        caller=self.call_StartSegment,
1064                        log_collector=log_collector), 
1065                    args=(uri, aid, topo[tb], tb == master, 
1066                        attrs, connInfo[tb], services),
1067                    name=tb,
1068                    pdata=thread_pool, trace_file=self.trace_file)
1069            threads.append(t)
1070            t.start()
1071
1072        # Wait until all finish (keep pinging the log, though)
1073        mins = 0
1074        revoked = False
1075        while not thread_pool.wait_for_all_done(60.0):
1076            mins += 1
1077            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1078                    % mins)
1079            if not revoked and \
1080                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1081                # a testbed has failed.  Revoke this experiment's
1082                # synchronizarion values so that sub experiments will not
1083                # deadlock waiting for synchronization that will never happen
1084                self.log.info("A subexperiment has failed to swap in, " + \
1085                        "revoking synch keys")
1086                var_key = "fedid:%s" % expid
1087                for k in self.synch_store.all_keys():
1088                    if len(k) > 45 and k[0:46] == var_key:
1089                        self.synch_store.revoke_key(k)
1090                revoked = True
1091
1092        failed = [ t.getName() for t in threads if not t.rv ]
1093        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1094
1095        # If one failed clean up, unless fail_soft is set
1096        if failed:
1097            if not fail_soft:
1098                thread_pool.clear()
1099                for tb in succeeded:
1100                    # Create and start a thread to stop the segment
1101                    thread_pool.wait_for_slot()
1102                    uri = tbparams[tb]['uri']
1103                    t  = self.pooled_thread(\
1104                            target=self.terminate_segment(log=log,
1105                                testbed=tb,
1106                                cert_file=self.cert_file, 
1107                                cert_pwd=self.cert_pwd,
1108                                trusted_certs=self.trusted_certs,
1109                                caller=self.call_TerminateSegment),
1110                            args=(uri, tbparams[tb]['federant']['allocID']),
1111                            name=tb,
1112                            pdata=thread_pool, trace_file=self.trace_file)
1113                    t.start()
1114                # Wait until all finish (if any are being stopped)
1115                if succeeded:
1116                    thread_pool.wait_for_all_done()
1117
1118                # release the allocations
1119                for tb in tbparams.keys():
1120                    self.release_access(tb, tbparams[tb]['allocID'],
1121                            tbparams[tb].get('uri', None))
1122                # Remove the placeholder
1123                self.state_lock.acquire()
1124                self.state[eid]['experimentStatus'] = 'failed'
1125                if self.state_filename: self.write_state()
1126                self.state_lock.release()
1127
1128                log.error("Swap in failed on %s" % ",".join(failed))
1129                return
1130        else:
1131            log.info("[start_segment]: Experiment %s active" % eid)
1132
1133
1134        # Walk up tmpdir, deleting as we go
1135        if self.cleanup:
1136            log.debug("[start_experiment]: removing %s" % tmpdir)
1137            for path, dirs, files in os.walk(tmpdir, topdown=False):
1138                for f in files:
1139                    os.remove(os.path.join(path, f))
1140                for d in dirs:
1141                    os.rmdir(os.path.join(path, d))
1142            os.rmdir(tmpdir)
1143        else:
1144            log.debug("[start_experiment]: not removing %s" % tmpdir)
1145
1146        # Insert the experiment into our state and update the disk copy
1147        self.state_lock.acquire()
1148        self.state[expid]['experimentStatus'] = 'active'
1149        self.state[eid] = self.state[expid]
1150        if self.state_filename: self.write_state()
1151        self.state_lock.release()
1152        return
1153
1154
1155    def add_kit(self, e, kit):
1156        """
1157        Add a Software object created from the list of (install, location)
1158        tuples passed as kit  to the software attribute of an object e.  We
1159        do this enough to break out the code, but it's kind of a hack to
1160        avoid changing the old tuple rep.
1161        """
1162
1163        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1164
1165        if isinstance(e.software, list): e.software.extend(s)
1166        else: e.software = s
1167
1168
1169    def create_experiment_state(self, fid, req, expid, expcert, 
1170            state='starting'):
1171        """
1172        Create the initial entry in the experiment's state.  The expid and
1173        expcert are the experiment's fedid and certifacte that represents that
1174        ID, which are installed in the experiment state.  If the request
1175        includes a suggested local name that is used if possible.  If the local
1176        name is already taken by an experiment owned by this user that has
1177        failed, it is overwritten.  Otherwise new letters are added until a
1178        valid localname is found.  The generated local name is returned.
1179        """
1180
1181        if req.has_key('experimentID') and \
1182                req['experimentID'].has_key('localname'):
1183            overwrite = False
1184            eid = req['experimentID']['localname']
1185            # If there's an old failed experiment here with the same local name
1186            # and accessible by this user, we'll overwrite it, otherwise we'll
1187            # fall through and do the collision avoidance.
1188            old_expid = self.get_experiment_fedid(eid)
1189            if old_expid and self.check_experiment_access(fid, old_expid):
1190                self.state_lock.acquire()
1191                status = self.state[eid].get('experimentStatus', None)
1192                if status and status == 'failed':
1193                    # remove the old access attribute
1194                    self.auth.unset_attribute(fid, old_expid)
1195                    overwrite = True
1196                    del self.state[eid]
1197                    del self.state[old_expid]
1198                self.state_lock.release()
1199            self.state_lock.acquire()
1200            while (self.state.has_key(eid) and not overwrite):
1201                eid += random.choice(string.ascii_letters)
1202            # Initial state
1203            self.state[eid] = {
1204                    'experimentID' : \
1205                            [ { 'localname' : eid }, {'fedid': expid } ],
1206                    'experimentStatus': state,
1207                    'experimentAccess': { 'X509' : expcert },
1208                    'owner': fid,
1209                    'log' : [],
1210                }
1211            self.state[expid] = self.state[eid]
1212            if self.state_filename: self.write_state()
1213            self.state_lock.release()
1214        else:
1215            eid = self.exp_stem
1216            for i in range(0,5):
1217                eid += random.choice(string.ascii_letters)
1218            self.state_lock.acquire()
1219            while (self.state.has_key(eid)):
1220                eid = self.exp_stem
1221                for i in range(0,5):
1222                    eid += random.choice(string.ascii_letters)
1223            # Initial state
1224            self.state[eid] = {
1225                    'experimentID' : \
1226                            [ { 'localname' : eid }, {'fedid': expid } ],
1227                    'experimentStatus': state,
1228                    'experimentAccess': { 'X509' : expcert },
1229                    'owner': fid,
1230                    'log' : [],
1231                }
1232            self.state[expid] = self.state[eid]
1233            if self.state_filename: self.write_state()
1234            self.state_lock.release()
1235
1236        return eid
1237
1238
1239    def allocate_ips_to_topo(self, top):
1240        """
1241        Add an ip4_address attribute to all the hosts in the topology, based on
1242        the shared substrates on which they sit.  An /etc/hosts file is also
1243        created and returned as a list of hostfiles entries.  We also return
1244        the allocator, because we may need to allocate IPs to portals
1245        (specifically DRAGON portals).
1246        """
1247        subs = sorted(top.substrates, 
1248                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1249                reverse=True)
1250        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1251        ifs = { }
1252        hosts = [ ]
1253
1254        for idx, s in enumerate(subs):
1255            net_size = len(s.interfaces)+2
1256
1257            a = ips.allocate(net_size)
1258            if a :
1259                base, num = a
1260                if num < net_size: 
1261                    raise service_error(service_error.internal,
1262                            "Allocator returned wrong number of IPs??")
1263            else:
1264                raise service_error(service_error.req, 
1265                        "Cannot allocate IP addresses")
1266            mask = ips.min_alloc
1267            while mask < net_size:
1268                mask *= 2
1269
1270            netmask = ((2**32-1) ^ (mask-1))
1271
1272            base += 1
1273            for i in s.interfaces:
1274                i.attribute.append(
1275                        topdl.Attribute('ip4_address', 
1276                            "%s" % ip_addr(base)))
1277                i.attribute.append(
1278                        topdl.Attribute('ip4_netmask', 
1279                            "%s" % ip_addr(int(netmask))))
1280
1281                hname = i.element.name[0]
1282                if ifs.has_key(hname):
1283                    hosts.append("%s\t%s-%s %s-%d" % \
1284                            (ip_addr(base), hname, s.name, hname,
1285                                ifs[hname]))
1286                else:
1287                    ifs[hname] = 0
1288                    hosts.append("%s\t%s-%s %s-%d %s" % \
1289                            (ip_addr(base), hname, s.name, hname,
1290                                ifs[hname], hname))
1291
1292                ifs[hname] += 1
1293                base += 1
1294        return hosts, ips
1295
1296    def get_access_to_testbeds(self, testbeds, access_user, 
1297            export_project, master, allocated, tbparams, services):
1298        """
1299        Request access to the various testbeds required for this instantiation
1300        (passed in as testbeds).  User, access_user, expoert_project and master
1301        are used to construct the correct requests.  Per-testbed parameters are
1302        returned in tbparams.
1303        """
1304        for tb in testbeds:
1305            self.get_access(tb, None, tbparams, master,
1306                    export_project, access_user, services)
1307            allocated[tb] = 1
1308
1309    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1310        """
1311        Create the sub-topologies that are needed for experiment instantiation.
1312        """
1313        for tb in testbeds:
1314            topo[tb] = top.clone()
1315            to_delete = [ ]
1316            # XXX: copy in for loop to simplify
1317            for e in topo[tb].elements:
1318                etb = e.get_attribute('testbed')
1319                if etb and etb != tb:
1320                    for i in e.interface:
1321                        for s in i.subs:
1322                            try:
1323                                s.interfaces.remove(i)
1324                            except ValueError:
1325                                raise service_error(service_error.internal,
1326                                        "Can't remove interface??")
1327                    to_delete.append(e)
1328            for e in to_delete:
1329                topo[tb].elements.remove(e)
1330            topo[tb].make_indices()
1331
1332    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1333            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1334            expid=None):
1335        """
1336        Return a new internet portal node and a dict with the connectionInfo to
1337        be attached.
1338        """
1339        dproject = tbparams[dt].get('project', 'project')
1340        ddomain = tbparams[dt].get('domain', ".example.com")
1341        mdomain = tbparams[master].get('domain', '.example.com')
1342        mproject = tbparams[master].get('project', 'project')
1343        muser = tbparams[master].get('user', 'root')
1344        smbshare = tbparams[master].get('smbshare', 'USERS')
1345
1346        if st == master or dt == master:
1347            active = ("%s" % (st == master))
1348        else:
1349            active = ("%s" % (st > dt))
1350
1351        ifaces = [ ]
1352        for sub, attrs in iface_desc:
1353            inf = topdl.Interface(
1354                    name="inf%03d" % len(ifaces),
1355                    substrate=sub,
1356                    attribute=[
1357                        topdl.Attribute(
1358                            attribute=n,
1359                            value = v)
1360                        for n, v in attrs
1361                        ]
1362                    )
1363            ifaces.append(inf)
1364        if conn_type == "ssh":
1365            try:
1366                aid = tbparams[st]['allocID']['fedid']
1367            except:
1368                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1369                        % st)
1370                aid = None
1371            info = {
1372                    "type" : conn_type, 
1373                    "portal": myname,
1374                    'fedAttr': [ 
1375                            { 'attribute': 'masterdomain', 'value': mdomain},
1376                            { 'attribute': 'masterexperiment', 'value': 
1377                                "%s/%s" % (mproject, eid)},
1378                            { 'attribute': 'active', 'value': active},
1379                            # Move to SMB service description
1380                            { 'attribute': 'masteruser', 'value': muser},
1381                            { 'attribute': 'smbshare', 'value': smbshare},
1382                        ],
1383                    'parameter': [
1384                        {
1385                            'name': 'peer',
1386                            'key': 'fedid:%s/%s' % (expid, myname),
1387                            'store': self.store_url,
1388                            'type': 'output',
1389                        },
1390                        {
1391                            'name': 'ssh_port',
1392                            'key': 'fedid:%s/%s-port' % (expid, myname),
1393                            'store': self.store_url,
1394                            'type': 'output',
1395                        },
1396                        {
1397                            'name': 'peer',
1398                            'key': 'fedid:%s/%s' % (expid, desthost),
1399                            'store': self.store_url,
1400                            'type': 'input',
1401                        },
1402                        {
1403                            'name': 'ssh_port',
1404                            'key': 'fedid:%s/%s-port' % (expid, desthost),
1405                            'store': self.store_url,
1406                            'type': 'input',
1407                        },
1408                        ]
1409                    }
1410            # Give this allocation the rights to access the key of the
1411            # peers
1412            if aid:
1413                for h in (myname, desthost):
1414                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1415                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
1416                            (expid, h))
1417            else:
1418                self.log.error("No aid for %s in new_portal_node" % st)
1419        else:
1420            info = None
1421
1422        return (topdl.Computer(
1423                name=myname,
1424                attribute=[ 
1425                    topdl.Attribute(attribute=n,value=v)
1426                        for n, v in (\
1427                            ('portal', 'true'),
1428                            ('portal_type', portal_type), 
1429                        )
1430                    ],
1431                interface=ifaces,
1432                ), info)
1433
1434    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1435        ddomain = tbparams[dt].get('domain', ".example.com")
1436        dproject = tbparams[dt].get('project', 'project')
1437        tsubstrate = \
1438                topdl.Substrate(name='%s-%s' % (st, dt),
1439                        attribute= [
1440                            topdl.Attribute(
1441                                attribute='portal',
1442                                value='true')
1443                            ]
1444                        )
1445        segment_element = topdl.Segment(
1446                id= tbparams[dt]['allocID'],
1447                type='emulab',
1448                uri = self.tbmap.get(dt, None),
1449                interface=[ 
1450                    topdl.Interface(
1451                        substrate=tsubstrate.name),
1452                    ],
1453                attribute = [
1454                    topdl.Attribute(attribute=n, value=v)
1455                        for n, v in (\
1456                            ('domain', ddomain),
1457                            ('experiment', "%s/%s" % \
1458                                    (dproject, eid)),)
1459                    ],
1460                )
1461
1462        return (tsubstrate, segment_element)
1463
1464    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1465        if sub.capacity is None:
1466            raise service_error(service_error.internal,
1467                    "Cannot DRAGON split substrate w/o capacity")
1468        segs = [ ]
1469        substr = topdl.Substrate(name="dragon%d" % idx, 
1470                capacity=sub.capacity.clone(),
1471                attribute=[ topdl.Attribute(attribute=n, value=v)
1472                    for n, v, in (\
1473                            ('vlan', 'unassigned%d' % idx),)])
1474        name = "dragon%d" % idx
1475        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1476        for tb in tbs.keys():
1477            seg = topdl.Segment(
1478                    id = tbparams[tb]['allocID'],
1479                    type='emulab',
1480                    uri = self.tbmap.get(tb, None),
1481                    interface=[ 
1482                        topdl.Interface(
1483                            substrate=substr.name),
1484                        ],
1485                    attribute=[ topdl.Attribute(
1486                        attribute='dragon_endpoint', 
1487                        value=tbparams[tb]['dragon']),
1488                        ]
1489                    )
1490            if tbparams[tb].has_key('vlans'):
1491                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1492            segs.append(seg)
1493
1494            # Give this allocation the rights to access the key of the
1495            # vlan_id
1496            try:
1497                aid = tbparams[tb]['allocID']['fedid']
1498                self.auth.set_attribute(aid, store_key)
1499            except:
1500                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1501                        % tb)
1502
1503        connInfo[name] = [ { 
1504            'type': 'transit',
1505            'parameter': [ { 
1506                'name': 'vlan_id',
1507                'key': store_key,
1508                'store': self.store_url,
1509                'type': 'output'
1510                } ]
1511            } ]
1512
1513        topo[name] = \
1514                topdl.Topology(substrates=[substr], elements=segs,
1515                        attribute=[
1516                            topdl.Attribute(attribute="transit", value='true'),
1517                            topdl.Attribute(attribute="dynamic", value='true'),
1518                            topdl.Attribute(attribute="testbed", 
1519                                value='dragon'),
1520                            topdl.Attribute(attribute="store_keys", 
1521                                value=store_key),
1522                            ]
1523                        )
1524
1525    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1526            connInfo, expid=None):
1527        """
1528        Add attribiutes to the various elements indicating that they are to be
1529        dragon connected and create a dragon segment in topo to be
1530        instantiated.
1531        """
1532
1533        def get_substrate_from_topo(name, t):
1534            for s in t.substrates:
1535                if s.name == name: return s
1536            else: return None
1537
1538
1539        mdomain = tbparams[master].get('domain', '.example.com')
1540        mproject = tbparams[master].get('project', 'project')
1541        # dn is the number of previously created dragon nets.  This routine
1542        # creates a net numbered by dn
1543        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1544        # Count the number of interfaces on this substrate in each testbed from
1545        # the global topology
1546        count = { }
1547        node = { }
1548        for e in [ i.element for i in sub.interfaces ]:
1549            tb = e.get_attribute('testbed')
1550            count[tb] = count.get(tb, 0) + 1
1551            node[tb] = i.get_attribute('ip4_address')
1552
1553
1554        # Set the attributes in the copies that will allow setup of dragon
1555        # connections.
1556        for tb in tbs.keys():
1557            s = get_substrate_from_topo(sub.name, topo[tb])
1558            if s:
1559                if not connInfo.has_key(tb):
1560                    connInfo[tb] = [ ]
1561
1562                try:
1563                    aid = tbparams[tb]['allocID']['fedid']
1564                except:
1565                    self.log.debug("[creat_dragon_substrate] " + 
1566                            "Can't get alloc id for %s?" %tb)
1567                    aid = None
1568
1569                # This may need another look, but only a service gateway will
1570                # look at the active parameter, and these are only inserted to
1571                # connect to the master.
1572                active = "%s" % ( tb == master)
1573                info = {
1574                        'type': 'transit',
1575                        'member': [ {
1576                            'element': i.element.name[0], 
1577                            'interface': i.name
1578                            } for i in s.interfaces \
1579                                    if isinstance(i.element, topdl.Computer) ],
1580                        'fedAttr': [ 
1581                            { 'attribute': 'masterdomain', 'value': mdomain},
1582                            { 'attribute': 'masterexperiment', 'value': 
1583                                "%s/%s" % (mproject, eid)},
1584                            { 'attribute': 'active', 'value': active},
1585                            ],
1586                        'parameter': [ {
1587                            'name': 'vlan_id',
1588                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1589                            'store': self.store_url,
1590                            'type': 'input',
1591                            } ]
1592                        }
1593                if tbs.has_key(tb):
1594                    info['peer'] = tbs[tb]
1595                connInfo[tb].append(info)
1596
1597                # Give this allocation the rights to access the key of the
1598                # vlan_id
1599                if aid:
1600                    self.auth.set_attribute(aid, 
1601                            'fedid:%s/vlan%d' % (expid, dn))
1602            else:
1603                raise service_error(service_error.internal,
1604                        "No substrate %s in testbed %s" % (sub.name, tb))
1605
1606        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1607
1608    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1609            segment_substrate, portals, connInfo, expid):
1610        # More than one testbed is on this substrate.  Insert
1611        # some portals into the subtopologies.  st == source testbed,
1612        # dt == destination testbed.
1613        for st in tbs.keys():
1614            if not segment_substrate.has_key(st):
1615                segment_substrate[st] = { }
1616            if not portals.has_key(st): 
1617                portals[st] = { }
1618            if not connInfo.has_key(st):
1619                connInfo[st] = [ ]
1620            for dt in [ t for t in tbs.keys() if t != st]:
1621                sproject = tbparams[st].get('project', 'project')
1622                dproject = tbparams[dt].get('project', 'project')
1623                mproject = tbparams[master].get('project', 'project')
1624                sdomain = tbparams[st].get('domain', ".example.com")
1625                ddomain = tbparams[dt].get('domain', ".example.com")
1626                mdomain = tbparams[master].get('domain', '.example.com')
1627                muser = tbparams[master].get('user', 'root')
1628                smbshare = tbparams[master].get('smbshare', 'USERS')
1629                aid = tbparams[dt]['allocID']['fedid']
1630                if st == master or dt == master:
1631                    active = ("%s" % (st == master))
1632                else:
1633                    active = ("%s" %(st > dt))
1634                if not segment_substrate[st].has_key(dt):
1635                    # Put a substrate and a segment for the connected
1636                    # testbed in there.
1637                    tsubstrate, segment_element = \
1638                            self.new_portal_substrate(st, dt, eid, tbparams,
1639                                    expid)
1640                    segment_substrate[st][dt] = tsubstrate
1641                    topo[st].substrates.append(tsubstrate)
1642                    topo[st].elements.append(segment_element)
1643
1644                new_portal = False
1645                if portals[st].has_key(dt):
1646                    # There's a portal set up to go to this destination.
1647                    # See if there's room to multiplex this connection on
1648                    # it.  If so, add an interface to the portal; if not,
1649                    # set up to add a portal below.
1650                    # [This little festival of braces is just a pop of the
1651                    # last element in the list of portals between st and
1652                    # dt.]
1653                    portal = portals[st][dt][-1]
1654                    mux = len([ i for i in portal.interface \
1655                            if not i.get_attribute('portal')])
1656                    if mux == self.muxmax:
1657                        new_portal = True
1658                        portal_type = "experiment"
1659                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1660                        desthost = "%stunnel%d" % (st.lower(), 
1661                                len(portals[st][dt]))
1662                    else:
1663                        new_i = topdl.Interface(
1664                                substrate=sub.name,
1665                                attribute=[ 
1666                                    topdl.Attribute(
1667                                        attribute='ip4_address', 
1668                                        value=tbs[dt]
1669                                    )
1670                                ])
1671                        portal.interface.append(new_i)
1672                else:
1673                    # First connection to this testbed, make an empty list
1674                    # and set up to add the new portal below
1675                    new_portal = True
1676                    portals[st][dt] = [ ]
1677                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1678                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1679
1680                    if dt == master or st == master: portal_type = "both"
1681                    else: portal_type = "experiment"
1682
1683                if new_portal:
1684                    infs = (
1685                            (segment_substrate[st][dt].name, 
1686                                (('portal', 'true'),)),
1687                            (sub.name, 
1688                                (('ip4_address', tbs[dt]),))
1689                        )
1690                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1691                            master, eid, myname, desthost, portal_type,
1692                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1693                    #if self.fedkit:
1694                        #self.add_kit(portal, self.fedkit)
1695                    #if self.gatewaykit:
1696                        #self.add_kit(portal, self.gatewaykit)
1697
1698                    topo[st].elements.append(portal)
1699                    portals[st][dt].append(portal)
1700                    connInfo[st].append(info)
1701
1702    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1703        # Add to the master testbed
1704        tsubstrate, segment_element = \
1705                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1706        myname = "%stunnel" % dt
1707        desthost = "%stunnel" % st
1708
1709        portal, info = self.new_portal_node(st, dt, tbparams, master,
1710                eid, myname, desthost, "control", 
1711                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1712                conn_attrs=[], expid=expid)
1713
1714        topo[st].substrates.append(tsubstrate)
1715        topo[st].elements.append(segment_element)
1716        topo[st].elements.append(portal)
1717        if not connInfo.has_key(st):
1718            connInfo[st] = [ ]
1719        connInfo[st].append(info)
1720
1721    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1722            substrate, tbparams, expid):
1723        # Add to the master testbed
1724        myname = "%stunnel" % dt
1725        desthost = "%s" % ip_addr(dip)
1726
1727        portal, info = self.new_portal_node(st, dt, tbparams, master,
1728                eid, myname, desthost, "control", 
1729                ((substrate.name,(
1730                    ('portal','true'),
1731                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1732                conn_type="transit", conn_attrs=[], expid=expid)
1733
1734        return portal
1735
1736    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1737            connInfo, expid):
1738        """
1739        For each substrate in the main topology, find those that
1740        have nodes on more than one testbed.  Insert portal nodes
1741        into the copies of those substrates on the sub topologies.
1742        """
1743        segment_substrate = { }
1744        portals = { }
1745        for s in top.substrates:
1746            # tbs will contain an ip address on this subsrate that is in
1747            # each testbed.
1748            tbs = { }
1749            for i in s.interfaces:
1750                e = i.element
1751                tb = e.get_attribute('testbed')
1752                if tb and not tbs.has_key(tb):
1753                    for i in e.interface:
1754                        if s in i.subs:
1755                            tbs[tb]= i.get_attribute('ip4_address')
1756            if len(tbs) < 2:
1757                continue
1758
1759            # DRAGON will not create multi-site vlans yet
1760            if len(tbs) == 2 and \
1761                    all([tbparams[x].has_key('dragon') for x in tbs]):
1762                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1763                        master, eid, connInfo, expid)
1764            else:
1765                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1766                        eid, segment_substrate, portals, connInfo, expid)
1767
1768        # Make sure that all the slaves have a control portal back to the
1769        # master.
1770        for tb in [ t for t in tbparams.keys() if t != master ]:
1771            if len([e for e in topo[tb].elements \
1772                    if isinstance(e, topdl.Computer) and \
1773                    e.get_attribute('portal') and \
1774                    e.get_attribute('portal_type') == 'both']) == 0:
1775
1776                if tbparams[master].has_key('dragon') \
1777                        and tbparams[tb].has_key('dragon'):
1778
1779                    idx = len([x for x in topo.keys() \
1780                            if x.startswith('dragon')])
1781                    dip, leng = ip_allocator.allocate(4)
1782                    dip += 1
1783                    mip = dip+1
1784                    csub = topdl.Substrate(
1785                            name="dragon-control-%s" % tb,
1786                            capacity=topdl.Capacity(100000.0, 'max'),
1787                            attribute=[
1788                                topdl.Attribute(
1789                                    attribute='portal',
1790                                    value='true'
1791                                    )
1792                                ]
1793                            )
1794                    seg = topdl.Segment(
1795                            id= tbparams[master]['allocID'],
1796                            type='emulab',
1797                            uri = self.tbmap.get(master, None),
1798                            interface=[ 
1799                                topdl.Interface(
1800                                    substrate=csub.name),
1801                                ],
1802                            attribute = [
1803                                topdl.Attribute(attribute=n, value=v)
1804                                    for n, v in (\
1805                                        ('domain', 
1806                                            tbparams[master].get('domain',
1807                                                ".example.com")),
1808                                        ('experiment', "%s/%s" % \
1809                                                (tbparams[master].get(
1810                                                    'project', 
1811                                                    'project'), 
1812                                                    eid)),)
1813                                ],
1814                            )
1815                    portal = self.new_dragon_portal(tb, master,
1816                            master, eid, dip, mip, idx, csub, tbparams, expid)
1817                    topo[tb].substrates.append(csub)
1818                    topo[tb].elements.append(portal)
1819                    topo[tb].elements.append(seg)
1820
1821                    mcsub = csub.clone()
1822                    seg = topdl.Segment(
1823                            id= tbparams[tb]['allocID'],
1824                            type='emulab',
1825                            uri = self.tbmap.get(tb, None),
1826                            interface=[ 
1827                                topdl.Interface(
1828                                    substrate=csub.name),
1829                                ],
1830                            attribute = [
1831                                topdl.Attribute(attribute=n, value=v)
1832                                    for n, v in (\
1833                                        ('domain', 
1834                                            tbparams[tb].get('domain',
1835                                                ".example.com")),
1836                                        ('experiment', "%s/%s" % \
1837                                                (tbparams[tb].get('project', 
1838                                                    'project'), 
1839                                                    eid)),)
1840                                ],
1841                            )
1842                    portal = self.new_dragon_portal(master, tb, master,
1843                            eid, mip, dip, idx, mcsub, tbparams, expid)
1844                    topo[master].substrates.append(mcsub)
1845                    topo[master].elements.append(portal)
1846                    topo[master].elements.append(seg)
1847                    for t in (master, tb):
1848                        topo[t].incorporate_elements()
1849
1850                    self.create_dragon_substrate(csub, topo, 
1851                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1852                            tbparams, master, eid, connInfo,
1853                            expid)
1854                else:
1855                    self.add_control_portal(master, tb, master, eid, topo, 
1856                            tbparams, connInfo, expid)
1857                    self.add_control_portal(tb, master, master, eid, topo, 
1858                            tbparams, connInfo, expid)
1859
1860        # Connect the portal nodes into the topologies and clear out
1861        # substrates that are not in the topologies
1862        for tb in tbparams.keys():
1863            topo[tb].incorporate_elements()
1864            topo[tb].substrates = \
1865                    [s for s in topo[tb].substrates \
1866                        if len(s.interfaces) >0]
1867
1868    def wrangle_software(self, expid, top, topo, tbparams):
1869        """
1870        Copy software out to the repository directory, allocate permissions and
1871        rewrite the segment topologies to look for the software in local
1872        places.
1873        """
1874
1875        # Copy the rpms and tarfiles to a distribution directory from
1876        # which the federants can retrieve them
1877        linkpath = "%s/software" %  expid
1878        softdir ="%s/%s" % ( self.repodir, linkpath)
1879        softmap = { }
1880        # These are in a list of tuples format (each kit).  This comprehension
1881        # unwraps them into a single list of tuples that initilaizes the set of
1882        # tuples.
1883        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1884                for p, t in l ])
1885        pkgs.update([x.location for e in top.elements \
1886                for x in e.software])
1887        try:
1888            os.makedirs(softdir)
1889        except IOError, e:
1890            raise service_error(
1891                    "Cannot create software directory: %s" % e)
1892        # The actual copying.  Everything's converted into a url for copying.
1893        for pkg in pkgs:
1894            loc = pkg
1895
1896            scheme, host, path = urlparse(loc)[0:3]
1897            dest = os.path.basename(path)
1898            if not scheme:
1899                if not loc.startswith('/'):
1900                    loc = "/%s" % loc
1901                loc = "file://%s" %loc
1902            try:
1903                u = urlopen(loc)
1904            except Exception, e:
1905                raise service_error(service_error.req, 
1906                        "Cannot open %s: %s" % (loc, e))
1907            try:
1908                f = open("%s/%s" % (softdir, dest) , "w")
1909                self.log.debug("Writing %s/%s" % (softdir,dest) )
1910                data = u.read(4096)
1911                while data:
1912                    f.write(data)
1913                    data = u.read(4096)
1914                f.close()
1915                u.close()
1916            except Exception, e:
1917                raise service_error(service_error.internal,
1918                        "Could not copy %s: %s" % (loc, e))
1919            path = re.sub("/tmp", "", linkpath)
1920            # XXX
1921            softmap[pkg] = \
1922                    "%s/%s/%s" %\
1923                    ( self.repo_url, path, dest)
1924
1925            # Allow the individual segments to access the software.
1926            for tb in tbparams.keys():
1927                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1928                        "/%s/%s" % ( path, dest))
1929
1930        # Convert the software locations in the segments into the local
1931        # copies on this host
1932        for soft in [ s for tb in topo.values() \
1933                for e in tb.elements \
1934                    if getattr(e, 'software', False) \
1935                        for s in e.software ]:
1936            if softmap.has_key(soft.location):
1937                soft.location = softmap[soft.location]
1938
1939
1940    def new_experiment(self, req, fid):
1941        """
1942        The external interface to empty initial experiment creation called from
1943        the dispatcher.
1944
1945        Creates a working directory, splits the incoming description using the
1946        splitter script and parses out the avrious subsections using the
1947        lcasses above.  Once each sub-experiment is created, use pooled threads
1948        to instantiate them and start it all up.
1949        """
1950        if not self.auth.check_attribute(fid, 'new'):
1951            raise service_error(service_error.access, "New access denied")
1952
1953        try:
1954            tmpdir = tempfile.mkdtemp(prefix="split-")
1955        except IOError:
1956            raise service_error(service_error.internal, "Cannot create tmp dir")
1957
1958        try:
1959            access_user = self.accessdb[fid]
1960        except KeyError:
1961            raise service_error(service_error.internal,
1962                    "Access map and authorizer out of sync in " + \
1963                            "new_experiment for fedid %s"  % fid)
1964
1965        pid = "dummy"
1966        gid = "dummy"
1967
1968        req = req.get('NewRequestBody', None)
1969        if not req:
1970            raise service_error(service_error.req,
1971                    "Bad request format (no NewRequestBody)")
1972
1973        # Generate an ID for the experiment (slice) and a certificate that the
1974        # allocator can use to prove they own it.  We'll ship it back through
1975        # the encrypted connection.
1976        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1977
1978        #now we're done with the tmpdir, and it should be empty
1979        if self.cleanup:
1980            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1981            os.rmdir(tmpdir)
1982        else:
1983            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1984
1985        eid = self.create_experiment_state(fid, req, expid, expcert, 
1986                state='empty')
1987
1988        # Let users touch the state
1989        self.auth.set_attribute(fid, expid)
1990        self.auth.set_attribute(expid, expid)
1991        # Override fedids can manipulate state as well
1992        for o in self.overrides:
1993            self.auth.set_attribute(o, expid)
1994
1995        rv = {
1996                'experimentID': [
1997                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1998                ],
1999                'experimentStatus': 'empty',
2000                'experimentAccess': { 'X509' : expcert }
2001            }
2002
2003        return rv
2004
2005    def get_master_project(self, req):
2006        master= None
2007        export_project = None
2008        for e in [ s for s in req.get('service', []) \
2009                if s.get('name') == 'project_export']:
2010            for a in e.get('fedAttr', []):
2011                attr = a.get('attribute', None)
2012                if attr == 'testbed':
2013                    master = a.get('value', None)
2014                elif attr == 'project':
2015                    export_project = a.get('value', None)
2016       
2017        return (master, export_project)
2018
2019
2020
2021    def create_experiment(self, req, fid):
2022        """
2023        The external interface to experiment creation called from the
2024        dispatcher.
2025
2026        Creates a working directory, splits the incoming description using the
2027        splitter script and parses out the avrious subsections using the
2028        lcasses above.  Once each sub-experiment is created, use pooled threads
2029        to instantiate them and start it all up.
2030        """
2031
2032        req = req.get('CreateRequestBody', None)
2033        if not req:
2034            raise service_error(service_error.req,
2035                    "Bad request format (no CreateRequestBody)")
2036
2037        # Get the experiment access
2038        exp = req.get('experimentID', None)
2039        if exp:
2040            if exp.has_key('fedid'):
2041                key = exp['fedid']
2042                expid = key
2043                eid = None
2044            elif exp.has_key('localname'):
2045                key = exp['localname']
2046                eid = key
2047                expid = None
2048            else:
2049                raise service_error(service_error.req, "Unknown lookup type")
2050        else:
2051            raise service_error(service_error.req, "No request?")
2052
2053        self.check_experiment_access(fid, key)
2054
2055        try:
2056            tmpdir = tempfile.mkdtemp(prefix="split-")
2057            os.mkdir(tmpdir+"/keys")
2058        except IOError:
2059            raise service_error(service_error.internal, "Cannot create tmp dir")
2060
2061        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2062        gw_secretkey_base = "fed.%s" % self.ssh_type
2063        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2064        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2065        tclfile = tmpdir + "/experiment.tcl"
2066        tbparams = { }
2067        try:
2068            access_user = self.accessdb[fid]
2069        except KeyError:
2070            raise service_error(service_error.internal,
2071                    "Access map and authorizer out of sync in " + \
2072                            "create_experiment for fedid %s"  % fid)
2073
2074        pid = "dummy"
2075        gid = "dummy"
2076
2077        # The tcl parser needs to read a file so put the content into that file
2078        descr=req.get('experimentdescription', None)
2079        if descr:
2080            file_content=descr.get('ns2description', None)
2081            if file_content:
2082                try:
2083                    f = open(tclfile, 'w')
2084                    f.write(file_content)
2085                    f.close()
2086                except IOError:
2087                    raise service_error(service_error.internal,
2088                            "Cannot write temp experiment description")
2089            else:
2090                raise service_error(service_error.req, 
2091                        "Only ns2descriptions supported")
2092        else:
2093            raise service_error(service_error.req, "No experiment description")
2094
2095        self.state_lock.acquire()
2096        if self.state.has_key(key):
2097            self.state[key]['experimentStatus'] = "starting"
2098            for e in self.state[key].get('experimentID',[]):
2099                if not expid and e.has_key('fedid'):
2100                    expid = e['fedid']
2101                elif not eid and e.has_key('localname'):
2102                    eid = e['localname']
2103        self.state_lock.release()
2104
2105        if not (eid and expid):
2106            raise service_error(service_error.internal, 
2107                    "Cannot find local experiment info!?")
2108
2109        try: 
2110            # This catches exceptions to clear the placeholder if necessary
2111            try:
2112                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2113            except ValueError:
2114                raise service_error(service_error.server_config, 
2115                        "Bad key type (%s)" % self.ssh_type)
2116            master, export_project = self.get_master_project(req)
2117            # XXX get these out when master and project are optional
2118            if not master:
2119                raise service_error(service_error.req,
2120                        "No master testbed label")
2121            if not export_project:
2122                raise service_error(service_error.req, "No export project")
2123            # XXX
2124
2125            # Translate to topdl
2126            if self.splitter_url:
2127                # XXX: need remote topdl translator
2128                self.log.debug("Calling remote splitter at %s" % \
2129                        self.splitter_url)
2130                top = self.remote_ns2topdl(self.splitter_url, file_content)
2131            else:
2132                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2133                    str(self.muxmax), '-m', master]
2134
2135                tclcmd.extend([pid, gid, eid, tclfile])
2136
2137                self.log.debug("running local splitter %s", " ".join(tclcmd))
2138                # This is just fantastic.  As a side effect the parser copies
2139                # tb_compat.tcl into the current directory, so that directory
2140                # must be writable by the fedd user.  Doing this in the
2141                # temporary subdir ensures this is the case.
2142                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2143                        cwd=tmpdir)
2144                split_data = tclparser.stdout
2145
2146                top = topdl.topology_from_xml(file=split_data, top="experiment")
2147
2148            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2149             # Find the testbeds to look up
2150            testbeds = set([ a.value for e in top.elements \
2151                    for a in e.attribute \
2152                    if a.attribute == 'testbed'] )
2153
2154            allocated = { }         # Testbeds we can access
2155            topo ={ }               # Sub topologies
2156            connInfo = { }          # Connection information
2157            services = [ ]
2158            self.get_access_to_testbeds(testbeds, access_user, 
2159                    export_project, master, allocated, tbparams, services)
2160            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2161
2162            # Copy configuration files into the remote file store
2163            # The config urlpath
2164            configpath = "/%s/config" % expid
2165            # The config file system location
2166            configdir ="%s%s" % ( self.repodir, configpath)
2167            try:
2168                os.makedirs(configdir)
2169            except IOError, e:
2170                raise service_error(
2171                        "Cannot create config directory: %s" % e)
2172            try:
2173                f = open("%s/hosts" % configdir, "w")
2174                f.write('\n'.join(hosts))
2175                f.close()
2176            except IOError, e:
2177                raise service_error(service_error.internal, 
2178                        "Cannot write hosts file: %s" % e)
2179            try:
2180                copy_file("%s" % gw_pubkey, "%s/%s" % \
2181                        (configdir, gw_pubkey_base))
2182                copy_file("%s" % gw_secretkey, "%s/%s" % \
2183                        (configdir, gw_secretkey_base))
2184            except IOError, e:
2185                raise service_error(service_error.internal, 
2186                        "Cannot copy keyfiles: %s" % e)
2187
2188            # Allow the individual testbeds to access the configuration files.
2189            for tb in tbparams.keys():
2190                asignee = tbparams[tb]['allocID']['fedid']
2191                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2192                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2193
2194            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2195                    connInfo, expid)
2196            # Now get access to the dynamic testbeds
2197            for k, t in topo.items():
2198                if not t.get_attribute('dynamic'):
2199                    continue
2200                tb = t.get_attribute('testbed')
2201                if tb: 
2202                    self.get_access(tb, None, tbparams, master, 
2203                            export_project, access_user, services)
2204                    tbparams[k] = tbparams[tb]
2205                    del tbparams[tb]
2206                    allocated[k] = 1
2207                    store_keys = t.get_attribute('store_keys')
2208                    # Give the testbed access to keys it exports or imports
2209                    if store_keys:
2210                        for sk in store_keys.split(" "):
2211                            self.auth.set_attribute(\
2212                                    tbparams[k]['allocID']['fedid'], sk)
2213                else:
2214                    raise service_error(service_error.internal, 
2215                            "Dynamic allocation from no testbed!?")
2216
2217            self.wrangle_software(expid, top, topo, tbparams)
2218
2219            vtopo = topdl.topology_to_vtopo(top)
2220            vis = self.genviz(vtopo)
2221
2222            # save federant information
2223            for k in allocated.keys():
2224                tbparams[k]['federant'] = {
2225                        'name': [ { 'localname' : eid} ],
2226                        'allocID' : tbparams[k]['allocID'],
2227                        'master' : k == master,
2228                        'uri': tbparams[k]['uri'],
2229                    }
2230                if tbparams[k].has_key('emulab'):
2231                        tbparams[k]['federant']['emulab'] = \
2232                                tbparams[k]['emulab']
2233
2234            self.state_lock.acquire()
2235            self.state[eid]['vtopo'] = vtopo
2236            self.state[eid]['vis'] = vis
2237            self.state[expid]['federant'] = \
2238                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2239                        if tbparams[tb].has_key('federant') ]
2240            if self.state_filename: 
2241                self.write_state()
2242            self.state_lock.release()
2243        except service_error, e:
2244            # If something goes wrong in the parse (usually an access error)
2245            # clear the placeholder state.  From here on out the code delays
2246            # exceptions.  Failing at this point returns a fault to the remote
2247            # caller.
2248
2249            self.state_lock.acquire()
2250            del self.state[eid]
2251            del self.state[expid]
2252            if self.state_filename: self.write_state()
2253            self.state_lock.release()
2254            raise e
2255
2256
2257        # Start the background swapper and return the starting state.  From
2258        # here on out, the state will stick around a while.
2259
2260        # Let users touch the state
2261        self.auth.set_attribute(fid, expid)
2262        self.auth.set_attribute(expid, expid)
2263        # Override fedids can manipulate state as well
2264        for o in self.overrides:
2265            self.auth.set_attribute(o, expid)
2266
2267        # Create a logger that logs to the experiment's state object as well as
2268        # to the main log file.
2269        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2270        alloc_collector = self.list_log(self.state[eid]['log'])
2271        h = logging.StreamHandler(alloc_collector)
2272        # XXX: there should be a global one of these rather than repeating the
2273        # code.
2274        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2275                    '%d %b %y %H:%M:%S'))
2276        alloc_log.addHandler(h)
2277       
2278        attrs = [ 
2279                {
2280                    'attribute': 'ssh_pubkey', 
2281                    'value': '%s/%s/config/%s' % \
2282                            (self.repo_url, expid, gw_pubkey_base)
2283                },
2284                {
2285                    'attribute': 'ssh_secretkey', 
2286                    'value': '%s/%s/config/%s' % \
2287                            (self.repo_url, expid, gw_secretkey_base)
2288                },
2289                {
2290                    'attribute': 'hosts', 
2291                    'value': '%s/%s/config/hosts' % \
2292                            (self.repo_url, expid)
2293                },
2294                {
2295                    'attribute': 'experiment_name',
2296                    'value': eid,
2297                },
2298            ]
2299
2300        # transit and disconnected testbeds may not have a connInfo entry.
2301        # Fill in the blanks.
2302        for t in allocated.keys():
2303            if not connInfo.has_key(t):
2304                connInfo[t] = { }
2305
2306        # Start a thread to do the resource allocation
2307        t  = Thread(target=self.allocate_resources,
2308                args=(allocated, master, eid, expid, tbparams, 
2309                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2310                    services),
2311                name=eid)
2312        t.start()
2313
2314        rv = {
2315                'experimentID': [
2316                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2317                ],
2318                'experimentStatus': 'starting',
2319            }
2320
2321        return rv
2322   
2323    def get_experiment_fedid(self, key):
2324        """
2325        find the fedid associated with the localname key in the state database.
2326        """
2327
2328        rv = None
2329        self.state_lock.acquire()
2330        if self.state.has_key(key):
2331            if isinstance(self.state[key], dict):
2332                try:
2333                    kl = [ f['fedid'] for f in \
2334                            self.state[key]['experimentID']\
2335                                if f.has_key('fedid') ]
2336                except KeyError:
2337                    self.state_lock.release()
2338                    raise service_error(service_error.internal, 
2339                            "No fedid for experiment %s when getting "+\
2340                                    "fedid(!?)" % key)
2341                if len(kl) == 1:
2342                    rv = kl[0]
2343                else:
2344                    self.state_lock.release()
2345                    raise service_error(service_error.internal, 
2346                            "multiple fedids for experiment %s when " +\
2347                                    "getting fedid(!?)" % key)
2348            else:
2349                self.state_lock.release()
2350                raise service_error(service_error.internal, 
2351                        "Unexpected state for %s" % key)
2352        self.state_lock.release()
2353        return rv
2354
2355    def check_experiment_access(self, fid, key):
2356        """
2357        Confirm that the fid has access to the experiment.  Though a request
2358        may be made in terms of a local name, the access attribute is always
2359        the experiment's fedid.
2360        """
2361        if not isinstance(key, fedid):
2362            key = self.get_experiment_fedid(key)
2363
2364        if self.auth.check_attribute(fid, key):
2365            return True
2366        else:
2367            raise service_error(service_error.access, "Access Denied")
2368
2369
2370    def get_handler(self, path, fid):
2371        self.log.info("Get handler %s %s" % (path, fid))
2372        if self.auth.check_attribute(fid, path):
2373            return ("%s/%s" % (self.repodir, path), "application/binary")
2374        else:
2375            return (None, None)
2376
2377    def get_vtopo(self, req, fid):
2378        """
2379        Return the stored virtual topology for this experiment
2380        """
2381        rv = None
2382        state = None
2383
2384        req = req.get('VtopoRequestBody', None)
2385        if not req:
2386            raise service_error(service_error.req,
2387                    "Bad request format (no VtopoRequestBody)")
2388        exp = req.get('experiment', None)
2389        if exp:
2390            if exp.has_key('fedid'):
2391                key = exp['fedid']
2392                keytype = "fedid"
2393            elif exp.has_key('localname'):
2394                key = exp['localname']
2395                keytype = "localname"
2396            else:
2397                raise service_error(service_error.req, "Unknown lookup type")
2398        else:
2399            raise service_error(service_error.req, "No request?")
2400
2401        self.check_experiment_access(fid, key)
2402
2403        self.state_lock.acquire()
2404        if self.state.has_key(key):
2405            if self.state[key].has_key('vtopo'):
2406                rv = { 'experiment' : {keytype: key },\
2407                        'vtopo': self.state[key]['vtopo'],\
2408                    }
2409            else:
2410                state = self.state[key]['experimentStatus']
2411        self.state_lock.release()
2412
2413        if rv: return rv
2414        else: 
2415            if state:
2416                raise service_error(service_error.partial, 
2417                        "Not ready: %s" % state)
2418            else:
2419                raise service_error(service_error.req, "No such experiment")
2420
2421    def get_vis(self, req, fid):
2422        """
2423        Return the stored visualization for this experiment
2424        """
2425        rv = None
2426        state = None
2427
2428        req = req.get('VisRequestBody', None)
2429        if not req:
2430            raise service_error(service_error.req,
2431                    "Bad request format (no VisRequestBody)")
2432        exp = req.get('experiment', None)
2433        if exp:
2434            if exp.has_key('fedid'):
2435                key = exp['fedid']
2436                keytype = "fedid"
2437            elif exp.has_key('localname'):
2438                key = exp['localname']
2439                keytype = "localname"
2440            else:
2441                raise service_error(service_error.req, "Unknown lookup type")
2442        else:
2443            raise service_error(service_error.req, "No request?")
2444
2445        self.check_experiment_access(fid, key)
2446
2447        self.state_lock.acquire()
2448        if self.state.has_key(key):
2449            if self.state[key].has_key('vis'):
2450                rv =  { 'experiment' : {keytype: key },\
2451                        'vis': self.state[key]['vis'],\
2452                        }
2453            else:
2454                state = self.state[key]['experimentStatus']
2455        self.state_lock.release()
2456
2457        if rv: return rv
2458        else:
2459            if state:
2460                raise service_error(service_error.partial, 
2461                        "Not ready: %s" % state)
2462            else:
2463                raise service_error(service_error.req, "No such experiment")
2464
2465    def clean_info_response(self, rv):
2466        """
2467        Remove the information in the experiment's state object that is not in
2468        the info response.
2469        """
2470        # Remove the owner info (should always be there, but...)
2471        if rv.has_key('owner'): del rv['owner']
2472
2473        # Convert the log into the allocationLog parameter and remove the
2474        # log entry (with defensive programming)
2475        if rv.has_key('log'):
2476            rv['allocationLog'] = "".join(rv['log'])
2477            del rv['log']
2478        else:
2479            rv['allocationLog'] = ""
2480
2481        if rv['experimentStatus'] != 'active':
2482            if rv.has_key('federant'): del rv['federant']
2483        else:
2484            # remove the allocationID and uri info from each federant
2485            for f in rv.get('federant', []):
2486                if f.has_key('allocID'): del f['allocID']
2487                if f.has_key('uri'): del f['uri']
2488        return rv
2489
2490    def get_info(self, req, fid):
2491        """
2492        Return all the stored info about this experiment
2493        """
2494        rv = None
2495
2496        req = req.get('InfoRequestBody', None)
2497        if not req:
2498            raise service_error(service_error.req,
2499                    "Bad request format (no InfoRequestBody)")
2500        exp = req.get('experiment', None)
2501        if exp:
2502            if exp.has_key('fedid'):
2503                key = exp['fedid']
2504                keytype = "fedid"
2505            elif exp.has_key('localname'):
2506                key = exp['localname']
2507                keytype = "localname"
2508            else:
2509                raise service_error(service_error.req, "Unknown lookup type")
2510        else:
2511            raise service_error(service_error.req, "No request?")
2512
2513        self.check_experiment_access(fid, key)
2514
2515        # The state may be massaged by the service function that called
2516        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2517        # state.
2518        self.state_lock.acquire()
2519        if self.state.has_key(key):
2520            rv = copy.deepcopy(self.state[key])
2521        self.state_lock.release()
2522
2523        if rv:
2524            return self.clean_info_response(rv)
2525        else:
2526            raise service_error(service_error.req, "No such experiment")
2527
2528    def get_multi_info(self, req, fid):
2529        """
2530        Return all the stored info that this fedid can access
2531        """
2532        rv = { 'info': [ ] }
2533
2534        self.state_lock.acquire()
2535        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2536            try:
2537                self.check_experiment_access(fid, key)
2538            except service_error, e:
2539                if e.code == service_error.access:
2540                    continue
2541                else:
2542                    self.state_lock.release()
2543                    raise e
2544
2545            if self.state.has_key(key):
2546                e = copy.deepcopy(self.state[key])
2547                e = self.clean_info_response(e)
2548                rv['info'].append(e)
2549        self.state_lock.release()
2550        return rv
2551
2552    def terminate_experiment(self, req, fid):
2553        """
2554        Swap this experiment out on the federants and delete the shared
2555        information
2556        """
2557        tbparams = { }
2558        req = req.get('TerminateRequestBody', None)
2559        if not req:
2560            raise service_error(service_error.req,
2561                    "Bad request format (no TerminateRequestBody)")
2562        force = req.get('force', False)
2563        exp = req.get('experiment', None)
2564        if exp:
2565            if exp.has_key('fedid'):
2566                key = exp['fedid']
2567                keytype = "fedid"
2568            elif exp.has_key('localname'):
2569                key = exp['localname']
2570                keytype = "localname"
2571            else:
2572                raise service_error(service_error.req, "Unknown lookup type")
2573        else:
2574            raise service_error(service_error.req, "No request?")
2575
2576        self.check_experiment_access(fid, key)
2577
2578        dealloc_list = [ ]
2579
2580
2581        # Create a logger that logs to the dealloc_list as well as to the main
2582        # log file.
2583        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2584        h = logging.StreamHandler(self.list_log(dealloc_list))
2585        # XXX: there should be a global one of these rather than repeating the
2586        # code.
2587        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2588                    '%d %b %y %H:%M:%S'))
2589        dealloc_log.addHandler(h)
2590
2591        self.state_lock.acquire()
2592        fed_exp = self.state.get(key, None)
2593
2594        if fed_exp:
2595            # This branch of the conditional holds the lock to generate a
2596            # consistent temporary tbparams variable to deallocate experiments.
2597            # It releases the lock to do the deallocations and reacquires it to
2598            # remove the experiment state when the termination is complete.
2599
2600            # First make sure that the experiment creation is complete.
2601            status = fed_exp.get('experimentStatus', None)
2602
2603            if status:
2604                if status in ('starting', 'terminating'):
2605                    if not force:
2606                        self.state_lock.release()
2607                        raise service_error(service_error.partial, 
2608                                'Experiment still being created or destroyed')
2609                    else:
2610                        self.log.warning('Experiment in %s state ' % status + \
2611                                'being terminated by force.')
2612            else:
2613                # No status??? trouble
2614                self.state_lock.release()
2615                raise service_error(service_error.internal,
2616                        "Experiment has no status!?")
2617
2618            ids = []
2619            #  experimentID is a list of dicts that are self-describing
2620            #  identifiers.  This finds all the fedids and localnames - the
2621            #  keys of self.state - and puts them into ids.
2622            for id in fed_exp.get('experimentID', []):
2623                if id.has_key('fedid'): ids.append(id['fedid'])
2624                if id.has_key('localname'): ids.append(id['localname'])
2625
2626            # Collect the allocation/segment ids into a dict keyed by the fedid
2627            # of the allocation (or a monotonically increasing integer) that
2628            # contains a tuple of uri, aid (which is a dict...)
2629            for i, fed in enumerate(fed_exp.get('federant', [])):
2630                try:
2631                    uri = fed['uri']
2632                    aid = fed['allocID']
2633                    k = fed['allocID'].get('fedid', i)
2634                except KeyError, e:
2635                    continue
2636                tbparams[k] = (uri, aid)
2637            fed_exp['experimentStatus'] = 'terminating'
2638            if self.state_filename: self.write_state()
2639            self.state_lock.release()
2640
2641            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2642            # then completes, so we can't wait if nothing starts.  So, no
2643            # tbparams, no start.
2644            if len(tbparams) > 0:
2645                thread_pool = self.thread_pool(self.nthreads)
2646                for k in tbparams.keys():
2647                    # Create and start a thread to stop the segment
2648                    thread_pool.wait_for_slot()
2649                    uri, aid = tbparams[k]
2650                    t  = self.pooled_thread(\
2651                            target=self.terminate_segment(log=dealloc_log,
2652                                testbed=uri,
2653                                cert_file=self.cert_file, 
2654                                cert_pwd=self.cert_pwd,
2655                                trusted_certs=self.trusted_certs,
2656                                caller=self.call_TerminateSegment),
2657                            args=(uri, aid), name=k,
2658                            pdata=thread_pool, trace_file=self.trace_file)
2659                    t.start()
2660                # Wait for completions
2661                thread_pool.wait_for_all_done()
2662
2663            # release the allocations (failed experiments have done this
2664            # already, and starting experiments may be in odd states, so we
2665            # ignore errors releasing those allocations
2666            try: 
2667                for k in tbparams.keys():
2668                    # This releases access by uri
2669                    uri, aid = tbparams[k]
2670                    self.release_access(None, aid, uri=uri)
2671            except service_error, e:
2672                if status != 'failed' and not force:
2673                    raise e
2674
2675            # Remove the terminated experiment
2676            self.state_lock.acquire()
2677            for id in ids:
2678                if self.state.has_key(id): del self.state[id]
2679
2680            if self.state_filename: self.write_state()
2681            self.state_lock.release()
2682
2683            # Delete any synch points associated with this experiment.  All
2684            # synch points begin with the fedid of the experiment.
2685            fedid_keys = set(["fedid:%s" % f for f in ids \
2686                    if isinstance(f, fedid)])
2687            for k in self.synch_store.all_keys():
2688                try:
2689                    if len(k) > 45 and k[0:46] in fedid_keys:
2690                        self.synch_store.del_value(k)
2691                except synch_store.BadDeletionError:
2692                    pass
2693            self.write_store()
2694               
2695            return { 
2696                    'experiment': exp , 
2697                    'deallocationLog': "".join(dealloc_list),
2698                    }
2699        else:
2700            # Don't forget to release the lock
2701            self.state_lock.release()
2702            raise service_error(service_error.req, "No saved state")
2703
2704
2705    def GetValue(self, req, fid):
2706        """
2707        Get a value from the synchronized store
2708        """
2709        req = req.get('GetValueRequestBody', None)
2710        if not req:
2711            raise service_error(service_error.req,
2712                    "Bad request format (no GetValueRequestBody)")
2713       
2714        name = req['name']
2715        wait = req['wait']
2716        rv = { 'name': name }
2717
2718        if self.auth.check_attribute(fid, name):
2719            try:
2720                v = self.synch_store.get_value(name, wait)
2721            except synch_store.RevokedKeyError:
2722                # No more synch on this key
2723                raise service_error(service_error.federant, 
2724                        "Synch key %s revoked" % name)
2725            if v is not None:
2726                rv['value'] = v
2727            self.log.debug("[GetValue] got %s from %s" % (v, name))
2728            return rv
2729        else:
2730            raise service_error(service_error.access, "Access Denied")
2731       
2732
2733    def SetValue(self, req, fid):
2734        """
2735        Set a value in the synchronized store
2736        """
2737        req = req.get('SetValueRequestBody', None)
2738        if not req:
2739            raise service_error(service_error.req,
2740                    "Bad request format (no SetValueRequestBody)")
2741       
2742        name = req['name']
2743        v = req['value']
2744
2745        if self.auth.check_attribute(fid, name):
2746            try:
2747                self.synch_store.set_value(name, v)
2748                self.write_store()
2749                self.log.debug("[SetValue] set %s to %s" % (name, v))
2750            except synch_store.CollisionError:
2751                # Translate into a service_error
2752                raise service_error(service_error.req,
2753                        "Value already set: %s" %name)
2754            except synch_store.RevokedKeyError:
2755                # No more synch on this key
2756                raise service_error(service_error.federant, 
2757                        "Synch key %s revoked" % name)
2758            return { 'name': name, 'value': v }
2759        else:
2760            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.