source: fedd/federation/experiment_control.py @ 32e7d93

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 32e7d93 was 32e7d93, checked in by Ted Faber <faber@…>, 15 years ago

Incremental logging and correct failures.

  • Property mode set to 100644
File size: 74.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221
222        self.exp_stem = "fed-stem"
223        self.log = logging.getLogger("fedd.experiment_control")
224        set_log_level(config, "experiment_control", self.log)
225        self.muxmax = 2
226        self.nthreads = 2
227        self.randomize_experiments = False
228
229        self.splitter = None
230        self.ssh_keygen = "/usr/bin/ssh-keygen"
231        self.ssh_identity_file = None
232
233
234        self.debug = config.getboolean("experiment_control", "create_debug")
235        self.state_filename = config.get("experiment_control", 
236                "experiment_state")
237        self.splitter_url = config.get("experiment_control", "splitter_uri")
238        self.fedkit = parse_tarfile_list(\
239                config.get("experiment_control", "fedkit"))
240        self.gatewaykit = parse_tarfile_list(\
241                config.get("experiment_control", "gatewaykit"))
242        accessdb_file = config.get("experiment_control", "accessdb")
243
244        self.ssh_pubkey_file = config.get("experiment_control", 
245                "ssh_pubkey_file")
246        self.ssh_privkey_file = config.get("experiment_control",
247                "ssh_privkey_file")
248        # NB for internal master/slave ops, not experiment setup
249        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
250
251        self.overrides = set([])
252        ovr = config.get('experiment_control', 'overrides')
253        if ovr:
254            for o in ovr.split(","):
255                o = o.strip()
256                if o.startswith('fedid:'): o = o[len('fedid:'):]
257                self.overrides.add(fedid(hexstr=o))
258
259        self.state = { }
260        self.state_lock = Lock()
261        self.tclsh = "/usr/local/bin/otclsh"
262        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
263                config.get("experiment_control", "tcl_splitter",
264                        "/usr/testbed/lib/ns2ir/parse.tcl")
265        mapdb_file = config.get("experiment_control", "mapdb")
266        self.trace_file = sys.stderr
267
268        self.def_expstart = \
269                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
270                "/tmp/federate";
271        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
272                "FEDDIR/hosts";
273        self.def_gwstart = \
274                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
275                "/tmp/bridge.log";
276        self.def_mgwstart = \
277                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
278                "/tmp/bridge.log";
279        self.def_gwimage = "FBSD61-TUNNEL2";
280        self.def_gwtype = "pc";
281        self.local_access = { }
282
283        if auth:
284            self.auth = auth
285        else:
286            self.log.error(\
287                    "[access]: No authorizer initialized, creating local one.")
288            auth = authorizer()
289
290
291        if self.ssh_pubkey_file:
292            try:
293                f = open(self.ssh_pubkey_file, 'r')
294                self.ssh_pubkey = f.read()
295                f.close()
296            except IOError:
297                raise service_error(service_error.internal,
298                        "Cannot read sshpubkey")
299        else:
300            raise service_error(service_error.internal, 
301                    "No SSH public key file?")
302
303        if not self.ssh_privkey_file:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307
308        if mapdb_file:
309            self.read_mapdb(mapdb_file)
310        else:
311            self.log.warn("[experiment_control] No testbed map, using defaults")
312            self.tbmap = { 
313                    'deter':'https://users.isi.deterlab.net:23235',
314                    'emulab':'https://users.isi.deterlab.net:23236',
315                    'ucb':'https://users.isi.deterlab.net:23237',
316                    }
317
318        if accessdb_file:
319                self.read_accessdb(accessdb_file)
320        else:
321            raise service_error(service_error.internal,
322                    "No accessdb specified in config")
323
324        # Grab saved state.  OK to do this w/o locking because it's read only
325        # and only one thread should be in existence that can see self.state at
326        # this point.
327        if self.state_filename:
328            self.read_state()
329
330        # Dispatch tables
331        self.soap_services = {\
332                'Create': soap_handler('Create', self.create_experiment),
333                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
334                'Vis': soap_handler('Vis', self.get_vis),
335                'Info': soap_handler('Info', self.get_info),
336                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
337                'Terminate': soap_handler('Terminate', 
338                    self.terminate_experiment),
339        }
340
341        self.xmlrpc_services = {\
342                'Create': xmlrpc_handler('Create', self.create_experiment),
343                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
344                'Vis': xmlrpc_handler('Vis', self.get_vis),
345                'Info': xmlrpc_handler('Info', self.get_info),
346                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
347                'Terminate': xmlrpc_handler('Terminate',
348                    self.terminate_experiment),
349        }
350
351    # Call while holding self.state_lock
352    def write_state(self):
353        """
354        Write a new copy of experiment state after copying the existing state
355        to a backup.
356
357        State format is a simple pickling of the state dictionary.
358        """
359        if os.access(self.state_filename, os.W_OK):
360            copy_file(self.state_filename, \
361                    "%s.bak" % self.state_filename)
362        try:
363            f = open(self.state_filename, 'w')
364            pickle.dump(self.state, f)
365        except IOError, e:
366            self.log.error("Can't write file %s: %s" % \
367                    (self.state_filename, e))
368        except pickle.PicklingError, e:
369            self.log.error("Pickling problem: %s" % e)
370        except TypeError, e:
371            self.log.error("Pickling problem (TypeError): %s" % e)
372
373    # Call while holding self.state_lock
374    def read_state(self):
375        """
376        Read a new copy of experiment state.  Old state is overwritten.
377
378        State format is a simple pickling of the state dictionary.
379        """
380       
381        def get_experiment_id(state):
382            """
383            Pull the fedid experimentID out of the saved state.  This is kind
384            of a gross walk through the dict.
385            """
386
387            if state.has_key('experimentID'):
388                for e in state['experimentID']:
389                    if e.has_key('fedid'):
390                        return e['fedid']
391                else:
392                    return None
393            else:
394                return None
395
396        def get_alloc_ids(state):
397            """
398            Pull the fedids of the identifiers of each allocation from the
399            state.  Again, a dict dive that's best isolated.
400            """
401
402            return [ f['allocID']['fedid'] 
403                    for f in state.get('federant',[]) \
404                        if f.has_key('allocID') and \
405                            f['allocID'].has_key('fedid')]
406
407
408        try:
409            f = open(self.state_filename, "r")
410            self.state = pickle.load(f)
411            self.log.debug("[read_state]: Read state from %s" % \
412                    self.state_filename)
413        except IOError, e:
414            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
415                    % (self.state_filename, e))
416        except pickle.UnpicklingError, e:
417            self.log.warning(("[read_state]: No saved state: " + \
418                    "Unpickling failed: %s") % e)
419       
420        for s in self.state.values():
421            try:
422
423                eid = get_experiment_id(s)
424                if eid : 
425                    # Give the owner rights to the experiment
426                    self.auth.set_attribute(s['owner'], eid)
427                    # And holders of the eid as well
428                    self.auth.set_attribute(eid, eid)
429                    # allow overrides to control experiments as well
430                    for o in self.overrides:
431                        self.auth.set_attribute(o, eid)
432                    # Set permissions to allow reading of the software repo, if
433                    # any, as well.
434                    for a in get_alloc_ids(s):
435                        self.auth.set_attribute(a, 'repo/%s' % eid)
436                else:
437                    raise KeyError("No experiment id")
438            except KeyError, e:
439                self.log.warning("[read_state]: State ownership or identity " +\
440                        "misformatted in %s: %s" % (self.state_filename, e))
441
442
443    def read_accessdb(self, accessdb_file):
444        """
445        Read the mapping from fedids that can create experiments to their name
446        in the 3-level access namespace.  All will be asserted from this
447        testbed and can include the local username and porject that will be
448        asserted on their behalf by this fedd.  Each fedid is also added to the
449        authorization system with the "create" attribute.
450        """
451        self.accessdb = {}
452        # These are the regexps for parsing the db
453        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
454        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
455                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
456        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
457                "\s*->\s*(" + name_expr + ")\s*$")
458        lineno = 0
459
460        # Parse the mappings and store in self.authdb, a dict of
461        # fedid -> (proj, user)
462        try:
463            f = open(accessdb_file, "r")
464            for line in f:
465                lineno += 1
466                line = line.strip()
467                if len(line) == 0 or line.startswith('#'):
468                    continue
469                m = project_line.match(line)
470                if m:
471                    fid = fedid(hexstr=m.group(1))
472                    project, user = m.group(2,3)
473                    if not self.accessdb.has_key(fid):
474                        self.accessdb[fid] = []
475                    self.accessdb[fid].append((project, user))
476                    continue
477
478                m = user_line.match(line)
479                if m:
480                    fid = fedid(hexstr=m.group(1))
481                    project = None
482                    user = m.group(2)
483                    if not self.accessdb.has_key(fid):
484                        self.accessdb[fid] = []
485                    self.accessdb[fid].append((project, user))
486                    continue
487                self.log.warn("[experiment_control] Error parsing access " +\
488                        "db %s at line %d" %  (accessdb_file, lineno))
489        except IOError:
490            raise service_error(service_error.internal,
491                    "Error opening/reading %s as experiment " +\
492                            "control accessdb" %  accessdb_file)
493        f.close()
494
495        # Initialize the authorization attributes
496        for fid in self.accessdb.keys():
497            self.auth.set_attribute(fid, 'create')
498
499    def read_mapdb(self, file):
500        """
501        Read a simple colon separated list of mappings for the
502        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
503        """
504
505        self.tbmap = { }
506        lineno =0
507        try:
508            f = open(file, "r")
509            for line in f:
510                lineno += 1
511                line = line.strip()
512                if line.startswith('#') or len(line) == 0:
513                    continue
514                try:
515                    label, url = line.split(':', 1)
516                    self.tbmap[label] = url
517                except ValueError, e:
518                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
519                            "map db: %s %s" % (lineno, line, e))
520        except IOError, e:
521            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
522                    "open %s: %s" % (file, e))
523        f.close()
524       
525    def generate_ssh_keys(self, dest, type="rsa" ):
526        """
527        Generate a set of keys for the gateways to use to talk.
528
529        Keys are of type type and are stored in the required dest file.
530        """
531        valid_types = ("rsa", "dsa")
532        t = type.lower();
533        if t not in valid_types: raise ValueError
534        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
535
536        try:
537            trace = open("/dev/null", "w")
538        except IOError:
539            raise service_error(service_error.internal,
540                    "Cannot open /dev/null??");
541
542        # May raise CalledProcessError
543        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
544        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
545        if rv != 0:
546            raise service_error(service_error.internal, 
547                    "Cannot generate nonce ssh keys.  %s return code %d" \
548                            % (self.ssh_keygen, rv))
549
550    def gentopo(self, str):
551        """
552        Generate the topology dtat structure from the splitter's XML
553        representation of it.
554
555        The topology XML looks like:
556            <experiment>
557                <nodes>
558                    <node><vname></vname><ips>ip1:ip2</ips></node>
559                </nodes>
560                <lans>
561                    <lan>
562                        <vname></vname><vnode></vnode><ip></ip>
563                        <bandwidth></bandwidth><member>node:port</member>
564                    </lan>
565                </lans>
566        """
567        class topo_parse:
568            """
569            Parse the topology XML and create the dats structure.
570            """
571            def __init__(self):
572                # Typing of the subelements for data conversion
573                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
574                self.int_subelements = ( 'bandwidth',)
575                self.float_subelements = ( 'delay',)
576                # The final data structure
577                self.nodes = [ ]
578                self.lans =  [ ]
579                self.topo = { \
580                        'node': self.nodes,\
581                        'lan' : self.lans,\
582                    }
583                self.element = { }  # Current element being created
584                self.chars = ""     # Last text seen
585
586            def end_element(self, name):
587                # After each sub element the contents is added to the current
588                # element or to the appropriate list.
589                if name == 'node':
590                    self.nodes.append(self.element)
591                    self.element = { }
592                elif name == 'lan':
593                    self.lans.append(self.element)
594                    self.element = { }
595                elif name in self.str_subelements:
596                    self.element[name] = self.chars
597                    self.chars = ""
598                elif name in self.int_subelements:
599                    self.element[name] = int(self.chars)
600                    self.chars = ""
601                elif name in self.float_subelements:
602                    self.element[name] = float(self.chars)
603                    self.chars = ""
604
605            def found_chars(self, data):
606                self.chars += data.rstrip()
607
608
609        tp = topo_parse();
610        parser = xml.parsers.expat.ParserCreate()
611        parser.EndElementHandler = tp.end_element
612        parser.CharacterDataHandler = tp.found_chars
613
614        parser.Parse(str)
615
616        return tp.topo
617       
618
619    def genviz(self, topo):
620        """
621        Generate the visualization the virtual topology
622        """
623
624        neato = "/usr/local/bin/neato"
625        # These are used to parse neato output and to create the visualization
626        # file.
627        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
628        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
629                "%s</type></node>"
630
631        try:
632            # Node names
633            nodes = [ n['vname'] for n in topo['node'] ]
634            topo_lans = topo['lan']
635        except KeyError, e:
636            raise service_error(service_error.internal, "Bad topology: %s" %e)
637
638        lans = { }
639        links = { }
640
641        # Walk through the virtual topology, organizing the connections into
642        # 2-node connections (links) and more-than-2-node connections (lans).
643        # When a lan is created, it's added to the list of nodes (there's a
644        # node in the visualization for the lan).
645        for l in topo_lans:
646            if links.has_key(l['vname']):
647                if len(links[l['vname']]) < 2:
648                    links[l['vname']].append(l['vnode'])
649                else:
650                    nodes.append(l['vname'])
651                    lans[l['vname']] = links[l['vname']]
652                    del links[l['vname']]
653                    lans[l['vname']].append(l['vnode'])
654            elif lans.has_key(l['vname']):
655                lans[l['vname']].append(l['vnode'])
656            else:
657                links[l['vname']] = [ l['vnode'] ]
658
659
660        # Open up a temporary file for dot to turn into a visualization
661        try:
662            df, dotname = tempfile.mkstemp()
663            dotfile = os.fdopen(df, 'w')
664        except IOError:
665            raise service_error(service_error.internal,
666                    "Failed to open file in genviz")
667
668        try:
669            dnull = open('/dev/null', 'w')
670        except IOError:
671            service_error(service_error.internal,
672                    "Failed to open /dev/null in genviz")
673
674        # Generate a dot/neato input file from the links, nodes and lans
675        try:
676            print >>dotfile, "graph G {"
677            for n in nodes:
678                print >>dotfile, '\t"%s"' % n
679            for l in links.keys():
680                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
681            for l in lans.keys():
682                for n in lans[l]:
683                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
684            print >>dotfile, "}"
685            dotfile.close()
686        except TypeError:
687            raise service_error(service_error.internal,
688                    "Single endpoint link in vtopo")
689        except IOError:
690            raise service_error(service_error.internal, "Cannot write dot file")
691
692        # Use dot to create a visualization
693        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
694                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
695                close_fds=True)
696        dnull.close()
697
698        # Translate dot to vis format
699        vis_nodes = [ ]
700        vis = { 'node': vis_nodes }
701        for line in dot.stdout:
702            m = vis_re.match(line)
703            if m:
704                vn = m.group(1)
705                vis_node = {'name': vn, \
706                        'x': float(m.group(2)),\
707                        'y' : float(m.group(3)),\
708                    }
709                if vn in links.keys() or vn in lans.keys():
710                    vis_node['type'] = 'lan'
711                else:
712                    vis_node['type'] = 'node'
713                vis_nodes.append(vis_node)
714        rv = dot.wait()
715
716        os.remove(dotname)
717        if rv == 0 : return vis
718        else: return None
719
720    def get_access(self, tb, nodes, user, tbparam, master, export_project,
721            access_user):
722        """
723        Get access to testbed through fedd and set the parameters for that tb
724        """
725        uri = self.tbmap.get(tb, None)
726        if not uri:
727            raise service_error(serice_error.server_config, 
728                    "Unknown testbed: %s" % tb)
729
730        # currently this lumps all users into one service access group
731        service_keys = [ a for u in user \
732                for a in u.get('access', []) \
733                    if a.has_key('sshPubkey')]
734
735        if len(service_keys) == 0:
736            raise service_error(service_error.req, 
737                    "Must have at least one SSH pubkey for services")
738
739
740        for p, u in access_user:
741            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
742                    "to %s") %  ((p or "None"), u, uri))
743
744            if p:
745                # Request with user and project specified
746                req = {\
747                        'destinationTestbed' : { 'uri' : uri },
748                        'project': { 
749                            'name': {'localname': p},
750                            'user': [ {'userID': { 'localname': u } } ],
751                            },
752                        'user':  user,
753                        'allocID' : { 'localname': 'test' },
754                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
755                        'serviceAccess' : service_keys
756                    }
757            else:
758                # Request with only user specified
759                req = {\
760                        'destinationTestbed' : { 'uri' : uri },
761                        'user':  [ {'userID': { 'localname': u } } ],
762                        'allocID' : { 'localname': 'test' },
763                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
764                        'serviceAccess' : service_keys
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771
772            # node resources if any
773            if nodes != None and len(nodes) > 0:
774                rnodes = [ ]
775                for n in nodes:
776                    rn = { }
777                    image, hw, count = n.split(":")
778                    if image: rn['image'] = [ image ]
779                    if hw: rn['hardware'] = [ hw ]
780                    if count and int(count) >0 : rn['count'] = int(count)
781                    rnodes.append(rn)
782                req['resources']= { }
783                req['resources']['node'] = rnodes
784
785            try:
786                if self.local_access.has_key(uri):
787                    # Local access call
788                    req = { 'RequestAccessRequestBody' : req }
789                    r = self.local_access[uri].RequestAccess(req, 
790                            fedid(file=self.cert_file))
791                    r = { 'RequestAccessResponseBody' : r }
792                else:
793                    r = self.call_RequestAccess(uri, req, 
794                            self.cert_file, self.cert_pwd, self.trusted_certs)
795            except service_error, e:
796                if e.code == service_error.access:
797                    self.log.debug("[get_access] Access denied")
798                    r = None
799                    continue
800                else:
801                    raise e
802
803            if r.has_key('RequestAccessResponseBody'):
804                # Through to here we have a valid response, not a fault.
805                # Access denied is a fault, so something better or worse than
806                # access denied has happened.
807                r = r['RequestAccessResponseBody']
808                self.log.debug("[get_access] Access granted")
809                break
810            else:
811                raise service_error(service_error.protocol,
812                        "Bad proxy response")
813       
814        if not r:
815            raise service_error(service_error.access, 
816                    "Access denied by %s (%s)" % (tb, uri))
817
818        e = r['emulab']
819        p = e['project']
820        tbparam[tb] = { 
821                "boss": e['boss'],
822                "host": e['ops'],
823                "domain": e['domain'],
824                "fs": e['fileServer'],
825                "eventserver": e['eventServer'],
826                "project": unpack_id(p['name']),
827                "emulab" : e,
828                "allocID" : r['allocID'],
829                }
830        # Make the testbed name be the label the user applied
831        p['testbed'] = {'localname': tb }
832
833        for u in p['user']:
834            role = u.get('role', None)
835            if role == 'experimentCreation':
836                tbparam[tb]['user'] = unpack_id(u['userID'])
837                break
838        else:
839            raise service_error(service_error.internal, 
840                    "No createExperimentUser from %s" %tb)
841
842        # Add attributes to barameter space.  We don't allow attributes to
843        # overlay any parameters already installed.
844        for a in e['fedAttr']:
845            try:
846                if a['attribute'] and isinstance(a['attribute'], basestring)\
847                        and not tbparam[tb].has_key(a['attribute'].lower()):
848                    tbparam[tb][a['attribute'].lower()] = a['value']
849            except KeyError:
850                self.log.error("Bad attribute in response: %s" % a)
851       
852    def release_access(self, tb, aid):
853        """
854        Release access to testbed through fedd
855        """
856
857        uri = self.tbmap.get(tb, None)
858        if not uri:
859            raise service_error(serice_error.server_config, 
860                    "Unknown testbed: %s" % tb)
861
862        if self.local_access.has_key(uri):
863            resp = self.local_access[uri].ReleaseAccess(\
864                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
865                    fedid(file=self.cert_file))
866            resp = { 'ReleaseAccessResponseBody': resp } 
867        else:
868            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
869                    self.cert_file, self.cert_pwd, self.trusted_certs)
870
871        # better error coding
872
873    def remote_splitter(self, uri, desc, master):
874
875        req = {
876                'description' : { 'ns2description': desc },
877                'master': master,
878                'include_fedkit': bool(self.fedkit),
879                'include_gatewaykit': bool(self.gatewaykit)
880            }
881
882        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
883                self.trusted_certs)
884
885        if r.has_key('Ns2SplitResponseBody'):
886            r = r['Ns2SplitResponseBody']
887            if r.has_key('output'):
888                return r['output'].splitlines()
889            else:
890                raise service_error(service_error.protocol, 
891                        "Bad splitter response (no output)")
892        else:
893            raise service_error(service_error.protocol, "Bad splitter response")
894
895    class start_segment:
896        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
897                cert_pwd=None, trusted_certs=None, caller=None,
898                log_collector=None):
899            self.log = log
900            self.debug = debug
901            self.cert_file = cert_file
902            self.cert_pwd = cert_pwd
903            self.trusted_certs = None
904            self.caller = caller
905            self.testbed = testbed
906            self.log_collector = log_collector
907
908        def __call__(self, uri, aid, topo, master, attrs=None):
909            req = {
910                    'allocID': { 'fedid' : aid }, 
911                    'segmentdescription': { 
912                        'topdldescription': topo.to_dict(),
913                    },
914                    'master': master,
915                }
916            if attrs:
917                req['fedAttr'] = attrs
918
919            try:
920                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
921                        self.trusted_certs)
922                if r.has_key('StartSegmentResponseBody'):
923                    lval = r['StartSegmentResponseBody'].get('allocationLog',
924                            None)
925                    if lval and self.log_collector:
926                        for line in  lval.splitlines(True):
927                            self.log_collector.write(line)
928                else:
929                    raise service_error(service_error.internal, 
930                            "Bad response!?: %s" %r)
931                return True
932            except service_error, e:
933                self.log.error("Start segment failed on %s: %s" % \
934                        (self.testbed, e))
935                return False
936
937
938
939    class terminate_segment:
940        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
941                cert_pwd=None, trusted_certs=None, caller=None):
942            self.log = log
943            self.debug = debug
944            self.cert_file = cert_file
945            self.cert_pwd = cert_pwd
946            self.trusted_certs = None
947            self.caller = caller
948            self.testbed = testbed
949
950        def __call__(self, uri, aid ):
951            req = {
952                    'allocID': aid , 
953                }
954            try:
955                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
956                        self.trusted_certs)
957                return True
958            except service_error, e:
959                self.log.error("Terminate segment failed on %s: %s" % \
960                        (self.testbed, e))
961                return False
962   
963
964    def allocate_resources(self, allocated, master, eid, expid, expcert, 
965            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
966            attrs=None):
967        started = { }           # Testbeds where a sub-experiment started
968                                # successfully
969
970        # XXX
971        fail_soft = False
972
973        log = alloc_log or self.log
974
975        thread_pool = self.thread_pool(self.nthreads)
976        threads = [ ]
977
978        for tb in [ k for k in allocated.keys() if k != master]:
979            # Create and start a thread to start the segment, and save it to
980            # get the return value later
981            thread_pool.wait_for_slot()
982            uri = self.tbmap.get(tb, None)
983            if not uri:
984                raise service_error(service_error.internal, 
985                        "Unknown testbed %s !?" % tb)
986
987            if tbparams[tb].has_key('allocID') and \
988                    tbparams[tb]['allocID'].has_key('fedid'):
989                aid = tbparams[tb]['allocID']['fedid']
990            else:
991                raise service_error(service_error.internal, 
992                        "No alloc id for testbed %s !?" % tb)
993
994            t  = self.pooled_thread(\
995                    target=self.start_segment(log=log, debug=self.debug,
996                        testbed=tb, cert_file=self.cert_file,
997                        cert_pwd=self.cert_pwd,
998                        trusted_certs=self.trusted_certs,
999                        caller=self.call_StartSegment,
1000                        log_collector=log_collector), 
1001                    args=(uri, aid, topo[tb], False, attrs), name=tb,
1002                    pdata=thread_pool, trace_file=self.trace_file)
1003            threads.append(t)
1004            t.start()
1005
1006        # Wait until all finish (keep pinging the log, though)
1007        mins = 0
1008        while not thread_pool.wait_for_all_done(60.0):
1009            mins += 1
1010            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1011                    % mins)
1012
1013        thread_pool.clear()
1014
1015        # If none failed, start the master
1016        failed = [ t.getName() for t in threads if not t.rv ]
1017
1018        if len(failed) == 0:
1019            uri = self.tbmap.get(master, None)
1020            if not uri:
1021                raise service_error(service_error.internal, 
1022                        "Unknown testbed %s !?" % master)
1023
1024            if tbparams[master].has_key('allocID') and \
1025                    tbparams[master]['allocID'].has_key('fedid'):
1026                aid = tbparams[master]['allocID']['fedid']
1027            else:
1028                raise service_error(service_error.internal, 
1029                    "No alloc id for testbed %s !?" % master)
1030            t = self.pooled_thread(
1031                    target=self.start_segment(log=log, debug=self.debug, 
1032                        testbed=master, cert_file=self.cert_file, 
1033                        cert_pwd=self.cert_pwd, 
1034                        trusted_certs=self.trusted_certs,
1035                        caller=self.call_StartSegment,
1036                        log_collector=log_collector),
1037                    args =(uri, aid, topo[master], True, attrs),
1038                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1039            threads.append(t)
1040            t.start()
1041            # Wait until the master finishes (keep pinging the log, though)
1042            mins = 0
1043            while not thread_pool.wait_for_all_done(60.0):
1044                mins += 1
1045                alloc_log.info("Waiting for master (it has been %d mins)" \
1046                        % mins)
1047            # update failed to include the master, if it failed
1048            failed = [ t.getName() for t in threads if not t.rv ]
1049
1050        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1051        # If one failed clean up, unless fail_soft is set
1052        if failed:
1053            if not fail_soft:
1054                thread_pool.clear()
1055                for tb in succeeded:
1056                    # Create and start a thread to stop the segment
1057                    thread_pool.wait_for_slot()
1058                    uri = self.tbmap.get(tb, None)
1059                    t  = self.pooled_thread(\
1060                            target=self.terminate_segment(log=log,
1061                                testbed=tb,
1062                                cert_file=self.cert_file, 
1063                                cert_pwd=self.cert_pwd,
1064                                trusted_certs=self.trusted_certs,
1065                                caller=self.call_TerminateSegment),
1066                            args=(uri, tbparams[tb]['federant']['allocID']),
1067                            name=tb,
1068                            pdata=thread_pool, trace_file=self.trace_file)
1069                    t.start()
1070                # Wait until all finish
1071                thread_pool.wait_for_all_done()
1072
1073                # release the allocations
1074                for tb in tbparams.keys():
1075                    self.release_access(tb, tbparams[tb]['allocID'])
1076                # Remove the placeholder
1077                self.state_lock.acquire()
1078                self.state[eid]['experimentStatus'] = 'failed'
1079                if self.state_filename: self.write_state()
1080                self.state_lock.release()
1081
1082                log.error("Swap in failed on %s" % ",".join(failed))
1083                return
1084        else:
1085            log.info("[start_segment]: Experiment %s active" % eid)
1086
1087        log.debug("[start_experiment]: removing %s" % tmpdir)
1088
1089        # Walk up tmpdir, deleting as we go
1090        for path, dirs, files in os.walk(tmpdir, topdown=False):
1091            for f in files:
1092                os.remove(os.path.join(path, f))
1093            for d in dirs:
1094                os.rmdir(os.path.join(path, d))
1095        os.rmdir(tmpdir)
1096
1097        # Insert the experiment into our state and update the disk copy
1098        self.state_lock.acquire()
1099        self.state[expid]['experimentStatus'] = 'active'
1100        self.state[eid] = self.state[expid]
1101        if self.state_filename: self.write_state()
1102        self.state_lock.release()
1103        return
1104
1105
1106    def add_kit(self, e, kit):
1107        """
1108        Add a Software object created from the list of (install, location)
1109        tuples passed as kit  to the software attribute of an object e.  We
1110        do this enough to break out the code, but it's kind of a hack to
1111        avoid changing the old tuple rep.
1112        """
1113
1114        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1115
1116        if isinstance(e.software, list): e.software.extend(s)
1117        else: e.software = s
1118
1119
1120    def create_experiment_state(self, fid, req, expid, expcert):
1121        """
1122        Create the initial entry in the experiment's state.  The expid and
1123        expcert are the experiment's fedid and certifacte that represents that
1124        ID, which are installed in the experiment state.  If the request
1125        includes a suggested local name that is used if possible.  If the local
1126        name is already taken by an experiment owned by this user that has
1127        failed, it is overwriutten.  Otherwise new letters are added until a
1128        valid localname is found.  The generated local name is returned.
1129        """
1130
1131        if req.has_key('experimentID') and \
1132                req['experimentID'].has_key('localname'):
1133            overwrite = False
1134            eid = req['experimentID']['localname']
1135            # If there's an old failed experiment here with the same local name
1136            # and accessible by this user, we'll overwrite it, otherwise we'll
1137            # fall through and do the collision avoidance.
1138            old_expid = self.get_experiment_fedid(eid)
1139            if old_expid and self.check_experiment_access(fid, old_expid):
1140                self.state_lock.acquire()
1141                status = self.state[eid].get('experimentStatus', None)
1142                if status and status == 'failed':
1143                    # remove the old access attribute
1144                    self.auth.unset_attribute(fid, old_expid)
1145                    overwrite = True
1146                    del self.state[eid]
1147                    del self.state[old_expid]
1148                self.state_lock.release()
1149            self.state_lock.acquire()
1150            while (self.state.has_key(eid) and not overwrite):
1151                eid += random.choice(string.ascii_letters)
1152            # Initial state
1153            self.state[eid] = {
1154                    'experimentID' : \
1155                            [ { 'localname' : eid }, {'fedid': expid } ],
1156                    'experimentStatus': 'starting',
1157                    'experimentAccess': { 'X509' : expcert },
1158                    'owner': fid,
1159                    'log' : [],
1160                }
1161            self.state[expid] = self.state[eid]
1162            if self.state_filename: self.write_state()
1163            self.state_lock.release()
1164        else:
1165            eid = self.exp_stem
1166            for i in range(0,5):
1167                eid += random.choice(string.ascii_letters)
1168            self.state_lock.acquire()
1169            while (self.state.has_key(eid)):
1170                eid = self.exp_stem
1171                for i in range(0,5):
1172                    eid += random.choice(string.ascii_letters)
1173            # Initial state
1174            self.state[eid] = {
1175                    'experimentID' : \
1176                            [ { 'localname' : eid }, {'fedid': expid } ],
1177                    'experimentStatus': 'starting',
1178                    'experimentAccess': { 'X509' : expcert },
1179                    'owner': fid,
1180                    'log' : [],
1181                }
1182            self.state[expid] = self.state[eid]
1183            if self.state_filename: self.write_state()
1184            self.state_lock.release()
1185
1186        return eid
1187
1188
1189    def allocate_ips_to_topo(self, top):
1190        """
1191        Add an ip4_address attribute to all the hosts in teh topology, based on
1192        the shared substrates on which they sit.  An /etc/hosts file is also
1193        created and returned as a list of hostfiles entries.
1194        """
1195        subs = sorted(top.substrates, 
1196                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1197                reverse=True)
1198        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1199        ifs = { }
1200        hosts = [ ]
1201
1202        for idx, s in enumerate(subs):
1203            a = ips.allocate(len(s.interfaces)+2)
1204            if a :
1205                base, num = a
1206                if num < len(s.interfaces) +2 : 
1207                    raise service_error(service_error.internal,
1208                            "Allocator returned wrong number of IPs??")
1209            else:
1210                raise service_error(service_error.req, 
1211                        "Cannot allocate IP addresses")
1212
1213            base += 1
1214            for i in s.interfaces:
1215                i.attribute.append(
1216                        topdl.Attribute('ip4_address', 
1217                            "%s" % ip_addr(base)))
1218                hname = i.element.name[0]
1219                if ifs.has_key(hname):
1220                    hosts.append("%s\t%s-%s %s-%d" % \
1221                            (ip_addr(base), hname, s.name, hname,
1222                                ifs[hname]))
1223                else:
1224                    ifs[hname] = 0
1225                    hosts.append("%s\t%s-%s %s-%d %s" % \
1226                            (ip_addr(base), hname, s.name, hname,
1227                                ifs[hname], hname))
1228
1229                ifs[hname] += 1
1230                base += 1
1231        return hosts
1232
1233    def get_access_to_testbeds(self, testbeds, user, access_user, 
1234            export_project, master, allocated, tbparams):
1235        """
1236        Request access to the various testbeds required for this instantiation
1237        (passed in as testbeds).  User, access_user, expoert_project and master
1238        are used to construct the correct requests.  Per-testbed parameters are
1239        returned in tbparams.
1240        """
1241        for tb in testbeds:
1242            self.get_access(tb, None, user, tbparams, master,
1243                    export_project, access_user)
1244            allocated[tb] = 1
1245
1246    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1247        """
1248        Create the sub-topologies that are needed for experimetn instantiation.
1249        Along the way attach startup commands to the computers in the
1250        subtopologies.
1251        """
1252        for tb in testbeds:
1253            topo[tb] = top.clone()
1254            to_delete = [ ]
1255            for e in topo[tb].elements:
1256                etb = e.get_attribute('testbed')
1257                if etb and etb != tb:
1258                    for i in e.interface:
1259                        for s in i.subs:
1260                            try:
1261                                s.interfaces.remove(i)
1262                            except ValueError:
1263                                raise service_error(service_error.internal,
1264                                        "Can't remove interface??")
1265                    to_delete.append(e)
1266            for e in to_delete:
1267                topo[tb].elements.remove(e)
1268            topo[tb].make_indices()
1269
1270            for e in [ e for e in topo[tb].elements \
1271                    if isinstance(e,topdl.Computer)]:
1272                if tb == master:
1273                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1274                else:
1275                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1276                scmd = e.get_attribute('startup')
1277                if scmd:
1278                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1279
1280                e.set_attribute('startup', cmd)
1281                if self.fedkit: self.add_kit(e, self.fedkit)
1282
1283    def add_portals(self, top, topo, eid, master, tbparams):
1284        """
1285        For each substrate in the main topology, find those that
1286        have nodes on more than one testbed.  Insert portal nodes
1287        into the copies of those substrates on the sub topologies.
1288        """
1289        for s in top.substrates:
1290            # tbs will contain an ip address on this subsrate that is in
1291            # each testbed.
1292            tbs = { }
1293            for i in s.interfaces:
1294                e = i.element
1295                tb = e.get_attribute('testbed')
1296                if tb and not tbs.has_key(tb):
1297                    for i in e.interface:
1298                        if s in i.subs:
1299                            tbs[tb]= i.get_attribute('ip4_address')
1300            if len(tbs) < 2:
1301                continue
1302
1303            # More than one testbed is on this substrate.  Insert
1304            # some portals into the subtopologies.  st == source testbed,
1305            # dt == destination testbed.
1306            segment_substrate = { }
1307            for st in tbs.keys():
1308                segment_substrate[st] = { }
1309                for dt in [ t for t in tbs.keys() if t != st]:
1310                    myname =  "%stunnel" % dt
1311                    desthost  =  "%stunnel" % st
1312                    sproject = tbparams[st].get('project', 'project')
1313                    dproject = tbparams[dt].get('project', 'project')
1314                    mproject = tbparams[master].get('project', 'project')
1315                    sdomain = tbparams[st].get('domain', ".example.com")
1316                    ddomain = tbparams[dt].get('domain', ".example.com")
1317                    mdomain = tbparams[master].get('domain', '.example.com')
1318                    muser = tbparams[master].get('user', 'root')
1319                    smbshare = tbparams[master].get('smbshare', 'USERS')
1320                    # XXX: active and type need to be unkludged
1321                    active = ("%s" % (st == master))
1322                    if not segment_substrate[st].has_key(dt):
1323                        # Put a substrate and a segment for the connected
1324                        # testbed in there.
1325                        tsubstrate = \
1326                                topdl.Substrate(name='%s-%s' % (st, dt),
1327                                        attribute= [
1328                                            topdl.Attribute(
1329                                                attribute='portal',
1330                                                value='true')
1331                                            ]
1332                                        )
1333                        segment_element = topdl.Segment(
1334                                id= tbparams[dt]['allocID'],
1335                                type='emulab',
1336                                uri = self.tbmap.get(dt, None),
1337                                interface=[ 
1338                                    topdl.Interface(
1339                                        substrate=tsubstrate.name),
1340                                    ],
1341                                attribute = [
1342                                    topdl.Attribute(attribute=n, value=v)
1343                                        for n, v in (\
1344                                            ('domain', ddomain),
1345                                            ('experiment', "%s/%s" % \
1346                                                    (dproject, eid)),)
1347                                    ],
1348                                )
1349                        segment_substrate[st][dt] = tsubstrate
1350                        topo[st].substrates.append(tsubstrate)
1351                        topo[st].elements.append(segment_element)
1352                    portal = topdl.Computer(
1353                            name="%stunnel" % dt,
1354                            attribute=[ 
1355                                topdl.Attribute(attribute=n,value=v)
1356                                    for n, v in (\
1357                                        ('portal', 'true'),
1358                                        ('domain', sdomain),
1359                                        ('masterdomain', mdomain),
1360                                        ('masterexperiment', "%s/%s" % \
1361                                                (mproject, eid)),
1362                                        ('masteruser', muser),
1363                                        ('smbshare', smbshare),
1364                                        ('experiment', "%s/%s" % \
1365                                                (sproject, eid)),
1366                                        ('peer', "%s" % desthost),
1367                                        ('peer_segment', "%s" % \
1368                                                tbparams[dt]['allocID']['fedid']),
1369                                        ('scriptdir', 
1370                                            "/usr/local/federation/bin"),
1371                                        ('active', "%s" % active),
1372                                        ('portal_type', 'both'), 
1373                                        ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1374                                ],
1375                            interface=[
1376                                topdl.Interface(
1377                                    substrate=s.name,
1378                                    attribute=[ 
1379                                        topdl.Attribute(
1380                                            attribute='ip4_address', 
1381                                            value=tbs[dt]
1382                                        )
1383                                    ]),
1384                                topdl.Interface(
1385                                    substrate=\
1386                                        segment_substrate[st][dt].name,
1387                                    attribute=[
1388                                        topdl.Attribute(attribute='portal',
1389                                            value='true')
1390                                        ]
1391                                    ),
1392                                ],
1393                            )
1394                    if self.fedkit: self.add_kit(portal, self.fedkit)
1395                    if self.gatewaykit: self.add_kit(portal, self.gatewaykit)
1396
1397                    topo[st].elements.append(portal)
1398
1399        # Connect the gateway nodes into the topologies and clear out
1400        # substrates that are not in the topologies
1401        for tb in tbparams.keys():
1402            topo[tb].incorporate_elements()
1403            topo[tb].substrates = \
1404                    [s for s in topo[tb].substrates \
1405                        if len(s.interfaces) >0]
1406
1407    def wrangle_software(self, expid, top, topo, tbparams):
1408        """
1409        Copy software out to the repository directory, allocate permissions and
1410        rewrite the segment topologies to look for the software in local
1411        places.
1412        """
1413
1414        # Copy the rpms and tarfiles to a distribution directory from
1415        # which the federants can retrieve them
1416        linkpath = "%s/software" %  expid
1417        softdir ="%s/%s" % ( self.repodir, linkpath)
1418        softmap = { }
1419        # These are in a list of tuples format (each kit).  This comprehension
1420        # unwraps them into a single list of tuples that initilaizes the set of
1421        # tuples.
1422        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1423                for p, t in l ])
1424        pkgs.update([x.location for e in top.elements \
1425                for x in e.software])
1426        try:
1427            os.makedirs(softdir)
1428        except IOError, e:
1429            raise service_error(
1430                    "Cannot create software directory: %s" % e)
1431        # The actual copying.  Everything's converted into a url for copying.
1432        for pkg in pkgs:
1433            loc = pkg
1434
1435            scheme, host, path = urlparse(loc)[0:3]
1436            dest = os.path.basename(path)
1437            if not scheme:
1438                if not loc.startswith('/'):
1439                    loc = "/%s" % loc
1440                loc = "file://%s" %loc
1441            try:
1442                u = urlopen(loc)
1443            except Exception, e:
1444                raise service_error(service_error.req, 
1445                        "Cannot open %s: %s" % (loc, e))
1446            try:
1447                f = open("%s/%s" % (softdir, dest) , "w")
1448                self.log.debug("Writing %s/%s" % (softdir,dest) )
1449                data = u.read(4096)
1450                while data:
1451                    f.write(data)
1452                    data = u.read(4096)
1453                f.close()
1454                u.close()
1455            except Exception, e:
1456                raise service_error(service_error.internal,
1457                        "Could not copy %s: %s" % (loc, e))
1458            path = re.sub("/tmp", "", linkpath)
1459            # XXX
1460            softmap[pkg] = \
1461                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1462                    ( path, dest)
1463
1464            # Allow the individual segments to access the software.
1465            for tb in tbparams.keys():
1466                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1467                        "/%s/%s" % ( path, dest))
1468
1469        # Convert the software locations in the segments into the local
1470        # copies on this host
1471        for soft in [ s for tb in topo.values() \
1472                for e in tb.elements \
1473                    if getattr(e, 'software', False) \
1474                        for s in e.software ]:
1475            if softmap.has_key(soft.location):
1476                soft.location = softmap[soft.location]
1477
1478
1479    def create_experiment(self, req, fid):
1480        """
1481        The external interface to experiment creation called from the
1482        dispatcher.
1483
1484        Creates a working directory, splits the incoming description using the
1485        splitter script and parses out the avrious subsections using the
1486        lcasses above.  Once each sub-experiment is created, use pooled threads
1487        to instantiate them and start it all up.
1488        """
1489        if not self.auth.check_attribute(fid, 'create'):
1490            raise service_error(service_error.access, "Create access denied")
1491
1492        try:
1493            tmpdir = tempfile.mkdtemp(prefix="split-")
1494            os.mkdir(tmpdir+"/keys")
1495        except IOError:
1496            raise service_error(service_error.internal, "Cannot create tmp dir")
1497
1498        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1499        gw_secretkey_base = "fed.%s" % self.ssh_type
1500        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1501        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1502        tclfile = tmpdir + "/experiment.tcl"
1503        tbparams = { }
1504        try:
1505            access_user = self.accessdb[fid]
1506        except KeyError:
1507            raise service_error(service_error.internal,
1508                    "Access map and authorizer out of sync in " + \
1509                            "create_experiment for fedid %s"  % fid)
1510
1511        pid = "dummy"
1512        gid = "dummy"
1513
1514        req = req.get('CreateRequestBody', None)
1515        if not req:
1516            raise service_error(service_error.req,
1517                    "Bad request format (no CreateRequestBody)")
1518        # The tcl parser needs to read a file so put the content into that file
1519        descr=req.get('experimentdescription', None)
1520        if descr:
1521            file_content=descr.get('ns2description', None)
1522            if file_content:
1523                try:
1524                    f = open(tclfile, 'w')
1525                    f.write(file_content)
1526                    f.close()
1527                except IOError:
1528                    raise service_error(service_error.internal,
1529                            "Cannot write temp experiment description")
1530            else:
1531                raise service_error(service_error.req, 
1532                        "Only ns2descriptions supported")
1533        else:
1534            raise service_error(service_error.req, "No experiment description")
1535
1536        # Generate an ID for the experiment (slice) and a certificate that the
1537        # allocator can use to prove they own it.  We'll ship it back through
1538        # the encrypted connection.
1539        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1540
1541        eid = self.create_experiment_state(fid, req, expid, expcert)
1542        try: 
1543            # This catches exceptions to clear the placeholder if necessary
1544            try:
1545                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1546            except ValueError:
1547                raise service_error(service_error.server_config, 
1548                        "Bad key type (%s)" % self.ssh_type)
1549
1550            user = req.get('user', None)
1551            if user == None:
1552                raise service_error(service_error.req, "No user")
1553
1554            master = req.get('master', None)
1555            if not master:
1556                raise service_error(service_error.req,
1557                        "No master testbed label")
1558            export_project = req.get('exportProject', None)
1559            if not export_project:
1560                raise service_error(service_error.req, "No export project")
1561           
1562            # Translate to topdl
1563            if self.splitter_url:
1564                # XXX: need remote topdl translator
1565                self.log.debug("Calling remote splitter at %s" % \
1566                        self.splitter_url)
1567                split_data = self.remote_splitter(self.splitter_url,
1568                        file_content, master)
1569            else:
1570                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1571                    str(self.muxmax), '-m', master]
1572
1573                if self.fedkit:
1574                    tclcmd.append('-k')
1575
1576                if self.gatewaykit:
1577                    tclcmd.append('-K')
1578
1579                tclcmd.extend([pid, gid, eid, tclfile])
1580
1581                self.log.debug("running local splitter %s", " ".join(tclcmd))
1582                # This is just fantastic.  As a side effect the parser copies
1583                # tb_compat.tcl into the current directory, so that directory
1584                # must be writable by the fedd user.  Doing this in the
1585                # temporary subdir ensures this is the case.
1586                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1587                        cwd=tmpdir)
1588                split_data = tclparser.stdout
1589
1590            top = topdl.topology_from_xml(file=split_data, top="experiment")
1591
1592            hosts = self.allocate_ips_to_topo(top)
1593             # Find the testbeds to look up
1594            testbeds = set([ a.value for e in top.elements \
1595                    for a in e.attribute \
1596                    if a.attribute == 'testbed'] )
1597
1598            allocated = { }         # Testbeds we can access
1599            topo ={ }               # Sub topologies
1600            self.get_access_to_testbeds(testbeds, user, access_user, 
1601                    export_project, master, allocated, tbparams)
1602            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1603
1604            # Copy configuration files into the remote file store
1605            # The config urlpath
1606            configpath = "/%s/config" % expid
1607            # The config file system location
1608            configdir ="%s%s" % ( self.repodir, configpath)
1609            try:
1610                os.makedirs(configdir)
1611            except IOError, e:
1612                raise service_error(
1613                        "Cannot create config directory: %s" % e)
1614            try:
1615                f = open("%s/hosts" % configdir, "w")
1616                f.write('\n'.join(hosts))
1617                f.close()
1618            except IOError, e:
1619                raise service_error(service_error.internal, 
1620                        "Cannot write hosts file: %s" % e)
1621            try:
1622                copy_file("%s" % gw_pubkey, "%s/%s" % \
1623                        (configdir, gw_pubkey_base))
1624                copy_file("%s" % gw_secretkey, "%s/%s" % \
1625                        (configdir, gw_secretkey_base))
1626            except IOError, e:
1627                raise service_error(service_error.internal, 
1628                        "Cannot copy keyfiles: %s" % e)
1629
1630            # Allow the individual testbeds to access the configuration files.
1631            for tb in tbparams.keys():
1632                asignee = tbparams[tb]['allocID']['fedid']
1633                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1634                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1635
1636            self.add_portals(top, topo, eid, master, tbparams)
1637            self.wrangle_software(expid, top, topo, tbparams)
1638
1639            vtopo = topdl.topology_to_vtopo(top)
1640            vis = self.genviz(vtopo)
1641
1642            # save federant information
1643            for k in allocated.keys():
1644                tbparams[k]['federant'] = {\
1645                        'name': [ { 'localname' : eid} ],\
1646                        'emulab': tbparams[k]['emulab'],\
1647                        'allocID' : tbparams[k]['allocID'],\
1648                        'master' : k == master,\
1649                    }
1650
1651            self.state_lock.acquire()
1652            self.state[eid]['vtopo'] = vtopo
1653            self.state[eid]['vis'] = vis
1654            self.state[expid]['federant'] = \
1655                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1656                        if tbparams[tb].has_key('federant') ]
1657            if self.state_filename: 
1658                self.write_state()
1659            self.state_lock.release()
1660        except service_error, e:
1661            # If something goes wrong in the parse (usually an access error)
1662            # clear the placeholder state.  From here on out the code delays
1663            # exceptions.  Failing at this point returns a fault to the remote
1664            # caller.
1665
1666            self.state_lock.acquire()
1667            del self.state[eid]
1668            del self.state[expid]
1669            if self.state_filename: self.write_state()
1670            self.state_lock.release()
1671            raise e
1672
1673
1674        # Start the background swapper and return the starting state.  From
1675        # here on out, the state will stick around a while.
1676
1677        # Let users touch the state
1678        self.auth.set_attribute(fid, expid)
1679        self.auth.set_attribute(expid, expid)
1680        # Override fedids can manipulate state as well
1681        for o in self.overrides:
1682            self.auth.set_attribute(o, expid)
1683
1684        # Create a logger that logs to the experiment's state object as well as
1685        # to the main log file.
1686        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1687        alloc_collector = self.list_log(self.state[eid]['log'])
1688        h = logging.StreamHandler(alloc_collector)
1689        # XXX: there should be a global one of these rather than repeating the
1690        # code.
1691        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1692                    '%d %b %y %H:%M:%S'))
1693        alloc_log.addHandler(h)
1694       
1695        # XXX
1696        url_base = 'https://users.isi.deterlab.net:23232'
1697        attrs = [ 
1698                {
1699                    'attribute': 'ssh_pubkey', 
1700                    'value': '%s/%s/config/%s' % \
1701                            (url_base, expid, gw_pubkey_base)
1702                },
1703                {
1704                    'attribute': 'ssh_secretkey', 
1705                    'value': '%s/%s/config/%s' % \
1706                            (url_base, expid, gw_secretkey_base)
1707                },
1708                {
1709                    'attribute': 'hosts', 
1710                    'value': '%s/%s/config/hosts' % \
1711                            (url_base, expid)
1712                },
1713                {
1714                    'attribute': 'experiment_name',
1715                    'value': eid,
1716                },
1717            ]
1718
1719        # Start a thread to do the resource allocation
1720        t  = Thread(target=self.allocate_resources,
1721                args=(allocated, master, eid, expid, expcert, tbparams, 
1722                    topo, tmpdir, alloc_log, alloc_collector, attrs),
1723                name=eid)
1724        t.start()
1725
1726        rv = {
1727                'experimentID': [
1728                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1729                ],
1730                'experimentStatus': 'starting',
1731                'experimentAccess': { 'X509' : expcert }
1732            }
1733
1734        return rv
1735   
1736    def get_experiment_fedid(self, key):
1737        """
1738        find the fedid associated with the localname key in the state database.
1739        """
1740
1741        rv = None
1742        self.state_lock.acquire()
1743        if self.state.has_key(key):
1744            if isinstance(self.state[key], dict):
1745                try:
1746                    kl = [ f['fedid'] for f in \
1747                            self.state[key]['experimentID']\
1748                                if f.has_key('fedid') ]
1749                except KeyError:
1750                    self.state_lock.release()
1751                    raise service_error(service_error.internal, 
1752                            "No fedid for experiment %s when getting "+\
1753                                    "fedid(!?)" % key)
1754                if len(kl) == 1:
1755                    rv = kl[0]
1756                else:
1757                    self.state_lock.release()
1758                    raise service_error(service_error.internal, 
1759                            "multiple fedids for experiment %s when " +\
1760                                    "getting fedid(!?)" % key)
1761            else:
1762                self.state_lock.release()
1763                raise service_error(service_error.internal, 
1764                        "Unexpected state for %s" % key)
1765        self.state_lock.release()
1766        return rv
1767
1768    def check_experiment_access(self, fid, key):
1769        """
1770        Confirm that the fid has access to the experiment.  Though a request
1771        may be made in terms of a local name, the access attribute is always
1772        the experiment's fedid.
1773        """
1774        if not isinstance(key, fedid):
1775            key = self.get_experiment_fedid(key)
1776
1777        if self.auth.check_attribute(fid, key):
1778            return True
1779        else:
1780            raise service_error(service_error.access, "Access Denied")
1781
1782
1783    def get_handler(self, path, fid):
1784        if self.auth.check_attribute(fid, path):
1785            return ("%s/%s" % (self.repodir, path), "application/binary")
1786        else:
1787            return (None, None)
1788
1789    def get_vtopo(self, req, fid):
1790        """
1791        Return the stored virtual topology for this experiment
1792        """
1793        rv = None
1794        state = None
1795
1796        req = req.get('VtopoRequestBody', None)
1797        if not req:
1798            raise service_error(service_error.req,
1799                    "Bad request format (no VtopoRequestBody)")
1800        exp = req.get('experiment', None)
1801        if exp:
1802            if exp.has_key('fedid'):
1803                key = exp['fedid']
1804                keytype = "fedid"
1805            elif exp.has_key('localname'):
1806                key = exp['localname']
1807                keytype = "localname"
1808            else:
1809                raise service_error(service_error.req, "Unknown lookup type")
1810        else:
1811            raise service_error(service_error.req, "No request?")
1812
1813        self.check_experiment_access(fid, key)
1814
1815        self.state_lock.acquire()
1816        if self.state.has_key(key):
1817            if self.state[key].has_key('vtopo'):
1818                rv = { 'experiment' : {keytype: key },\
1819                        'vtopo': self.state[key]['vtopo'],\
1820                    }
1821            else:
1822                state = self.state[key]['experimentStatus']
1823        self.state_lock.release()
1824
1825        if rv: return rv
1826        else: 
1827            if state:
1828                raise service_error(service_error.partial, 
1829                        "Not ready: %s" % state)
1830            else:
1831                raise service_error(service_error.req, "No such experiment")
1832
1833    def get_vis(self, req, fid):
1834        """
1835        Return the stored visualization for this experiment
1836        """
1837        rv = None
1838        state = None
1839
1840        req = req.get('VisRequestBody', None)
1841        if not req:
1842            raise service_error(service_error.req,
1843                    "Bad request format (no VisRequestBody)")
1844        exp = req.get('experiment', None)
1845        if exp:
1846            if exp.has_key('fedid'):
1847                key = exp['fedid']
1848                keytype = "fedid"
1849            elif exp.has_key('localname'):
1850                key = exp['localname']
1851                keytype = "localname"
1852            else:
1853                raise service_error(service_error.req, "Unknown lookup type")
1854        else:
1855            raise service_error(service_error.req, "No request?")
1856
1857        self.check_experiment_access(fid, key)
1858
1859        self.state_lock.acquire()
1860        if self.state.has_key(key):
1861            if self.state[key].has_key('vis'):
1862                rv =  { 'experiment' : {keytype: key },\
1863                        'vis': self.state[key]['vis'],\
1864                        }
1865            else:
1866                state = self.state[key]['experimentStatus']
1867        self.state_lock.release()
1868
1869        if rv: return rv
1870        else:
1871            if state:
1872                raise service_error(service_error.partial, 
1873                        "Not ready: %s" % state)
1874            else:
1875                raise service_error(service_error.req, "No such experiment")
1876
1877    def clean_info_response(self, rv):
1878        """
1879        Remove the information in the experiment's state object that is not in
1880        the info response.
1881        """
1882        # Remove the owner info (should always be there, but...)
1883        if rv.has_key('owner'): del rv['owner']
1884
1885        # Convert the log into the allocationLog parameter and remove the
1886        # log entry (with defensive programming)
1887        if rv.has_key('log'):
1888            rv['allocationLog'] = "".join(rv['log'])
1889            del rv['log']
1890        else:
1891            rv['allocationLog'] = ""
1892
1893        if rv['experimentStatus'] != 'active':
1894            if rv.has_key('federant'): del rv['federant']
1895        else:
1896            # remove the allocationID info from each federant
1897            for f in rv.get('federant', []):
1898                if f.has_key('allocID'): del f['allocID']
1899        return rv
1900
1901    def get_info(self, req, fid):
1902        """
1903        Return all the stored info about this experiment
1904        """
1905        rv = None
1906
1907        req = req.get('InfoRequestBody', None)
1908        if not req:
1909            raise service_error(service_error.req,
1910                    "Bad request format (no InfoRequestBody)")
1911        exp = req.get('experiment', None)
1912        if exp:
1913            if exp.has_key('fedid'):
1914                key = exp['fedid']
1915                keytype = "fedid"
1916            elif exp.has_key('localname'):
1917                key = exp['localname']
1918                keytype = "localname"
1919            else:
1920                raise service_error(service_error.req, "Unknown lookup type")
1921        else:
1922            raise service_error(service_error.req, "No request?")
1923
1924        self.check_experiment_access(fid, key)
1925
1926        # The state may be massaged by the service function that called
1927        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1928        # state.
1929        self.state_lock.acquire()
1930        if self.state.has_key(key):
1931            rv = copy.deepcopy(self.state[key])
1932        self.state_lock.release()
1933
1934        if rv:
1935            return self.clean_info_response(rv)
1936        else:
1937            raise service_error(service_error.req, "No such experiment")
1938
1939    def get_multi_info(self, req, fid):
1940        """
1941        Return all the stored info that this fedid can access
1942        """
1943        rv = { 'info': [ ] }
1944
1945        self.state_lock.acquire()
1946        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
1947            self.check_experiment_access(fid, key)
1948
1949            if self.state.has_key(key):
1950                e = copy.deepcopy(self.state[key])
1951                e = self.clean_info_response(e)
1952                rv['info'].append(e)
1953        self.state_lock.release()
1954        return rv
1955
1956    def terminate_experiment(self, req, fid):
1957        """
1958        Swap this experiment out on the federants and delete the shared
1959        information
1960        """
1961        tbparams = { }
1962        req = req.get('TerminateRequestBody', None)
1963        if not req:
1964            raise service_error(service_error.req,
1965                    "Bad request format (no TerminateRequestBody)")
1966        force = req.get('force', False)
1967        exp = req.get('experiment', None)
1968        if exp:
1969            if exp.has_key('fedid'):
1970                key = exp['fedid']
1971                keytype = "fedid"
1972            elif exp.has_key('localname'):
1973                key = exp['localname']
1974                keytype = "localname"
1975            else:
1976                raise service_error(service_error.req, "Unknown lookup type")
1977        else:
1978            raise service_error(service_error.req, "No request?")
1979
1980        self.check_experiment_access(fid, key)
1981
1982        dealloc_list = [ ]
1983
1984
1985        # Create a logger that logs to the dealloc_list as well as to the main
1986        # log file.
1987        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
1988        h = logging.StreamHandler(self.list_log(dealloc_list))
1989        # XXX: there should be a global one of these rather than repeating the
1990        # code.
1991        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1992                    '%d %b %y %H:%M:%S'))
1993        dealloc_log.addHandler(h)
1994
1995        self.state_lock.acquire()
1996        fed_exp = self.state.get(key, None)
1997
1998        if fed_exp:
1999            # This branch of the conditional holds the lock to generate a
2000            # consistent temporary tbparams variable to deallocate experiments.
2001            # It releases the lock to do the deallocations and reacquires it to
2002            # remove the experiment state when the termination is complete.
2003
2004            # First make sure that the experiment creation is complete.
2005            status = fed_exp.get('experimentStatus', None)
2006
2007            if status:
2008                if status in ('starting', 'terminating'):
2009                    if not force:
2010                        self.state_lock.release()
2011                        raise service_error(service_error.partial, 
2012                                'Experiment still being created or destroyed')
2013                    else:
2014                        self.log.warning('Experiment in %s state ' % status + \
2015                                'being terminated by force.')
2016            else:
2017                # No status??? trouble
2018                self.state_lock.release()
2019                raise service_error(service_error.internal,
2020                        "Experiment has no status!?")
2021
2022            ids = []
2023            #  experimentID is a list of dicts that are self-describing
2024            #  identifiers.  This finds all the fedids and localnames - the
2025            #  keys of self.state - and puts them into ids.
2026            for id in fed_exp.get('experimentID', []):
2027                if id.has_key('fedid'): ids.append(id['fedid'])
2028                if id.has_key('localname'): ids.append(id['localname'])
2029
2030            # Collect the allocation/segment ids
2031            for fed in fed_exp.get('federant', []):
2032                try:
2033                    tb = fed['emulab']['project']['testbed']['localname']
2034                    aid = fed['allocID']
2035                except KeyError, e:
2036                    continue
2037                tbparams[tb] = aid
2038            fed_exp['experimentStatus'] = 'terminating'
2039            if self.state_filename: self.write_state()
2040            self.state_lock.release()
2041
2042            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2043            # then completes, so we can't wait if nothing starts.  So, no
2044            # tbparams, no start.
2045            if len(tbparams) > 0:
2046                thread_pool = self.thread_pool(self.nthreads)
2047                for tb in tbparams.keys():
2048                    # Create and start a thread to stop the segment
2049                    thread_pool.wait_for_slot()
2050                    uri = self.tbmap.get(tb, None)
2051                    t  = self.pooled_thread(\
2052                            target=self.terminate_segment(log=dealloc_log,
2053                                testbed=tb,
2054                                cert_file=self.cert_file, 
2055                                cert_pwd=self.cert_pwd,
2056                                trusted_certs=self.trusted_certs,
2057                                caller=self.call_TerminateSegment),
2058                            args=(uri, tbparams[tb]), name=tb,
2059                            pdata=thread_pool, trace_file=self.trace_file)
2060                    t.start()
2061                # Wait for completions
2062                thread_pool.wait_for_all_done()
2063
2064            # release the allocations (failed experiments have done this
2065            # already, and starting experiments may be in odd states, so we
2066            # ignore errors releasing those allocations
2067            try: 
2068                for tb in tbparams.keys():
2069                    self.release_access(tb, tbparams[tb])
2070            except service_error, e:
2071                if status != 'failed' and not force:
2072                    raise e
2073
2074            # Remove the terminated experiment
2075            self.state_lock.acquire()
2076            for id in ids:
2077                if self.state.has_key(id): del self.state[id]
2078
2079            if self.state_filename: self.write_state()
2080            self.state_lock.release()
2081
2082            return { 
2083                    'experiment': exp , 
2084                    'deallocationLog': "".join(dealloc_list),
2085                    }
2086        else:
2087            # Don't forget to release the lock
2088            self.state_lock.release()
2089            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.