source: fedd/fedd_experiment_control.py @ d90f0fa

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since d90f0fa was d90f0fa, checked in by Ted Faber <faber@…>, 15 years ago

Bug checking multiple access requests. The code didn't notice when a request
succeeded

  • Property mode set to 100644
File size: 60.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from fedd_util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26import parse_detail
27from service_error import service_error
28
29
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.experiment_control")
34fl.addHandler(nullHandler())
35
36class fedd_experiment_control_local:
37    """
38    Control of experiments that this system can directly access.
39
40    Includes experiment creation, termination and information dissemination.
41    Thred safe.
42    """
43   
44    class thread_pool:
45        """
46        A class to keep track of a set of threads all invoked for the same
47        task.  Manages the mutual exclusion of the states.
48        """
49        def __init__(self):
50            """
51            Start a pool.
52            """
53            self.changed = Condition()
54            self.started = 0
55            self.terminated = 0
56
57        def acquire(self):
58            """
59            Get the pool's lock.
60            """
61            self.changed.acquire()
62
63        def release(self):
64            """
65            Release the pool's lock.
66            """
67            self.changed.release()
68
69        def wait(self, timeout = None):
70            """
71            Wait for a pool thread to start or stop.
72            """
73            self.changed.wait(timeout)
74
75        def start(self):
76            """
77            Called by a pool thread to report starting.
78            """
79            self.changed.acquire()
80            self.started += 1
81            self.changed.notifyAll()
82            self.changed.release()
83
84        def terminate(self):
85            """
86            Called by a pool thread to report finishing.
87            """
88            self.changed.acquire()
89            self.terminated += 1
90            self.changed.notifyAll()
91            self.changed.release()
92
93        def clear(self):
94            """
95            Clear all pool data.
96            """
97            self.changed.acquire()
98            self.started = 0
99            self.terminated =0
100            self.changed.notifyAll()
101            self.changed.release()
102
103    class pooled_thread(Thread):
104        """
105        One of a set of threads dedicated to a specific task.  Uses the
106        thread_pool class above for coordination.
107        """
108        def __init__(self, group=None, target=None, name=None, args=(), 
109                kwargs={}, pdata=None, trace_file=None):
110            Thread.__init__(self, group, target, name, args, kwargs)
111            self.rv = None          # Return value of the ops in this thread
112            self.exception = None   # Exception that terminated this thread
113            self.target=target      # Target function to run on start()
114            self.args = args        # Args to pass to target
115            self.kwargs = kwargs    # Additional kw args
116            self.pdata = pdata      # thread_pool for this class
117            # Logger for this thread
118            self.log = logging.getLogger("fedd.experiment_control")
119       
120        def run(self):
121            """
122            Emulate Thread.run, except add pool data manipulation and error
123            logging.
124            """
125            if self.pdata:
126                self.pdata.start()
127
128            if self.target:
129                try:
130                    self.rv = self.target(*self.args, **self.kwargs)
131                except service_error, s:
132                    self.exception = s
133                    self.log.error("Thread exception: %s %s" % \
134                            (s.code_string(), s.desc))
135                except:
136                    self.exception = sys.exc_info()[1]
137                    self.log.error(("Unexpected thread exception: %s" +\
138                            "Trace %s") % (self.exception,\
139                                traceback.format_exc()))
140            if self.pdata:
141                self.pdata.terminate()
142
143    call_RequestAccess = service_caller('RequestAccess',
144            'getfeddPortType', feddServiceLocator, 
145            RequestAccessRequestMessage, 'RequestAccessRequestBody')
146
147    call_ReleaseAccess = service_caller('ReleaseAccess',
148            'getfeddPortType', feddServiceLocator, 
149            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
150
151    call_Ns2Split = service_caller('Ns2Split',
152            'getfeddInternalPortType', feddInternalServiceLocator, 
153            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
154
155    def __init__(self, config=None, auth=None):
156        """
157        Intialize the various attributes, most from the config object
158        """
159        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
160        self.thread_pool = fedd_experiment_control_local.thread_pool
161
162        self.cert_file = None
163        self.cert_pwd = None
164        self.trusted_certs = None
165
166        # Walk through the various relevant certificat specifying config
167        # attributes until the local certificate attributes can be resolved.
168        # The walk is from most specific to most general specification.
169        for s in ("experiment_control", "globals"):
170            if config.has_section(s): 
171                if config.has_option(s, "cert_file"):
172                    if not self.cert_file:
173                        self.cert_file = config.get(s, "cert_file")
174                        self.cert_pwd = config.get(s, "cert_pwd")
175
176                if config.has_option(s, "trusted_certs"):
177                    if not self.trusted_certs:
178                        self.trusted_certs = config.get(s, "trusted_certs")
179
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        set_log_level(config, "experiment_control", self.log)
184        self.muxmax = 2
185        self.nthreads = 2
186        self.randomize_experiments = False
187
188        self.scp_exec = "/usr/bin/scp"
189        self.splitter = None
190        self.ssh_exec="/usr/bin/ssh"
191        self.ssh_keygen = "/usr/bin/ssh-keygen"
192        self.ssh_identity_file = None
193
194
195        self.debug = config.getboolean("experiment_control", "create_debug")
196        self.state_filename = config.get("experiment_control", 
197                "experiment_state_file")
198        self.splitter_url = config.get("experiment_control", "splitter_url")
199        self.fedkit = config.get("experiment_control", "fedkit")
200        accessdb_file = config.get("experiment_control", "accessdb")
201
202        self.ssh_pubkey_file = config.get("experiment_control", 
203                "ssh_pubkey_file")
204        self.ssh_privkey_file = config.get("experiment_control",
205                "ssh_privkey_file")
206        # NB for internal master/slave ops, not experiment setup
207        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
208        self.state = { }
209        self.state_lock = Lock()
210        self.tclsh = "/usr/local/bin/otclsh"
211        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
212        mapdb_file = config.get("experiment_control", "mapdb")
213        self.trace_file = sys.stderr
214
215        self.def_expstart = \
216                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
217                "/tmp/federate";
218        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
219                "FEDDIR/hosts";
220        self.def_gwstart = \
221                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
222                "/tmp/bridge.log";
223        self.def_mgwstart = \
224                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
225                "/tmp/bridge.log";
226        self.def_gwimage = "FBSD61-TUNNEL2";
227        self.def_gwtype = "pc";
228
229        if auth:
230            self.auth = auth
231        else:
232            self.log.error(\
233                    "[access]: No authorizer initialized, creating local one.")
234            auth = authorizer()
235
236
237        if self.ssh_pubkey_file:
238            try:
239                f = open(self.ssh_pubkey_file, 'r')
240                self.ssh_pubkey = f.read()
241                f.close()
242            except IOError:
243                raise service_error(service_error.internal,
244                        "Cannot read sshpubkey")
245        else:
246            raise service_error(service_error.internal, 
247                    "No SSH public key file?")
248
249        if not self.ssh_privkey_file:
250            raise service_error(service_error.internal, 
251                    "No SSH public key file?")
252
253
254        if mapdb_file:
255            self.read_mapdb(mapdb_file)
256            print self.tbmap
257        else:
258            self.log.warn("[experiment_control] No testbed map, using defaults")
259            self.tbmap = { 
260                    'deter':'https://users.isi.deterlab.net:23235',
261                    'emulab':'https://users.isi.deterlab.net:23236',
262                    'ucb':'https://users.isi.deterlab.net:23237',
263                    }
264
265        if accessdb_file:
266                self.read_accessdb(accessdb_file)
267        else:
268            raise service_error(service_error.internal,
269                    "No accessdb specified in config")
270
271        # Grab saved state.  OK to do this w/o locking because it's read only
272        # and only one thread should be in existence that can see self.state at
273        # this point.
274        if self.state_filename:
275            self.read_state()
276
277        # Dispatch tables
278        self.soap_services = {\
279                'Create': soap_handler(\
280                        CreateRequestMessage.typecode,
281                        getattr(self, "create_experiment"), 
282                        CreateResponseMessage,
283                        "CreateResponseBody"),
284                'Vtopo': soap_handler(\
285                        VtopoRequestMessage.typecode,
286                        getattr(self, "get_vtopo"),
287                        VtopoResponseMessage,
288                        "VtopoResponseBody"),
289                'Vis': soap_handler(\
290                        VisRequestMessage.typecode,
291                        getattr(self, "get_vis"),
292                        VisResponseMessage,
293                        "VisResponseBody"),
294                'Info': soap_handler(\
295                        InfoRequestMessage.typecode,
296                        getattr(self, "get_info"),
297                        InfoResponseMessage,
298                        "InfoResponseBody"),
299                'Terminate': soap_handler(\
300                        TerminateRequestMessage.typecode,
301                        getattr(self, "terminate_experiment"),
302                        TerminateResponseMessage,
303                        "TerminateResponseBody"),
304        }
305
306        self.xmlrpc_services = {\
307                'Create': xmlrpc_handler(\
308                        getattr(self, "create_experiment"), 
309                        "CreateResponseBody"),
310                'Vtopo': xmlrpc_handler(\
311                        getattr(self, "get_vtopo"),
312                        "VtopoResponseBody"),
313                'Vis': xmlrpc_handler(\
314                        getattr(self, "get_vis"),
315                        "VisResponseBody"),
316                'Info': xmlrpc_handler(\
317                        getattr(self, "get_info"),
318                        "InfoResponseBody"),
319                'Terminate': xmlrpc_handler(\
320                        getattr(self, "terminate_experiment"),
321                        "TerminateResponseBody"),
322        }
323
324    def copy_file(self, src, dest, size=1024):
325        """
326        Exceedingly simple file copy.
327        """
328        s = open(src,'r')
329        d = open(dest, 'w')
330
331        buf = "x"
332        while buf != "":
333            buf = s.read(size)
334            d.write(buf)
335        s.close()
336        d.close()
337
338    # Call while holding self.state_lock
339    def write_state(self):
340        """
341        Write a new copy of experiment state after copying the existing state
342        to a backup.
343
344        State format is a simple pickling of the state dictionary.
345        """
346        if os.access(self.state_filename, os.W_OK):
347            self.copy_file(self.state_filename, \
348                    "%s.bak" % self.state_filename)
349        try:
350            f = open(self.state_filename, 'w')
351            pickle.dump(self.state, f)
352        except IOError, e:
353            self.log.error("Can't write file %s: %s" % \
354                    (self.state_filename, e))
355        except pickle.PicklingError, e:
356            self.log.error("Pickling problem: %s" % e)
357        except TypeError, e:
358            self.log.error("Pickling problem (TypeError): %s" % e)
359
360    # Call while holding self.state_lock
361    def read_state(self):
362        """
363        Read a new copy of experiment state.  Old state is overwritten.
364
365        State format is a simple pickling of the state dictionary.
366        """
367        try:
368            f = open(self.state_filename, "r")
369            self.state = pickle.load(f)
370            self.log.debug("[read_state]: Read state from %s" % \
371                    self.state_filename)
372        except IOError, e:
373            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
374                    % (self.state_filename, e))
375        except pickle.UnpicklingError, e:
376            self.log.warning(("[read_state]: No saved state: " + \
377                    "Unpickling failed: %s") % e)
378       
379        for k in self.state.keys():
380            try:
381                # This list should only have one element in it, but phrasing it
382                # as a for loop doesn't cost much, really.  We have to find the
383                # fedid elements anyway.
384                for eid in [ f['fedid'] \
385                        for f in self.state[k]['experimentID']\
386                            if f.has_key('fedid') ]:
387                    self.auth.set_attribute(self.state[k]['owner'], eid)
388            except KeyError, e:
389                self.log.warning("[read_state]: State ownership or identity " +\
390                        "misformatted in %s: %s" % (self.state_filename, e))
391
392
393    def read_accessdb(self, accessdb_file):
394        """
395        Read the mapping from fedids that can create experiments to their name
396        in the 3-level access namespace.  All will be asserted from this
397        testbed and can include the local username and porject that will be
398        asserted on their behalf by this fedd.  Each fedid is also added to the
399        authorization system with the "create" attribute.
400        """
401        self.accessdb = {}
402        # These are the regexps for parsing the db
403        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
404        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
405                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
406        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
407                "\s*->\s*(" + name_expr + ")\s*$")
408        lineno = 0
409
410        # Parse the mappings and store in self.authdb, a dict of
411        # fedid -> (proj, user)
412        try:
413            f = open(accessdb_file, "r")
414            for line in f:
415                lineno += 1
416                line = line.strip()
417                if len(line) == 0 or line.startswith('#'):
418                    continue
419                m = project_line.match(line)
420                if m:
421                    fid = fedid(hexstr=m.group(1))
422                    project, user = m.group(2,3)
423                    if not self.accessdb.has_key(fid):
424                        self.accessdb[fid] = []
425                    self.accessdb[fid].append((project, user))
426                    continue
427
428                m = user_line.match(line)
429                if m:
430                    fid = fedid(hexstr=m.group(1))
431                    project = None
432                    user = m.group(2)
433                    if not self.accessdb.has_key(fid):
434                        self.accessdb[fid] = []
435                    self.accessdb[fid].append((project, user))
436                    continue
437                self.log.warn("[experiment_control] Error parsing access " +\
438                        "db %s at line %d" %  (accessdb_file, lineno))
439        except IOError:
440            raise service_error(service_error.internal,
441                    "Error opening/reading %s as experiment " +\
442                            "control accessdb" %  accessdb_file)
443        f.close()
444
445        # Initialize the authorization attributes
446        for fid in self.accessdb.keys():
447            self.auth.set_attribute(fid, 'create')
448
449    def read_mapdb(self, file):
450        """
451        Read a simple colon separated list of mappings for the
452        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
453        """
454
455        self.tbmap = { }
456        lineno =0
457        try:
458            f = open(file, "r")
459            for line in f:
460                lineno += 1
461                line = line.strip()
462                if line.startswith('#') or len(line) == 0:
463                    continue
464                try:
465                    label, url = line.split(':', 1)
466                    self.tbmap[label] = url
467                except ValueError, e:
468                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
469                            "map db: %s %s" % (lineno, line, e))
470        except IOError, e:
471            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
472                    "open %s: %s" % (file, e))
473        f.close()
474
475    def scp_file(self, file, user, host, dest=""):
476        """
477        scp a file to the remote host.  If debug is set the action is only
478        logged.
479        """
480
481        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
482                "%s@%s:%s" % (user, host, dest)]
483        rv = 0
484
485        try:
486            dnull = open("/dev/null", "r")
487        except IOError:
488            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
489            dnull = Null
490
491        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
492        if not self.debug:
493            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
494            else: rv = call(scp_cmd)
495
496        return rv == 0
497
498    def ssh_cmd(self, user, host, cmd, wname=None):
499        """
500        Run a remote command on host as user.  If debug is set, the action is
501        only logged.
502        """
503        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
504                user, host, cmd)
505
506        try:
507            dnull = open("/dev/null", "r")
508        except IOError:
509            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
510            dnull = Null
511
512        self.log.debug("[ssh_cmd]: %s" % sh_str)
513        if not self.debug:
514            if dnull:
515                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
516            else:
517                sub = Popen(sh_str, shell=True)
518            return sub.wait() == 0
519        else:
520            return True
521
522    def ship_configs(self, host, user, src_dir, dest_dir):
523        """
524        Copy federant-specific configuration files to the federant.
525        """
526        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
527            return False
528        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
529            return False
530
531        for f in os.listdir(src_dir):
532            if os.path.isdir(f):
533                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
534                        "%s/%s" % (dest_dir, f)):
535                    return False
536            else:
537                if not self.scp_file("%s/%s" % (src_dir, f), 
538                        user, host, dest_dir):
539                    return False
540        return True
541
542    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
543        """
544        Start a sub-experiment on a federant.
545
546        Get the current state, modify or create as appropriate, ship data and
547        configs and start the experiment.  There are small ordering differences
548        based on the initial state of the sub-experiment.
549        """
550        # ops node in the federant
551        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
552        user = tbparams[tb]['user']     # federant user
553        pid = tbparams[tb]['project']   # federant project
554        # XXX
555        base_confs = ( "hosts",)
556        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
557        # command to test experiment state
558        expinfo_exec = "/usr/testbed/bin/expinfo" 
559        # Configuration directories on the remote machine
560        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
561        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
562        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
563        # Regular expressions to parse the expinfo response
564        state_re = re.compile("State:\s+(\w+)")
565        no_exp_re = re.compile("^No\s+such\s+experiment")
566        state = None    # Experiment state parsed from expinfo
567        # The expinfo ssh command
568        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
569
570        # Get status
571        self.log.debug("[start_segment]: %s"% " ".join(cmd))
572        dev_null = None
573        try:
574            dev_null = open("/dev/null", "a")
575        except IOError, e:
576            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
577
578        if self.debug:
579            state = 'swapped'
580            rv = 0
581        else:
582            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
583            for line in status.stdout:
584                m = state_re.match(line)
585                if m: state = m.group(1)
586                else:
587                    m = no_exp_re.match(line)
588                    if m: state = "none"
589            rv = status.wait()
590
591        # If the experiment is not present the subcommand returns a non-zero
592        # return value.  If we successfully parsed a "none" outcome, ignore the
593        # return code.
594        if rv != 0 and state != "none":
595            raise service_error(service_error.internal,
596                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
597
598        self.log.debug("[start_segment]: %s: %s" % (tb, state))
599        self.log.info("[start_segment]:transferring experiment to %s" % tb)
600
601        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
602            return False
603        # Clear the federation files
604        if not self.ssh_cmd(user, host, 
605                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
606            return False
607        if not self.ssh_cmd(user, host, 
608                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
609            return False
610        # Clear and create the tarfiles and rpm directories
611        for d in (tarfiles_dir, rpms_dir):
612            if not self.ssh_cmd(user, host, 
613                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
614                return False
615            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
616                    "create tarfiles"):
617                return False
618       
619        if state == 'active':
620            # Remote experiment is active.  Modify it.
621            for f in base_confs:
622                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
623                        "%s/%s" % (proj_dir, f)):
624                    return False
625            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
626                    proj_dir):
627                return False
628            if os.path.isdir("%s/tarfiles" % tmpdir):
629                if not self.ship_configs(host, user,
630                        "%s/tarfiles" % tmpdir, tarfiles_dir):
631                    return False
632            if os.path.isdir("%s/rpms" % tmpdir):
633                if not self.ship_configs(host, user,
634                        "%s/rpms" % tmpdir, tarfiles_dir):
635                    return False
636            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
637            if not self.ssh_cmd(user, host,
638                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
639                            (pid, eid, tclfile), "modexp"):
640                return False
641            return True
642        elif state == "swapped":
643            # Remote experiment swapped out.  Modify it and swap it in.
644            for f in base_confs:
645                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
646                        "%s/%s" % (proj_dir, f)):
647                    return False
648            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
649                    proj_dir):
650                return False
651            if os.path.isdir("%s/tarfiles" % tmpdir):
652                if not self.ship_configs(host, user,
653                        "%s/tarfiles" % tmpdir, tarfiles_dir):
654                    return False
655            if os.path.isdir("%s/rpms" % tmpdir):
656                if not self.ship_configs(host, user,
657                        "%s/rpms" % tmpdir, tarfiles_dir):
658                    return False
659            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
660            if not self.ssh_cmd(user, host,
661                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
662                    "modexp"):
663                return False
664            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
665            if not self.ssh_cmd(user, host,
666                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
667                    "swapexp"):
668                return False
669            return True
670        elif state == "none":
671            # No remote experiment.  Create one.  We do this in 2 steps so we
672            # can put the configuration files and scripts into the new
673            # experiment directories.
674
675            # Tarfiles must be present for creation to work
676            if os.path.isdir("%s/tarfiles" % tmpdir):
677                if not self.ship_configs(host, user,
678                        "%s/tarfiles" % tmpdir, tarfiles_dir):
679                    return False
680            if os.path.isdir("%s/rpms" % tmpdir):
681                if not self.ship_configs(host, user,
682                        "%s/rpms" % tmpdir, tarfiles_dir):
683                    return False
684            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
685            if not self.ssh_cmd(user, host,
686                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
687                            (pid, eid, tclfile), "startexp"):
688                return False
689            # After startexp the per-experiment directories exist
690            for f in base_confs:
691                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
692                        "%s/%s" % (proj_dir, f)):
693                    return False
694            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
695                    proj_dir):
696                return False
697            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
698            if not self.ssh_cmd(user, host,
699                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
700                    "swapexp"):
701                return False
702            return True
703        else:
704            self.log.debug("[start_segment]:unknown state %s" % state)
705            return False
706
707    def stop_segment(self, tb, eid, tbparams):
708        """
709        Stop a sub experiment by calling swapexp on the federant
710        """
711        user = tbparams[tb]['user']
712        host = tbparams[tb]['host']
713        pid = tbparams[tb]['project']
714
715        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
716        return self.ssh_cmd(user, host,
717                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
718
719       
720    def generate_ssh_keys(self, dest, type="rsa" ):
721        """
722        Generate a set of keys for the gateways to use to talk.
723
724        Keys are of type type and are stored in the required dest file.
725        """
726        valid_types = ("rsa", "dsa")
727        t = type.lower();
728        if t not in valid_types: raise ValueError
729        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
730
731        try:
732            trace = open("/dev/null", "w")
733        except IOError:
734            raise service_error(service_error.internal,
735                    "Cannot open /dev/null??");
736
737        # May raise CalledProcessError
738        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
739        rv = call(cmd, stdout=trace, stderr=trace)
740        if rv != 0:
741            raise service_error(service_error.internal, 
742                    "Cannot generate nonce ssh keys.  %s return code %d" \
743                            % (self.ssh_keygen, rv))
744
745    def gentopo(self, str):
746        """
747        Generate the topology dtat structure from the splitter's XML
748        representation of it.
749
750        The topology XML looks like:
751            <experiment>
752                <nodes>
753                    <node><vname></vname><ips>ip1:ip2</ips></node>
754                </nodes>
755                <lans>
756                    <lan>
757                        <vname></vname><vnode></vnode><ip></ip>
758                        <bandwidth></bandwidth><member>node:port</member>
759                    </lan>
760                </lans>
761        """
762        class topo_parse:
763            """
764            Parse the topology XML and create the dats structure.
765            """
766            def __init__(self):
767                # Typing of the subelements for data conversion
768                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
769                self.int_subelements = ( 'bandwidth',)
770                self.float_subelements = ( 'delay',)
771                # The final data structure
772                self.nodes = [ ]
773                self.lans =  [ ]
774                self.topo = { \
775                        'node': self.nodes,\
776                        'lan' : self.lans,\
777                    }
778                self.element = { }  # Current element being created
779                self.chars = ""     # Last text seen
780
781            def end_element(self, name):
782                # After each sub element the contents is added to the current
783                # element or to the appropriate list.
784                if name == 'node':
785                    self.nodes.append(self.element)
786                    self.element = { }
787                elif name == 'lan':
788                    self.lans.append(self.element)
789                    self.element = { }
790                elif name in self.str_subelements:
791                    self.element[name] = self.chars
792                    self.chars = ""
793                elif name in self.int_subelements:
794                    self.element[name] = int(self.chars)
795                    self.chars = ""
796                elif name in self.float_subelements:
797                    self.element[name] = float(self.chars)
798                    self.chars = ""
799
800            def found_chars(self, data):
801                self.chars += data.rstrip()
802
803
804        tp = topo_parse();
805        parser = xml.parsers.expat.ParserCreate()
806        parser.EndElementHandler = tp.end_element
807        parser.CharacterDataHandler = tp.found_chars
808
809        parser.Parse(str)
810
811        return tp.topo
812       
813
814    def genviz(self, topo):
815        """
816        Generate the visualization the virtual topology
817        """
818
819        neato = "/usr/local/bin/neato"
820        # These are used to parse neato output and to create the visualization
821        # file.
822        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
823        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
824                "%s</type></node>"
825
826        try:
827            # Node names
828            nodes = [ n['vname'] for n in topo['node'] ]
829            topo_lans = topo['lan']
830        except KeyError:
831            raise service_error(service_error.internal, "Bad topology")
832
833        lans = { }
834        links = { }
835
836        # Walk through the virtual topology, organizing the connections into
837        # 2-node connections (links) and more-than-2-node connections (lans).
838        # When a lan is created, it's added to the list of nodes (there's a
839        # node in the visualization for the lan).
840        for l in topo_lans:
841            if links.has_key(l['vname']):
842                if len(links[l['vname']]) < 2:
843                    links[l['vname']].append(l['vnode'])
844                else:
845                    nodes.append(l['vname'])
846                    lans[l['vname']] = links[l['vname']]
847                    del links[l['vname']]
848                    lans[l['vname']].append(l['vnode'])
849            elif lans.has_key(l['vname']):
850                lans[l['vname']].append(l['vnode'])
851            else:
852                links[l['vname']] = [ l['vnode'] ]
853
854
855        # Open up a temporary file for dot to turn into a visualization
856        try:
857            df, dotname = tempfile.mkstemp()
858            dotfile = os.fdopen(df, 'w')
859        except IOError:
860            raise service_error(service_error.internal,
861                    "Failed to open file in genviz")
862
863        # Generate a dot/neato input file from the links, nodes and lans
864        try:
865            print >>dotfile, "graph G {"
866            for n in nodes:
867                print >>dotfile, '\t"%s"' % n
868            for l in links.keys():
869                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
870            for l in lans.keys():
871                for n in lans[l]:
872                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
873            print >>dotfile, "}"
874            dotfile.close()
875        except TypeError:
876            raise service_error(service_error.internal,
877                    "Single endpoint link in vtopo")
878        except IOError:
879            raise service_error(service_error.internal, "Cannot write dot file")
880
881        # Use dot to create a visualization
882        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
883                '-Gpack=true', dotname], stdout=PIPE)
884
885        # Translate dot to vis format
886        vis_nodes = [ ]
887        vis = { 'node': vis_nodes }
888        for line in dot.stdout:
889            m = vis_re.match(line)
890            if m:
891                vn = m.group(1)
892                vis_node = {'name': vn, \
893                        'x': float(m.group(2)),\
894                        'y' : float(m.group(3)),\
895                    }
896                if vn in links.keys() or vn in lans.keys():
897                    vis_node['type'] = 'lan'
898                else:
899                    vis_node['type'] = 'node'
900                vis_nodes.append(vis_node)
901        rv = dot.wait()
902
903        os.remove(dotname)
904        if rv == 0 : return vis
905        else: return None
906
907    def get_access(self, tb, nodes, user, tbparam, master, export_project,
908            access_user):
909        """
910        Get access to testbed through fedd and set the parameters for that tb
911        """
912
913        translate_attr = {
914            'slavenodestartcmd': 'expstart',
915            'slaveconnectorstartcmd': 'gwstart',
916            'masternodestartcmd': 'mexpstart',
917            'masterconnectorstartcmd': 'mgwstart',
918            'connectorimage': 'gwimage',
919            'connectortype': 'gwtype',
920            'tunnelcfg': 'tun',
921            'smbshare': 'smbshare',
922        }
923
924        uri = self.tbmap.get(tb, None)
925        if not uri:
926            raise service_error(serice_error.server_config, 
927                    "Unknown testbed: %s" % tb)
928
929        # currently this lumps all users into one service access group
930        service_keys = [ a for u in user \
931                for a in u.get('access', []) \
932                    if a.has_key('sshPubkey')]
933
934        if len(service_keys) == 0:
935            raise service_error(service_error.req, 
936                    "Must have at least one SSH pubkey for services")
937
938
939        for p, u in access_user:
940            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
941                    "to %s") %  ((p or "None"), u, uri))
942
943            if p:
944                # Request with user and project specified
945                req = {\
946                        'destinationTestbed' : { 'uri' : uri },
947                        'project': { 
948                            'name': {'localname': p},
949                            'user': [ {'userID': { 'localname': u } } ],
950                            },
951                        'user':  user,
952                        'allocID' : { 'localname': 'test' },
953                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
954                        'serviceAccess' : service_keys
955                    }
956            else:
957                # Request with only user specified
958                req = {\
959                        'destinationTestbed' : { 'uri' : uri },
960                        'user':  [ {'userID': { 'localname': u } } ],
961                        'allocID' : { 'localname': 'test' },
962                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
963                        'serviceAccess' : service_keys
964                    }
965
966            if tb == master:
967                # NB, the export_project parameter is a dict that includes
968                # the type
969                req['exportProject'] = export_project
970
971            # node resources if any
972            if nodes != None and len(nodes) > 0:
973                rnodes = [ ]
974                for n in nodes:
975                    rn = { }
976                    image, hw, count = n.split(":")
977                    if image: rn['image'] = [ image ]
978                    if hw: rn['hardware'] = [ hw ]
979                    if count: rn['count'] = int(count)
980                    rnodes.append(rn)
981                req['resources']= { }
982                req['resources']['node'] = rnodes
983
984            try:
985                r = self.call_RequestAccess(uri, req, 
986                        self.cert_file, self.cert_pwd, self.trusted_certs)
987            except service_error, e:
988                if e.code == service_error.access:
989                    self.log.debug("[get_access] Access denied")
990                    r = None
991                    continue
992                else:
993                    raise e
994
995            if r.has_key('RequestAccessResponseBody'):
996                # Through to here we have a valid response, not a fault.
997                # Access denied is a fault, so something better or worse than
998                # access denied has happened.
999                r = r['RequestAccessResponseBody']
1000                self.log.debug("[get_access] Access granted")
1001                break
1002            else:
1003                raise service_error(service_error.protocol,
1004                        "Bad proxy response")
1005       
1006        if not r:
1007            raise service_error(service_error.access, 
1008                    "Access denied by %s (%s)" % (tb, uri))
1009
1010        e = r['emulab']
1011        p = e['project']
1012        tbparam[tb] = { 
1013                "boss": e['boss'],
1014                "host": e['ops'],
1015                "domain": e['domain'],
1016                "fs": e['fileServer'],
1017                "eventserver": e['eventServer'],
1018                "project": unpack_id(p['name']),
1019                "emulab" : e,
1020                "allocID" : r['allocID'],
1021                }
1022        # Make the testbed name be the label the user applied
1023        p['testbed'] = {'localname': tb }
1024
1025        for u in p['user']:
1026            tbparam[tb]['user'] = unpack_id(u['userID'])
1027
1028        for a in e['fedAttr']:
1029            if a['attribute']:
1030                key = translate_attr.get(a['attribute'].lower(), None)
1031                if key:
1032                    tbparam[tb][key]= a['value']
1033       
1034    def release_access(self, tb, aid):
1035        """
1036        Release access to testbed through fedd
1037        """
1038
1039        uri = self.tbmap.get(tb, None)
1040        if not uri:
1041            raise service_error(serice_error.server_config, 
1042                    "Unknown testbed: %s" % tb)
1043
1044        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1045                self.cert_file, self.cert_pwd, self.trusted_certs)
1046
1047        # better error coding
1048
1049    def remote_splitter(self, uri, desc, master):
1050
1051        req = {
1052                'description' : { 'ns2description': desc },
1053                'master': master,
1054                'include_fedkit': bool(self.fedkit)
1055            }
1056
1057        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1058                self.trusted_certs)
1059
1060        if r.has_key('Ns2SplitResponseBody'):
1061            r = r['Ns2SplitResponseBody']
1062            if r.has_key('output'):
1063                return r['output'].splitlines()
1064            else:
1065                raise service_error(service_error.protocol, 
1066                        "Bad splitter response (no output)")
1067        else:
1068            raise service_error(service_error.protocol, "Bad splitter response")
1069       
1070    class current_testbed:
1071        """
1072        Object for collecting the current testbed description.  The testbed
1073        description is saved to a file with the local testbed variables
1074        subsittuted line by line.
1075        """
1076        def __init__(self, eid, tmpdir, fedkit):
1077            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1078            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1079            self.current_testbed = None
1080            self.testbed_file = None
1081
1082            self.def_expstart = \
1083                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1084            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1085            self.def_gwstart = \
1086                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1087            self.def_mgwstart = \
1088                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1089            self.def_gwimage = "FBSD61-TUNNEL2";
1090            self.def_gwtype = "pc";
1091
1092            self.eid = eid
1093            self.tmpdir = tmpdir
1094            self.fedkit = fedkit
1095
1096        def __call__(self, line, master, allocated, tbparams):
1097            # Capture testbed topology descriptions
1098            if self.current_testbed == None:
1099                m = self.begin_testbed.match(line)
1100                if m != None:
1101                    self.current_testbed = m.group(1)
1102                    if self.current_testbed == None:
1103                        raise service_error(service_error.req,
1104                                "Bad request format (unnamed testbed)")
1105                    allocated[self.current_testbed] = \
1106                            allocated.get(self.current_testbed,0) + 1
1107                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1108                    if not os.path.exists(tb_dir):
1109                        try:
1110                            os.mkdir(tb_dir)
1111                        except IOError:
1112                            raise service_error(service_error.internal,
1113                                    "Cannot create %s" % tb_dir)
1114                    try:
1115                        self.testbed_file = open("%s/%s.%s.tcl" %
1116                                (tb_dir, self.eid, self.current_testbed), 'w')
1117                    except IOError:
1118                        self.testbed_file = None
1119                    return True
1120                else: return False
1121            else:
1122                m = self.end_testbed.match(line)
1123                if m != None:
1124                    if m.group(1) != self.current_testbed:
1125                        raise service_error(service_error.internal, 
1126                                "Mismatched testbed markers!?")
1127                    if self.testbed_file != None: 
1128                        self.testbed_file.close()
1129                        self.testbed_file = None
1130                    self.current_testbed = None
1131                elif self.testbed_file:
1132                    # Substitute variables and put the line into the local
1133                    # testbed file.
1134                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1135                            self.def_gwtype)
1136                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1137                            self.def_gwimage)
1138                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1139                            self.def_mgwstart)
1140                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1141                            self.def_mexpstart)
1142                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1143                            self.def_gwstart)
1144                    expstart = tbparams[self.current_testbed].get('expstart', 
1145                            self.def_expstart)
1146                    project = tbparams[self.current_testbed].get('project')
1147                    line = re.sub("GWTYPE", gwtype, line)
1148                    line = re.sub("GWIMAGE", gwimage, line)
1149                    if self.current_testbed == master:
1150                        line = re.sub("GWSTART", mgwstart, line)
1151                        line = re.sub("EXPSTART", mexpstart, line)
1152                    else:
1153                        line = re.sub("GWSTART", gwstart, line)
1154                        line = re.sub("EXPSTART", expstart, line)
1155                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1156                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1157                    line = re.sub("EID", self.eid, line)
1158                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1159                            (project, self.eid), line)
1160                    if self.fedkit:
1161                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1162                                line)
1163                    print >>self.testbed_file, line
1164                return True
1165
1166    class allbeds:
1167        """
1168        Process the Allbeds section.  Get access to each federant and save the
1169        parameters in tbparams
1170        """
1171        def __init__(self, get_access):
1172            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1173            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1174            self.in_allbeds = False
1175            self.get_access = get_access
1176
1177        def __call__(self, line, user, tbparams, master, export_project,
1178                access_user):
1179            # Testbed access parameters
1180            if not self.in_allbeds:
1181                if self.begin_allbeds.match(line):
1182                    self.in_allbeds = True
1183                    return True
1184                else:
1185                    return False
1186            else:
1187                if self.end_allbeds.match(line):
1188                    self.in_allbeds = False
1189                else:
1190                    nodes = line.split('|')
1191                    tb = nodes.pop(0)
1192                    self.get_access(tb, nodes, user, tbparams, master,
1193                            export_project, access_user)
1194                return True
1195
1196    class gateways:
1197        def __init__(self, eid, master, tmpdir, gw_pubkey,
1198                gw_secretkey, copy_file, fedkit):
1199            self.begin_gateways = \
1200                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1201            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1202            self.current_gateways = None
1203            self.control_gateway = None
1204            self.active_end = { }
1205
1206            self.eid = eid
1207            self.master = master
1208            self.tmpdir = tmpdir
1209            self.gw_pubkey_base = gw_pubkey
1210            self.gw_secretkey_base = gw_secretkey
1211
1212            self.copy_file = copy_file
1213            self.fedkit = fedkit
1214
1215
1216        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1217                active_end, tbparams, dtb, myname, desthost, type):
1218            """
1219            Produce a gateway configuration file from a gateways line.
1220            """
1221
1222            sproject = tbparams[gw].get('project', 'project')
1223            dproject = tbparams[dtb].get('project', 'project')
1224            sdomain = ".%s.%s%s" % (eid, sproject,
1225                    tbparams[gw].get('domain', ".example.com"))
1226            ddomain = ".%s.%s%s" % (eid, dproject,
1227                    tbparams[dtb].get('domain', ".example.com"))
1228            boss = tbparams[master].get('boss', "boss")
1229            fs = tbparams[master].get('fs', "fs")
1230            event_server = "%s%s" % \
1231                    (tbparams[gw].get('eventserver', "event_server"),
1232                            tbparams[gw].get('domain', "example.com"))
1233            remote_event_server = "%s%s" % \
1234                    (tbparams[dtb].get('eventserver', "event_server"),
1235                            tbparams[dtb].get('domain', "example.com"))
1236            seer_control = "%s%s" % \
1237                    (tbparams[gw].get('control', "control"), sdomain)
1238
1239            if self.fedkit:
1240                remote_script_dir = "/usr/local/federation/bin"
1241                local_script_dir = "/usr/local/federation/bin"
1242            else:
1243                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1244                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1245
1246            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1247            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1248            tunnel_cfg = tbparams[gw].get("tun", "false")
1249
1250            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1251            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1252
1253            # translate to lower case so the `hostname` hack for specifying
1254            # configuration files works.
1255            conf_file = conf_file.lower();
1256            remote_conf_file = remote_conf_file.lower();
1257
1258            if dtb == master:
1259                active = "false"
1260            elif gw == master:
1261                active = "true"
1262            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1263                active = "false"
1264            else:
1265                active_end['%s-%s' % (gw, dtb)] = 1
1266                active = "true"
1267
1268            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1269            print >>gwconfig, "Active: %s" % active
1270            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1271            print >>gwconfig, "BossName: %s" % boss
1272            print >>gwconfig, "FsName: %s" % fs
1273            print >>gwconfig, "EventServerName: %s" % event_server
1274            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1275            print >>gwconfig, "SeerControl: %s" % seer_control
1276            print >>gwconfig, "Type: %s" % type
1277            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1278            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1279                    local_script_dir
1280            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1281            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1282            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1283                    (remote_conf_dir, remote_conf_file)
1284            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1285            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1286            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1287            gwconfig.close()
1288
1289            return active == "true"
1290
1291        def __call__(self, line, allocated, tbparams):
1292            # Process gateways
1293            if not self.current_gateways:
1294                m = self.begin_gateways.match(line)
1295                if m:
1296                    self.current_gateways = m.group(1)
1297                    if allocated.has_key(self.current_gateways):
1298                        # This test should always succeed
1299                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1300                        if not os.path.exists(tb_dir):
1301                            try:
1302                                os.mkdir(tb_dir)
1303                            except IOError:
1304                                raise service_error(service_error.internal,
1305                                        "Cannot create %s" % tb_dir)
1306                    else:
1307                        # XXX
1308                        self.log.error("[gateways]: Ignoring gateways for " + \
1309                                "unknown testbed %s" % self.current_gateways)
1310                        self.current_gateways = None
1311                    return True
1312                else:
1313                    return False
1314            else:
1315                m = self.end_gateways.match(line)
1316                if m :
1317                    if m.group(1) != self.current_gateways:
1318                        raise service_error(service_error.internal,
1319                                "Mismatched gateway markers!?")
1320                    if self.control_gateway:
1321                        try:
1322                            cc = open("%s/%s/client.conf" %
1323                                    (self.tmpdir, self.current_gateways), 'w')
1324                            print >>cc, "ControlGateway: %s" % \
1325                                    self.control_gateway
1326                            if tbparams[self.master].has_key('smbshare'):
1327                                print >>cc, "SMBSHare: %s" % \
1328                                        tbparams[self.master]['smbshare']
1329                            print >>cc, "ProjectUser: %s" % \
1330                                    tbparams[self.master]['user']
1331                            print >>cc, "ProjectName: %s" % \
1332                                    tbparams[self.master]['project']
1333                            cc.close()
1334                        except IOError:
1335                            raise service_error(service_error.internal,
1336                                    "Error creating client config")
1337                        try:
1338                            cc = open("%s/%s/seer.conf" %
1339                                    (self.tmpdir, self.current_gateways),
1340                                    'w')
1341                            if self.current_gateways != self.master:
1342                                print >>cc, "ControlNode: %s" % \
1343                                        self.control_gateway
1344                            print >>cc, "ExperimentID: %s/%s" % \
1345                                    ( tbparams[self.master]['project'], \
1346                                    self.eid )
1347                            cc.close()
1348                        except IOError:
1349                            raise service_error(service_error.internal,
1350                                    "Error creating seer config")
1351                    else:
1352                        debug.error("[gateways]: No control gateway for %s" %\
1353                                    self.current_gateways)
1354                    self.current_gateways = None
1355                else:
1356                    dtb, myname, desthost, type = line.split(" ")
1357
1358                    if type == "control" or type == "both":
1359                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1360                                self.eid, 
1361                                tbparams[self.current_gateways]['project'],
1362                                tbparams[self.current_gateways]['domain'])
1363                    try:
1364                        active = self.gateway_conf_file(self.current_gateways,
1365                                self.master, self.eid, self.gw_pubkey_base,
1366                                self.gw_secretkey_base,
1367                                self.active_end, tbparams, dtb, myname,
1368                                desthost, type)
1369                    except IOError, e:
1370                        raise service_error(service_error.internal,
1371                                "Failed to write config file for %s" % \
1372                                        self.current_gateway)
1373           
1374                    gw_pubkey = "%s/keys/%s" % \
1375                            (self.tmpdir, self.gw_pubkey_base)
1376                    gw_secretkey = "%s/keys/%s" % \
1377                            (self.tmpdir, self.gw_secretkey_base)
1378
1379                    pkfile = "%s/%s/%s" % \
1380                            ( self.tmpdir, self.current_gateways, 
1381                                    self.gw_pubkey_base)
1382                    skfile = "%s/%s/%s" % \
1383                            ( self.tmpdir, self.current_gateways, 
1384                                    self.gw_secretkey_base)
1385
1386                    if not os.path.exists(pkfile):
1387                        try:
1388                            self.copy_file(gw_pubkey, pkfile)
1389                        except IOError:
1390                            service_error(service_error.internal,
1391                                    "Failed to copy pubkey file")
1392
1393                    if active and not os.path.exists(skfile):
1394                        try:
1395                            self.copy_file(gw_secretkey, skfile)
1396                        except IOError:
1397                            service_error(service_error.internal,
1398                                    "Failed to copy secretkey file")
1399                return True
1400
1401    class shunt_to_file:
1402        """
1403        Simple class to write data between two regexps to a file.
1404        """
1405        def __init__(self, begin, end, filename):
1406            """
1407            Begin shunting on a match of begin, stop on end, send data to
1408            filename.
1409            """
1410            self.begin = re.compile(begin)
1411            self.end = re.compile(end)
1412            self.in_shunt = False
1413            self.file = None
1414            self.filename = filename
1415
1416        def __call__(self, line):
1417            """
1418            Call this on each line in the input that may be shunted.
1419            """
1420            if not self.in_shunt:
1421                if self.begin.match(line):
1422                    self.in_shunt = True
1423                    try:
1424                        self.file = open(self.filename, "w")
1425                    except:
1426                        self.file = None
1427                        raise
1428                    return True
1429                else:
1430                    return False
1431            else:
1432                if self.end.match(line):
1433                    if self.file: 
1434                        self.file.close()
1435                        self.file = None
1436                    self.in_shunt = False
1437                else:
1438                    if self.file:
1439                        print >>self.file, line
1440                return True
1441
1442    class shunt_to_list:
1443        """
1444        Same interface as shunt_to_file.  Data collected in self.list, one list
1445        element per line.
1446        """
1447        def __init__(self, begin, end):
1448            self.begin = re.compile(begin)
1449            self.end = re.compile(end)
1450            self.in_shunt = False
1451            self.list = [ ]
1452       
1453        def __call__(self, line):
1454            if not self.in_shunt:
1455                if self.begin.match(line):
1456                    self.in_shunt = True
1457                    return True
1458                else:
1459                    return False
1460            else:
1461                if self.end.match(line):
1462                    self.in_shunt = False
1463                else:
1464                    self.list.append(line)
1465                return True
1466
1467    class shunt_to_string:
1468        """
1469        Same interface as shunt_to_file.  Data collected in self.str, all in
1470        one string.
1471        """
1472        def __init__(self, begin, end):
1473            self.begin = re.compile(begin)
1474            self.end = re.compile(end)
1475            self.in_shunt = False
1476            self.str = ""
1477       
1478        def __call__(self, line):
1479            if not self.in_shunt:
1480                if self.begin.match(line):
1481                    self.in_shunt = True
1482                    return True
1483                else:
1484                    return False
1485            else:
1486                if self.end.match(line):
1487                    self.in_shunt = False
1488                else:
1489                    self.str += line
1490                return True
1491
1492    def create_experiment(self, req, fid):
1493        """
1494        The external interface to experiment creation called from the
1495        dispatcher.
1496
1497        Creates a working directory, splits the incoming description using the
1498        splitter script and parses out the avrious subsections using the
1499        lcasses above.  Once each sub-experiment is created, use pooled threads
1500        to instantiate them and start it all up.
1501        """
1502
1503        if not self.auth.check_attribute(fid, 'create'):
1504            raise service_error(service_error.access, "Create access denied")
1505
1506        try:
1507            tmpdir = tempfile.mkdtemp(prefix="split-")
1508        except IOError:
1509            raise service_error(service_error.internal, "Cannot create tmp dir")
1510
1511        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1512        gw_secretkey_base = "fed.%s" % self.ssh_type
1513        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1514        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1515        tclfile = tmpdir + "/experiment.tcl"
1516        tbparams = { }
1517        try:
1518            access_user = self.accessdb[fid]
1519        except KeyError:
1520            raise service_error(service_error.internal,
1521                    "Access map and authorizer out of sync in " + \
1522                            "create_experiment for fedid %s"  % fid)
1523
1524        pid = "dummy"
1525        gid = "dummy"
1526        # XXX
1527        fail_soft = False
1528
1529        try:
1530            os.mkdir(tmpdir+"/keys")
1531        except OSError:
1532            raise service_error(service_error.internal,
1533                    "Can't make temporary dir")
1534
1535        req = req.get('CreateRequestBody', None)
1536        if not req:
1537            raise service_error(service_error.req,
1538                    "Bad request format (no CreateRequestBody)")
1539        # The tcl parser needs to read a file so put the content into that file
1540        descr=req.get('experimentdescription', None)
1541        if descr:
1542            file_content=descr.get('ns2description', None)
1543            if file_content:
1544                try:
1545                    f = open(tclfile, 'w')
1546                    f.write(file_content)
1547                    f.close()
1548                except IOError:
1549                    raise service_error(service_error.internal,
1550                            "Cannot write temp experiment description")
1551            else:
1552                raise service_error(service_error.req, 
1553                        "Only ns2descriptions supported")
1554        else:
1555            raise service_error(service_error.req, "No experiment description")
1556
1557        if req.has_key('experimentID') and \
1558                req['experimentID'].has_key('localname'):
1559            eid = req['experimentID']['localname']
1560            self.state_lock.acquire()
1561            while (self.state.has_key(eid)):
1562                eid += random.choice(string.ascii_letters)
1563            # To avoid another thread picking this localname
1564            self.state[eid] = "placeholder"
1565            self.state_lock.release()
1566        else:
1567            eid = self.exp_stem
1568            for i in range(0,5):
1569                eid += random.choice(string.ascii_letters)
1570            self.state_lock.acquire()
1571            while (self.state.has_key(eid)):
1572                eid = self.exp_stem
1573                for i in range(0,5):
1574                    eid += random.choice(string.ascii_letters)
1575            # To avoid another thread picking this localname
1576            self.state[eid] = "placeholder"
1577            self.state_lock.release()
1578
1579        try:
1580            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1581        except ValueError:
1582            raise service_error(service_error.server_config, 
1583                    "Bad key type (%s)" % self.ssh_type)
1584
1585        user = req.get('user', None)
1586        if user == None:
1587            raise service_error(service_error.req, "No user")
1588
1589        master = req.get('master', None)
1590        if not master:
1591            raise service_error(service_error.req, "No master testbed label")
1592        export_project = req.get('exportProject', None)
1593        if not export_project:
1594            raise service_error(service_error.req, "No export project")
1595       
1596        if self.splitter_url:
1597            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1598            split_data = self.remote_splitter(self.splitter_url, file_content,
1599                    master)
1600        else:
1601            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1602                str(self.muxmax), '-m', master]
1603
1604            if self.fedkit:
1605                tclcmd.append('-k')
1606
1607            tclcmd.extend([pid, gid, eid, tclfile])
1608
1609            self.log.debug("running local splitter %s", " ".join(tclcmd))
1610            tclparser = Popen(tclcmd, stdout=PIPE)
1611            split_data = tclparser.stdout
1612
1613        allocated = { }     # Testbeds we can access
1614        started = { }       # Testbeds where a sub-experiment started
1615                            # successfully
1616
1617        # Objects to parse the splitter output (defined above)
1618        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1619        parse_allbeds = self.allbeds(self.get_access)
1620        parse_gateways = self.gateways(eid, master, tmpdir,
1621                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1622        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1623                    "^#\s+End\s+Vtopo")
1624        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1625                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1626        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1627                "^#\s+End\s+tarfiles")
1628        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1629                "^#\s+End\s+rpms")
1630
1631        # Worling on the split data
1632        for line in split_data:
1633            line = line.rstrip()
1634            if parse_current_testbed(line, master, allocated, tbparams):
1635                continue
1636            elif parse_allbeds(line, user, tbparams, master, export_project,
1637                    access_user):
1638                continue
1639            elif parse_gateways(line, allocated, tbparams):
1640                continue
1641            elif parse_vtopo(line):
1642                continue
1643            elif parse_hostnames(line):
1644                continue
1645            elif parse_tarfiles(line):
1646                continue
1647            elif parse_rpms(line):
1648                continue
1649            else:
1650                raise service_error(service_error.internal, 
1651                        "Bad tcl parse? %s" % line)
1652
1653        # Virtual topology and visualization
1654        vtopo = self.gentopo(parse_vtopo.str)
1655        if not vtopo:
1656            raise service_error(service_error.internal, 
1657                    "Failed to generate virtual topology")
1658
1659        vis = self.genviz(vtopo)
1660        if not vis:
1661            raise service_error(service_error.internal, 
1662                    "Failed to generate visualization")
1663
1664        # save federant information
1665        for k in allocated.keys():
1666            tbparams[k]['federant'] = {\
1667                    'name': [ { 'localname' : eid} ],\
1668                    'emulab': tbparams[k]['emulab'],\
1669                    'allocID' : tbparams[k]['allocID'],\
1670                    'master' : k == master,\
1671                }
1672
1673
1674        # Copy tarfiles and rpms needed at remote sites into a staging area
1675        try:
1676            if self.fedkit:
1677                parse_tarfiles.list.append(self.fedkit)
1678            for t in parse_tarfiles.list:
1679                if not os.path.exists("%s/tarfiles" % tmpdir):
1680                    os.mkdir("%s/tarfiles" % tmpdir)
1681                self.copy_file(t, "%s/tarfiles/%s" % \
1682                        (tmpdir, os.path.basename(t)))
1683            for r in parse_rpms.list:
1684                if not os.path.exists("%s/rpms" % tmpdir):
1685                    os.mkdir("%s/rpms" % tmpdir)
1686                self.copy_file(r, "%s/rpms/%s" % \
1687                        (tmpdir, os.path.basename(r)))
1688        except IOError, e:
1689            raise service_error(service_error.internal, 
1690                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1691
1692        thread_pool_info = self.thread_pool()
1693        threads = [ ]
1694
1695        for tb in [ k for k in allocated.keys() if k != master]:
1696            # Wait until we have a free slot to start the next testbed load
1697            thread_pool_info.acquire()
1698            while thread_pool_info.started - \
1699                    thread_pool_info.terminated >= self.nthreads:
1700                thread_pool_info.wait()
1701            thread_pool_info.release()
1702
1703            # Create and start a thread to start the segment, and save it to
1704            # get the return value later
1705            t  = self.pooled_thread(target=self.start_segment, 
1706                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1707                    pdata=thread_pool_info, trace_file=self.trace_file)
1708            threads.append(t)
1709            t.start()
1710
1711        # Wait until all finish (the first clause of the while is to make sure
1712        # one starts)
1713        thread_pool_info.acquire()
1714        while thread_pool_info.started == 0 or \
1715                thread_pool_info.started > thread_pool_info.terminated:
1716            thread_pool_info.wait()
1717        thread_pool_info.release()
1718
1719        # If none failed, start the master
1720        failed = [ t.getName() for t in threads if not t.rv ]
1721
1722        if len(failed) == 0:
1723            if not self.start_segment(master, eid, tbparams, tmpdir):
1724                failed.append(master)
1725
1726        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1727        # If one failed clean up, unless fail_soft is set
1728        if failed:
1729            if not fail_soft:
1730                for tb in succeeded:
1731                    self.stop_segment(tb, eid, tbparams)
1732                # Remove the placeholder
1733                self.state_lock.acquire()
1734                del self.state[eid]
1735                self.state_lock.release()
1736
1737                raise service_error(service_error.federant,
1738                    "Swap in failed on %s" % ",".join(failed))
1739        else:
1740            self.log.info("[start_segment]: Experiment %s started" % eid)
1741
1742        # Generate an ID for the experiment (slice) and a certificate that the
1743        # allocator can use to prove they own it.  We'll ship it back through
1744        # the encrypted connection.
1745        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1746
1747        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1748
1749        # Walk up tmpdir, deleting as we go
1750        for path, dirs, files in os.walk(tmpdir, topdown=False):
1751            for f in files:
1752                os.remove(os.path.join(path, f))
1753            for d in dirs:
1754                os.rmdir(os.path.join(path, d))
1755        os.rmdir(tmpdir)
1756
1757        # The deepcopy prevents the allocation ID and other binaries from being
1758        # translated into other formats
1759        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1760                for tb in tbparams.keys() \
1761                    if tbparams[tb].has_key('federant') ],\
1762                    'vtopo': vtopo,\
1763                    'vis' : vis,
1764                    'experimentID' : [\
1765                            { 'fedid': copy.copy(expid) }, \
1766                            { 'localname': eid },\
1767                        ],\
1768                    'experimentAccess': { 'X509' : expcert },\
1769                }
1770
1771        # Insert the experiment into our state and update the disk copy
1772        self.state_lock.acquire()
1773        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1774                for tb in tbparams.keys() \
1775                    if tbparams[tb].has_key('federant') ],\
1776                    'vtopo': vtopo,\
1777                    'vis' : vis,
1778                    'owner': fid,
1779                    'experimentID' : [\
1780                            { 'fedid': expid }, { 'localname': eid },\
1781                        ],\
1782                }
1783        self.state[eid] = self.state[expid]
1784        if self.state_filename: self.write_state()
1785        self.state_lock.release()
1786
1787        self.auth.set_attribute(fid, expid)
1788        self.auth.set_attribute(expid, expid)
1789
1790        if not failed:
1791            return resp
1792        else:
1793            raise service_error(service_error.partial, \
1794                    "Partial swap in on %s" % ",".join(succeeded))
1795
1796    def check_experiment_access(self, fid, key):
1797        """
1798        Confirm that the fid has access to the experiment.  Though a request
1799        may be made in terms of a local name, the access attribute is always
1800        the experiment's fedid.
1801        """
1802        if not isinstance(key, fedid):
1803            self.state_lock.acquire()
1804            if self.state.has_key(key):
1805                try:
1806                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1807                            if f.has_key('fedid') ]
1808                except KeyError:
1809                    self.state_lock.release()
1810                    raise service_error(service_error.internal, 
1811                            "No fedid for experiment %s when checking " +\
1812                                    "access(!?)" % key)
1813                if len(kl) == 1:
1814                    key = kl[0]
1815                else:
1816                    self.state_lock.release()
1817                    raise service_error(service_error.internal, 
1818                            "multiple fedids for experiment %s when " +\
1819                                    "checking access(!?)" % key)
1820            else:
1821                self.state_lock.release()
1822                raise service_error(service_error.access, "Access Denied")
1823            self.state_lock.release()
1824
1825        if self.auth.check_attribute(fid, key):
1826            return True
1827        else:
1828            raise service_error(service_error.access, "Access Denied")
1829
1830
1831
1832    def get_vtopo(self, req, fid):
1833        """
1834        Return the stored virtual topology for this experiment
1835        """
1836        rv = None
1837
1838        req = req.get('VtopoRequestBody', None)
1839        if not req:
1840            raise service_error(service_error.req,
1841                    "Bad request format (no VtopoRequestBody)")
1842        exp = req.get('experiment', None)
1843        if exp:
1844            if exp.has_key('fedid'):
1845                key = exp['fedid']
1846                keytype = "fedid"
1847            elif exp.has_key('localname'):
1848                key = exp['localname']
1849                keytype = "localname"
1850            else:
1851                raise service_error(service_error.req, "Unknown lookup type")
1852        else:
1853            raise service_error(service_error.req, "No request?")
1854
1855        self.check_experiment_access(fid, key)
1856
1857        self.state_lock.acquire()
1858        if self.state.has_key(key):
1859            rv = { 'experiment' : {keytype: key },\
1860                    'vtopo': self.state[key]['vtopo'],\
1861                }
1862        self.state_lock.release()
1863
1864        if rv: return rv
1865        else: raise service_error(service_error.req, "No such experiment")
1866
1867    def get_vis(self, req, fid):
1868        """
1869        Return the stored visualization for this experiment
1870        """
1871        rv = None
1872
1873        req = req.get('VisRequestBody', None)
1874        if not req:
1875            raise service_error(service_error.req,
1876                    "Bad request format (no VisRequestBody)")
1877        exp = req.get('experiment', None)
1878        if exp:
1879            if exp.has_key('fedid'):
1880                key = exp['fedid']
1881                keytype = "fedid"
1882            elif exp.has_key('localname'):
1883                key = exp['localname']
1884                keytype = "localname"
1885            else:
1886                raise service_error(service_error.req, "Unknown lookup type")
1887        else:
1888            raise service_error(service_error.req, "No request?")
1889
1890        self.check_experiment_access(fid, key)
1891
1892        self.state_lock.acquire()
1893        if self.state.has_key(key):
1894            rv =  { 'experiment' : {keytype: key },\
1895                    'vis': self.state[key]['vis'],\
1896                    }
1897        self.state_lock.release()
1898
1899        if rv: return rv
1900        else: raise service_error(service_error.req, "No such experiment")
1901
1902    def get_info(self, req, fid):
1903        """
1904        Return all the stored info about this experiment
1905        """
1906        rv = None
1907
1908        req = req.get('InfoRequestBody', None)
1909        if not req:
1910            raise service_error(service_error.req,
1911                    "Bad request format (no VisRequestBody)")
1912        exp = req.get('experiment', None)
1913        if exp:
1914            if exp.has_key('fedid'):
1915                key = exp['fedid']
1916                keytype = "fedid"
1917            elif exp.has_key('localname'):
1918                key = exp['localname']
1919                keytype = "localname"
1920            else:
1921                raise service_error(service_error.req, "Unknown lookup type")
1922        else:
1923            raise service_error(service_error.req, "No request?")
1924
1925        self.check_experiment_access(fid, key)
1926
1927        # The state may be massaged by the service function that called
1928        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1929        # state.
1930        self.state_lock.acquire()
1931        if self.state.has_key(key):
1932            rv = copy.deepcopy(self.state[key])
1933        self.state_lock.release()
1934
1935        if rv: return rv
1936        else: raise service_error(service_error.req, "No such experiment")
1937
1938
1939    def terminate_experiment(self, req, fid):
1940        """
1941        Swap this experiment out on the federants and delete the shared
1942        information
1943        """
1944        tbparams = { }
1945        req = req.get('TerminateRequestBody', None)
1946        if not req:
1947            raise service_error(service_error.req,
1948                    "Bad request format (no TerminateRequestBody)")
1949        exp = req.get('experiment', None)
1950        if exp:
1951            if exp.has_key('fedid'):
1952                key = exp['fedid']
1953                keytype = "fedid"
1954            elif exp.has_key('localname'):
1955                key = exp['localname']
1956                keytype = "localname"
1957            else:
1958                raise service_error(service_error.req, "Unknown lookup type")
1959        else:
1960            raise service_error(service_error.req, "No request?")
1961
1962        self.check_experiment_access(fid, key)
1963
1964        self.state_lock.acquire()
1965        fed_exp = self.state.get(key, None)
1966
1967        if fed_exp:
1968            # This branch of the conditional holds the lock to generate a
1969            # consistent temporary tbparams variable to deallocate experiments.
1970            # It releases the lock to do the deallocations and reacquires it to
1971            # remove the experiment state when the termination is complete.
1972            ids = []
1973            #  experimentID is a list of dicts that are self-describing
1974            #  identifiers.  This finds all the fedids and localnames - the
1975            #  keys of self.state - and puts them into ids.
1976            for id in fed_exp.get('experimentID', []):
1977                if id.has_key('fedid'): ids.append(id['fedid'])
1978                if id.has_key('localname'): ids.append(id['localname'])
1979
1980            # Construct enough of the tbparams to make the stop_segment calls
1981            # work
1982            for fed in fed_exp['federant']:
1983                try:
1984                    for e in fed['name']:
1985                        eid = e.get('localname', None)
1986                        if eid: break
1987                    else:
1988                        continue
1989
1990                    p = fed['emulab']['project']
1991
1992                    project = p['name']['localname']
1993                    tb = p['testbed']['localname']
1994                    user = p['user'][0]['userID']['localname']
1995
1996                    domain = fed['emulab']['domain']
1997                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1998                    aid = fed['allocID']
1999                except KeyError, e:
2000                    continue
2001                tbparams[tb] = {\
2002                        'user': user,\
2003                        'domain': domain,\
2004                        'project': project,\
2005                        'host': host,\
2006                        'eid': eid,\
2007                        'aid': aid,\
2008                    }
2009            self.state_lock.release()
2010
2011            # Stop everyone.
2012            for tb in tbparams.keys():
2013                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2014
2015            # release the allocations
2016            for tb in tbparams.keys():
2017                self.release_access(tb, tbparams[tb]['aid'])
2018
2019            # Remove the terminated experiment
2020            self.state_lock.acquire()
2021            for id in ids:
2022                if self.state.has_key(id): del self.state[id]
2023
2024            if self.state_filename: self.write_state()
2025            self.state_lock.release()
2026
2027            return { 'experiment': exp }
2028        else:
2029            # Don't forget to release the lock
2030            self.state_lock.release()
2031            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.