source: fedd/fedd_experiment_control.py @ 9460b1e

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 9460b1e was 9460b1e, checked in by Ted Faber <faber@…>, 15 years ago

move remote_service out of fedd_util

  • Property mode set to 100644
File size: 52.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30import parse_detail
31from service_error import *
32
33import logging
34
35class nullHandler(logging.Handler):
36    def emit(self, record): pass
37
38fl = logging.getLogger("fedd.experiment_control")
39fl.addHandler(nullHandler())
40
41class fedd_experiment_control_local:
42    """
43    Control of experiments that this system can directly access.
44
45    Includes experiment creation, termination and information dissemination.
46    Thred safe.
47    """
48   
49    class thread_pool:
50        """
51        A class to keep track of a set of threads all invoked for the same
52        task.  Manages the mutual exclusion of the states.
53        """
54        def __init__(self):
55            """
56            Start a pool.
57            """
58            self.changed = Condition()
59            self.started = 0
60            self.terminated = 0
61
62        def acquire(self):
63            """
64            Get the pool's lock.
65            """
66            self.changed.acquire()
67
68        def release(self):
69            """
70            Release the pool's lock.
71            """
72            self.changed.release()
73
74        def wait(self, timeout = None):
75            """
76            Wait for a pool thread to start or stop.
77            """
78            self.changed.wait(timeout)
79
80        def start(self):
81            """
82            Called by a pool thread to report starting.
83            """
84            self.changed.acquire()
85            self.started += 1
86            self.changed.notifyAll()
87            self.changed.release()
88
89        def terminate(self):
90            """
91            Called by a pool thread to report finishing.
92            """
93            self.changed.acquire()
94            self.terminated += 1
95            self.changed.notifyAll()
96            self.changed.release()
97
98        def clear(self):
99            """
100            Clear all pool data.
101            """
102            self.changed.acquire()
103            self.started = 0
104            self.terminated =0
105            self.changed.notifyAll()
106            self.changed.release()
107
108    class pooled_thread(Thread):
109        """
110        One of a set of threads dedicated to a specific task.  Uses the
111        thread_pool class above for coordination.
112        """
113        def __init__(self, group=None, target=None, name=None, args=(), 
114                kwargs={}, pdata=None, trace_file=None):
115            Thread.__init__(self, group, target, name, args, kwargs)
116            self.rv = None          # Return value of the ops in this thread
117            self.exception = None   # Exception that terminated this thread
118            self.target=target      # Target function to run on start()
119            self.args = args        # Args to pass to target
120            self.kwargs = kwargs    # Additional kw args
121            self.pdata = pdata      # thread_pool for this class
122            # Logger for this thread
123            self.log = logging.getLogger("fedd.experiment_control")
124       
125        def run(self):
126            """
127            Emulate Thread.run, except add pool data manipulation and error
128            logging.
129            """
130            if self.pdata:
131                self.pdata.start()
132
133            if self.target:
134                try:
135                    self.rv = self.target(*self.args, **self.kwargs)
136                except service_error, s:
137                    self.exception = s
138                    self.log.error("Thread exception: %s %s" % \
139                            (s.code_string(), s.desc))
140                except:
141                    self.exception = sys.exc_info()[1]
142                    self.log.error(("Unexpected thread exception: %s" +\
143                            "Trace %s") % (self.exception,\
144                                traceback.format_exc()))
145            if self.pdata:
146                self.pdata.terminate()
147
148    call_RequestAccess = service_caller('RequestAccess',
149            'getfeddPortType', feddServiceLocator, 
150            RequestAccessRequestMessage, 'RequestAccessRequestBody')
151
152    call_ReleaseAccess = service_caller('ReleaseAccess',
153            'getfeddPortType', feddServiceLocator, 
154            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
155
156    call_Ns2Split = service_caller('Ns2Split',
157            'getfeddInternalPortType', feddInternalServiceLocator, 
158            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
159
160    def __init__(self, config=None):
161        """
162        Intialize the various attributes, most from the config object
163        """
164        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
165        self.thread_pool = fedd_experiment_control_local.thread_pool
166
167        self.cert_file = None
168        self.cert_pwd = None
169        self.trusted_certs = None
170
171        # Walk through the various relevant certificat specifying config
172        # attributes until the local certificate attributes can be resolved.
173        # The walk is from most specific to most general specification.
174        for s in ("experiment_control", "globals"):
175            if config.has_section(s): 
176                if config.has_option(s, "cert_file"):
177                    if not self.cert_file:
178                        self.cert_file = config.get(s, "cert_file")
179                        self.cert_pwd = config.get(s, "cert_pwd")
180
181                if config.has_option(s, "trusted_certs"):
182                    if not self.trusted_certs:
183                        self.trusted_certs = config.get(s, "trusted_certs")
184
185        self.exp_stem = "fed-stem"
186        self.log = logging.getLogger("fedd.experiment_control")
187        self.muxmax = 2
188        self.nthreads = 2
189        self.randomize_experiments = False
190
191        self.scp_exec = "/usr/bin/scp"
192        self.splitter = None
193        self.ssh_exec="/usr/bin/ssh"
194        self.ssh_keygen = "/usr/bin/ssh-keygen"
195        self.ssh_identity_file = None
196
197        if config.has_section("experiment_control"):
198            self.debug = config.get("experiment_control", "create_debug")
199            self.state_filename = config.get("experiment_control", 
200                    "experiment_state_file")
201            self.splitter_url = config.get("experiment_control", "splitter_url")
202            self.fedkit = config.get("experiment_control", "fedkit")
203        else:
204            self.debug = False
205            self.state_filename = None
206            self.splitter_url = None
207            self.fedkit = None
208
209        # XXX
210        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
211        self.ssh_type = "rsa"
212        self.state = { }
213        self.state_lock = Lock()
214        self.tclsh = "/usr/local/bin/otclsh"
215        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
216        self.tbmap = { 
217                'deter':'https://users.isi.deterlab.net:23235',
218                'emulab':'https://users.isi.deterlab.net:23236',
219                'ucb':'https://users.isi.deterlab.net:23237',
220                }
221        self.trace_file = sys.stderr
222
223        self.def_expstart = \
224                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
225                "/tmp/federate";
226        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
227                "FEDDIR/hosts";
228        self.def_gwstart = \
229                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
230                "/tmp/bridge.log";
231        self.def_mgwstart = \
232                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
233                "/tmp/bridge.log";
234        self.def_gwimage = "FBSD61-TUNNEL2";
235        self.def_gwtype = "pc";
236
237
238        if self.ssh_pubkey_file:
239            try:
240                f = open(self.ssh_pubkey_file, 'r')
241                self.ssh_pubkey = f.read()
242                f.close()
243            except IOError:
244                raise service_error(service_error.internal,
245                        "Cannot read sshpubkey")
246
247        set_log_level(config, "experiment_control", self.log)
248
249        # Grab saved state.  OK to do this w/o locking because it's read only
250        # and only one thread should be in existence that can see self.state at
251        # this point.
252        if self.state_filename:
253            self.read_state()
254
255        # Dispatch tables
256        self.soap_services = {\
257                'Create': soap_handler(\
258                        CreateRequestMessage.typecode,
259                        getattr(self, "create_experiment"), 
260                        CreateResponseMessage,
261                        "CreateResponseBody"),
262                'Vtopo': soap_handler(\
263                        VtopoRequestMessage.typecode,
264                        getattr(self, "get_vtopo"),
265                        VtopoResponseMessage,
266                        "VtopoResponseBody"),
267                'Vis': soap_handler(\
268                        VisRequestMessage.typecode,
269                        getattr(self, "get_vis"),
270                        VisResponseMessage,
271                        "VisResponseBody"),
272                'Info': soap_handler(\
273                        InfoRequestMessage.typecode,
274                        getattr(self, "get_info"),
275                        InfoResponseMessage,
276                        "InfoResponseBody"),
277                'Terminate': soap_handler(\
278                        TerminateRequestMessage.typecode,
279                        getattr(self, "terminate_experiment"),
280                        TerminateResponseMessage,
281                        "TerminateResponseBody"),
282        }
283
284        self.xmlrpc_services = {\
285                'Create': xmlrpc_handler(\
286                        getattr(self, "create_experiment"), 
287                        "CreateResponseBody"),
288                'Vtopo': xmlrpc_handler(\
289                        getattr(self, "get_vtopo"),
290                        "VtopoResponseBody"),
291                'Vis': xmlrpc_handler(\
292                        getattr(self, "get_vis"),
293                        "VisResponseBody"),
294                'Info': xmlrpc_handler(\
295                        getattr(self, "get_info"),
296                        "InfoResponseBody"),
297                'Terminate': xmlrpc_handler(\
298                        getattr(self, "terminate_experiment"),
299                        "TerminateResponseBody"),
300        }
301
302    def copy_file(self, src, dest, size=1024):
303        """
304        Exceedingly simple file copy.
305        """
306        s = open(src,'r')
307        d = open(dest, 'w')
308
309        buf = "x"
310        while buf != "":
311            buf = s.read(size)
312            d.write(buf)
313        s.close()
314        d.close()
315
316    # Call while holding self.state_lock
317    def write_state(self):
318        """
319        Write a new copy of experiment state after copying the existing state
320        to a backup.
321
322        State format is a simple pickling of the state dictionary.
323        """
324        if os.access(self.state_filename, os.W_OK):
325            self.copy_file(self.state_filename, \
326                    "%s.bak" % self.state_filename)
327        try:
328            f = open(self.state_filename, 'w')
329            pickle.dump(self.state, f)
330        except IOError, e:
331            self.log.error("Can't write file %s: %s" % \
332                    (self.state_filename, e))
333        except pickle.PicklingError, e:
334            self.log.error("Pickling problem: %s" % e)
335        except TypeError, e:
336            self.log.error("Pickling problem (TypeError): %s" % e)
337
338    # Call while holding self.state_lock
339    def read_state(self):
340        """
341        Read a new copy of experiment state.  Old state is overwritten.
342
343        State format is a simple pickling of the state dictionary.
344        """
345        try:
346            f = open(self.state_filename, "r")
347            self.state = pickle.load(f)
348            self.log.debug("[read_state]: Read state from %s" % \
349                    self.state_filename)
350        except IOError, e:
351            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
352                    % (self.state_filename, e))
353        except pickle.UnpicklingError, e:
354            self.log.warning(("[read_state]: No saved state: " + \
355                    "Unpickling failed: %s") % e)
356
357    def scp_file(self, file, user, host, dest=""):
358        """
359        scp a file to the remote host.  If debug is set the action is only
360        logged.
361        """
362
363        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
364        rv = 0
365
366        try:
367            dnull = open("/dev/null", "r")
368        except IOError:
369            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
370            dnull = Null
371
372        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
373        if not self.debug:
374            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
375            else: rv = call(scp_cmd)
376
377        return rv == 0
378
379    def ssh_cmd(self, user, host, cmd, wname=None):
380        """
381        Run a remote command on host as user.  If debug is set, the action is
382        only logged.
383        """
384        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
385
386        try:
387            dnull = open("/dev/null", "r")
388        except IOError:
389            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
390            dnull = Null
391
392        self.log.debug("[ssh_cmd]: %s" % sh_str)
393        if not self.debug:
394            if dnull:
395                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
396            else:
397                sub = Popen(sh_str, shell=True)
398            return sub.wait() == 0
399        else:
400            return True
401
402    def ship_configs(self, host, user, src_dir, dest_dir):
403        """
404        Copy federant-specific configuration files to the federant.
405        """
406        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
407            return False
408        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
409            return False
410
411        for f in os.listdir(src_dir):
412            if os.path.isdir(f):
413                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
414                        "%s/%s" % (dest_dir, f)):
415                    return False
416            else:
417                if not self.scp_file("%s/%s" % (src_dir, f), 
418                        user, host, dest_dir):
419                    return False
420        return True
421
422    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
423        """
424        Start a sub-experiment on a federant.
425
426        Get the current state, modify or create as appropriate, ship data and
427        configs and start the experiment.  There are small ordering differences
428        based on the initial state of the sub-experiment.
429        """
430        # ops node in the federant
431        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
432        user = tbparams[tb]['user']     # federant user
433        pid = tbparams[tb]['project']   # federant project
434        # XXX
435        base_confs = ( "hosts",)
436        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
437        # command to test experiment state
438        expinfo_exec = "/usr/testbed/bin/expinfo" 
439        # Configuration directories on the remote machine
440        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
441        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
442        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
443        # Regular expressions to parse the expinfo response
444        state_re = re.compile("State:\s+(\w+)")
445        no_exp_re = re.compile("^No\s+such\s+experiment")
446        state = None    # Experiment state parsed from expinfo
447        # The expinfo ssh command
448        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
449
450        # Get status
451        self.log.debug("[start_segment]: %s"% " ".join(cmd))
452        dev_null = None
453        try:
454            dev_null = open("/dev/null", "a")
455        except IOError, e:
456            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
457
458        if self.debug:
459            state = 'swapped'
460            rv = 0
461        else:
462            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
463            for line in status.stdout:
464                m = state_re.match(line)
465                if m: state = m.group(1)
466                else:
467                    m = no_exp_re.match(line)
468                    if m: state = "none"
469            rv = status.wait()
470
471        # If the experiment is not present the subcommand returns a non-zero
472        # return value.  If we successfully parsed a "none" outcome, ignore the
473        # return code.
474        if rv != 0 and state != "none":
475            raise service_error(service_error.internal,
476                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
477
478        self.log.debug("[start_segment]: %s: %s" % (tb, state))
479        self.log.info("[start_segment]:transferring experiment to %s" % tb)
480
481        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
482            return False
483        # Clear the federation files
484        if not self.ssh_cmd(user, host, 
485                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
486            return False
487        if not self.ssh_cmd(user, host, 
488                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
489            return False
490        # Clear and create the tarfiles and rpm directories
491        for d in (tarfiles_dir, rpms_dir):
492            if not self.ssh_cmd(user, host, 
493                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
494                return False
495            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
496                    "create tarfiles"):
497                return False
498       
499        if state == 'active':
500            # Remote experiment is active.  Modify it.
501            for f in base_confs:
502                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
503                        "%s/%s" % (proj_dir, f)):
504                    return False
505            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
506                    proj_dir):
507                return False
508            if os.path.isdir("%s/tarfiles" % tmpdir):
509                if not self.ship_configs(host, user,
510                        "%s/tarfiles" % tmpdir, tarfiles_dir):
511                    return False
512            if os.path.isdir("%s/rpms" % tmpdir):
513                if not self.ship_configs(host, user,
514                        "%s/rpms" % tmpdir, tarfiles_dir):
515                    return False
516            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
517            if not self.ssh_cmd(user, host,
518                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
519                            (pid, eid, tclfile), "modexp"):
520                return False
521            return True
522        elif state == "swapped":
523            # Remote experiment swapped out.  Modify it and swap it in.
524            for f in base_confs:
525                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
526                        "%s/%s" % (proj_dir, f)):
527                    return False
528            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
529                    proj_dir):
530                return False
531            if os.path.isdir("%s/tarfiles" % tmpdir):
532                if not self.ship_configs(host, user,
533                        "%s/tarfiles" % tmpdir, tarfiles_dir):
534                    return False
535            if os.path.isdir("%s/rpms" % tmpdir):
536                if not self.ship_configs(host, user,
537                        "%s/rpms" % tmpdir, tarfiles_dir):
538                    return False
539            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
540            if not self.ssh_cmd(user, host,
541                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
542                    "modexp"):
543                return False
544            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
545            if not self.ssh_cmd(user, host,
546                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
547                    "swapexp"):
548                return False
549            return True
550        elif state == "none":
551            # No remote experiment.  Create one.  We do this in 2 steps so we
552            # can put the configuration files and scripts into the new
553            # experiment directories.
554
555            # Tarfiles must be present for creation to work
556            if os.path.isdir("%s/tarfiles" % tmpdir):
557                if not self.ship_configs(host, user,
558                        "%s/tarfiles" % tmpdir, tarfiles_dir):
559                    return False
560            if os.path.isdir("%s/rpms" % tmpdir):
561                if not self.ship_configs(host, user,
562                        "%s/rpms" % tmpdir, tarfiles_dir):
563                    return False
564            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
565            if not self.ssh_cmd(user, host,
566                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
567                            (pid, eid, tclfile), "startexp"):
568                return False
569            # After startexp the per-experiment directories exist
570            for f in base_confs:
571                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
572                        "%s/%s" % (proj_dir, f)):
573                    return False
574            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
575                    proj_dir):
576                return False
577            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
578            if not self.ssh_cmd(user, host,
579                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
580                    "swapexp"):
581                return False
582            return True
583        else:
584            self.log.debug("[start_segment]:unknown state %s" % state)
585            return False
586
587    def stop_segment(self, tb, eid, tbparams):
588        """
589        Stop a sub experiment by calling swapexp on the federant
590        """
591        user = tbparams[tb]['user']
592        host = tbparams[tb]['host']
593        pid = tbparams[tb]['project']
594
595        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
596        return self.ssh_cmd(user, host,
597                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
598
599       
600    def generate_ssh_keys(self, dest, type="rsa" ):
601        """
602        Generate a set of keys for the gateways to use to talk.
603
604        Keys are of type type and are stored in the required dest file.
605        """
606        valid_types = ("rsa", "dsa")
607        t = type.lower();
608        if t not in valid_types: raise ValueError
609        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
610
611        try:
612            trace = open("/dev/null", "w")
613        except IOError:
614            raise service_error(service_error.internal,
615                    "Cannot open /dev/null??");
616
617        # May raise CalledProcessError
618        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
619        rv = call(cmd, stdout=trace, stderr=trace)
620        if rv != 0:
621            raise service_error(service_error.internal, 
622                    "Cannot generate nonce ssh keys.  %s return code %d" \
623                            % (self.ssh_keygen, rv))
624
625    def gentopo(self, str):
626        """
627        Generate the topology dtat structure from the splitter's XML
628        representation of it.
629
630        The topology XML looks like:
631            <experiment>
632                <nodes>
633                    <node><vname></vname><ips>ip1:ip2</ips></node>
634                </nodes>
635                <lans>
636                    <lan>
637                        <vname></vname><vnode></vnode><ip></ip>
638                        <bandwidth></bandwidth><member>node:port</member>
639                    </lan>
640                </lans>
641        """
642        class topo_parse:
643            """
644            Parse the topology XML and create the dats structure.
645            """
646            def __init__(self):
647                # Typing of the subelements for data conversion
648                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
649                self.int_subelements = ( 'bandwidth',)
650                self.float_subelements = ( 'delay',)
651                # The final data structure
652                self.nodes = [ ]
653                self.lans =  [ ]
654                self.topo = { \
655                        'node': self.nodes,\
656                        'lan' : self.lans,\
657                    }
658                self.element = { }  # Current element being created
659                self.chars = ""     # Last text seen
660
661            def end_element(self, name):
662                # After each sub element the contents is added to the current
663                # element or to the appropriate list.
664                if name == 'node':
665                    self.nodes.append(self.element)
666                    self.element = { }
667                elif name == 'lan':
668                    self.lans.append(self.element)
669                    self.element = { }
670                elif name in self.str_subelements:
671                    self.element[name] = self.chars
672                    self.chars = ""
673                elif name in self.int_subelements:
674                    self.element[name] = int(self.chars)
675                    self.chars = ""
676                elif name in self.float_subelements:
677                    self.element[name] = float(self.chars)
678                    self.chars = ""
679
680            def found_chars(self, data):
681                self.chars += data.rstrip()
682
683
684        tp = topo_parse();
685        parser = xml.parsers.expat.ParserCreate()
686        parser.EndElementHandler = tp.end_element
687        parser.CharacterDataHandler = tp.found_chars
688
689        parser.Parse(str)
690
691        return tp.topo
692       
693
694    def genviz(self, topo):
695        """
696        Generate the visualization the virtual topology
697        """
698
699        neato = "/usr/local/bin/neato"
700        # These are used to parse neato output and to create the visualization
701        # file.
702        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
703        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
704                "%s</type></node>"
705
706        try:
707            # Node names
708            nodes = [ n['vname'] for n in topo['node'] ]
709            topo_lans = topo['lan']
710        except KeyError:
711            raise service_error(service_error.internal, "Bad topology")
712
713        lans = { }
714        links = { }
715
716        # Walk through the virtual topology, organizing the connections into
717        # 2-node connections (links) and more-than-2-node connections (lans).
718        # When a lan is created, it's added to the list of nodes (there's a
719        # node in the visualization for the lan).
720        for l in topo_lans:
721            if links.has_key(l['vname']):
722                if len(links[l['vname']]) < 2:
723                    links[l['vname']].append(l['vnode'])
724                else:
725                    nodes.append(l['vname'])
726                    lans[l['vname']] = links[l['vname']]
727                    del links[l['vname']]
728                    lans[l['vname']].append(l['vnode'])
729            elif lans.has_key(l['vname']):
730                lans[l['vname']].append(l['vnode'])
731            else:
732                links[l['vname']] = [ l['vnode'] ]
733
734
735        # Open up a temporary file for dot to turn into a visualization
736        try:
737            df, dotname = tempfile.mkstemp()
738            dotfile = os.fdopen(df, 'w')
739        except IOError:
740            raise service_error(service_error.internal,
741                    "Failed to open file in genviz")
742
743        # Generate a dot/neato input file from the links, nodes and lans
744        try:
745            print >>dotfile, "graph G {"
746            for n in nodes:
747                print >>dotfile, '\t"%s"' % n
748            for l in links.keys():
749                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
750            for l in lans.keys():
751                for n in lans[l]:
752                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
753            print >>dotfile, "}"
754            dotfile.close()
755        except TypeError:
756            raise service_error(service_error.internal,
757                    "Single endpoint link in vtopo")
758        except IOError:
759            raise service_error(service_error.internal, "Cannot write dot file")
760
761        # Use dot to create a visualization
762        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
763                '-Gpack=true', dotname], stdout=PIPE)
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, user, tbparam):
788        """
789        Get access to testbed through fedd and set the parameters for that tb
790        """
791
792        translate_attr = {
793            'slavenodestartcmd': 'expstart',
794            'slaveconnectorstartcmd': 'gwstart',
795            'masternodestartcmd': 'mexpstart',
796            'masterconnectorstartcmd': 'mgwstart',
797            'connectorimage': 'gwimage',
798            'connectortype': 'gwtype',
799            'tunnelcfg': 'tun',
800            'smbshare': 'smbshare',
801        }
802
803        uri = self.tbmap.get(tb, None)
804        if not uri:
805            raise service_error(serice_error.server_config, 
806                    "Unknown testbed: %s" % tb)
807
808        # The basic request
809        req = {\
810                'destinationTestbed' : { 'uri' : uri },
811                'user':  user,
812                'allocID' : { 'localname': 'test' },
813                # XXX: need to get service access stright
814                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
815                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
816            }
817       
818        # node resources if any
819        if nodes != None and len(nodes) > 0:
820            rnodes = [ ]
821            for n in nodes:
822                rn = { }
823                image, hw, count = n.split(":")
824                if image: rn['image'] = [ image ]
825                if hw: rn['hardware'] = [ hw ]
826                if count: rn['count'] = int(count)
827                rnodes.append(rn)
828            req['resources']= { }
829            req['resources']['node'] = rnodes
830
831        r = self.call_RequestAccess(uri, req, 
832                self.cert_file, self.cert_pwd, self.trusted_certs)
833
834        if r.has_key('RequestAccessResponseBody'):
835            r = r['RequestAccessResponseBody']
836        else:
837            raise service_error(service_error.protocol,
838                    "Bad proxy response")
839
840        e = r['emulab']
841        p = e['project']
842        tbparam[tb] = { 
843                "boss": e['boss'],
844                "host": e['ops'],
845                "domain": e['domain'],
846                "fs": e['fileServer'],
847                "eventserver": e['eventServer'],
848                "project": unpack_id(p['name']),
849                "emulab" : e,
850                "allocID" : r['allocID'],
851                }
852        # Make the testbed name be the label the user applied
853        p['testbed'] = {'localname': tb }
854
855        for u in p['user']:
856            tbparam[tb]['user'] = unpack_id(u['userID'])
857
858        for a in e['fedAttr']:
859            if a['attribute']:
860                key = translate_attr.get(a['attribute'].lower(), None)
861                if key:
862                    tbparam[tb][key]= a['value']
863       
864    def release_access(self, tb, aid):
865        """
866        Release access to testbed through fedd
867        """
868
869        uri = self.tbmap.get(tb, None)
870        if not uri:
871            raise service_error(serice_error.server_config, 
872                    "Unknown testbed: %s" % tb)
873
874        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
875                self.cert_file, self.cert_pwd, self.trusted_certs)
876
877        # better error coding
878
879    def remote_splitter(self, uri, desc, master):
880
881        req = {
882                'description' : { 'ns2description': desc },
883                'master': master,
884                'include_fedkit': bool(self.fedkit)
885            }
886
887        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
888                self.trusted_certs)
889
890        if r.has_key('Ns2SplitResponseBody'):
891            r = r['Ns2SplitResponseBody']
892            if r.has_key('output'):
893                return r['output'].splitlines()
894            else:
895                raise service_error(service_error.protocol, 
896                        "Bad splitter response (no output)")
897        else:
898            raise service_error(service_error.protocol, "Bad splitter response")
899       
900    class current_testbed:
901        """
902        Object for collecting the current testbed description.  The testbed
903        description is saved to a file with the local testbed variables
904        subsittuted line by line.
905        """
906        def __init__(self, eid, tmpdir, fedkit):
907            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
908            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
909            self.current_testbed = None
910            self.testbed_file = None
911
912            self.def_expstart = \
913                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
914            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
915            self.def_gwstart = \
916                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
917            self.def_mgwstart = \
918                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
919            self.def_gwimage = "FBSD61-TUNNEL2";
920            self.def_gwtype = "pc";
921
922            self.eid = eid
923            self.tmpdir = tmpdir
924            self.fedkit = fedkit
925
926        def __call__(self, line, master, allocated, tbparams):
927            # Capture testbed topology descriptions
928            if self.current_testbed == None:
929                m = self.begin_testbed.match(line)
930                if m != None:
931                    self.current_testbed = m.group(1)
932                    if self.current_testbed == None:
933                        raise service_error(service_error.req,
934                                "Bad request format (unnamed testbed)")
935                    allocated[self.current_testbed] = \
936                            allocated.get(self.current_testbed,0) + 1
937                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
938                    if not os.path.exists(tb_dir):
939                        try:
940                            os.mkdir(tb_dir)
941                        except IOError:
942                            raise service_error(service_error.internal,
943                                    "Cannot create %s" % tb_dir)
944                    try:
945                        self.testbed_file = open("%s/%s.%s.tcl" %
946                                (tb_dir, self.eid, self.current_testbed), 'w')
947                    except IOError:
948                        self.testbed_file = None
949                    return True
950                else: return False
951            else:
952                m = self.end_testbed.match(line)
953                if m != None:
954                    if m.group(1) != self.current_testbed:
955                        raise service_error(service_error.internal, 
956                                "Mismatched testbed markers!?")
957                    if self.testbed_file != None: 
958                        self.testbed_file.close()
959                        self.testbed_file = None
960                    self.current_testbed = None
961                elif self.testbed_file:
962                    # Substitute variables and put the line into the local
963                    # testbed file.
964                    gwtype = tbparams[self.current_testbed].get('gwtype', 
965                            self.def_gwtype)
966                    gwimage = tbparams[self.current_testbed].get('gwimage', 
967                            self.def_gwimage)
968                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
969                            self.def_mgwstart)
970                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
971                            self.def_mexpstart)
972                    gwstart = tbparams[self.current_testbed].get('gwstart', 
973                            self.def_gwstart)
974                    expstart = tbparams[self.current_testbed].get('expstart', 
975                            self.def_expstart)
976                    project = tbparams[self.current_testbed].get('project')
977                    line = re.sub("GWTYPE", gwtype, line)
978                    line = re.sub("GWIMAGE", gwimage, line)
979                    if self.current_testbed == master:
980                        line = re.sub("GWSTART", mgwstart, line)
981                        line = re.sub("EXPSTART", mexpstart, line)
982                    else:
983                        line = re.sub("GWSTART", gwstart, line)
984                        line = re.sub("EXPSTART", expstart, line)
985                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
986                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
987                    line = re.sub("EID", self.eid, line)
988                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
989                            (project, self.eid), line)
990                    if self.fedkit:
991                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
992                                line)
993                    print >>self.testbed_file, line
994                return True
995
996    class allbeds:
997        """
998        Process the Allbeds section.  Get access to each federant and save the
999        parameters in tbparams
1000        """
1001        def __init__(self, get_access):
1002            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1003            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1004            self.in_allbeds = False
1005            self.get_access = get_access
1006
1007        def __call__(self, line, user, tbparams):
1008            # Testbed access parameters
1009            if not self.in_allbeds:
1010                if self.begin_allbeds.match(line):
1011                    self.in_allbeds = True
1012                    return True
1013                else:
1014                    return False
1015            else:
1016                if self.end_allbeds.match(line):
1017                    self.in_allbeds = False
1018                else:
1019                    nodes = line.split('|')
1020                    tb = nodes.pop(0)
1021                    self.get_access(tb, nodes, user, tbparams)
1022                return True
1023
1024    class gateways:
1025        def __init__(self, eid, master, tmpdir, gw_pubkey,
1026                gw_secretkey, copy_file, fedkit):
1027            self.begin_gateways = \
1028                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1029            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1030            self.current_gateways = None
1031            self.control_gateway = None
1032            self.active_end = { }
1033
1034            self.eid = eid
1035            self.master = master
1036            self.tmpdir = tmpdir
1037            self.gw_pubkey_base = gw_pubkey
1038            self.gw_secretkey_base = gw_secretkey
1039
1040            self.copy_file = copy_file
1041            self.fedkit = fedkit
1042
1043
1044        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1045                active_end, tbparams, dtb, myname, desthost, type):
1046            """
1047            Produce a gateway configuration file from a gateways line.
1048            """
1049
1050            sproject = tbparams[gw].get('project', 'project')
1051            dproject = tbparams[dtb].get('project', 'project')
1052            sdomain = ".%s.%s%s" % (eid, sproject,
1053                    tbparams[gw].get('domain', ".example.com"))
1054            ddomain = ".%s.%s%s" % (eid, dproject,
1055                    tbparams[dtb].get('domain', ".example.com"))
1056            boss = tbparams[master].get('boss', "boss")
1057            fs = tbparams[master].get('fs', "fs")
1058            event_server = "%s%s" % \
1059                    (tbparams[gw].get('eventserver', "event_server"),
1060                            tbparams[gw].get('domain', "example.com"))
1061            remote_event_server = "%s%s" % \
1062                    (tbparams[dtb].get('eventserver', "event_server"),
1063                            tbparams[dtb].get('domain', "example.com"))
1064            seer_control = "%s%s" % \
1065                    (tbparams[gw].get('control', "control"), sdomain)
1066
1067            if self.fedkit:
1068                remote_script_dir = "/usr/local/federation/bin"
1069                local_script_dir = "/usr/local/federation/bin"
1070            else:
1071                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1072                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1073
1074            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1075            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1076            tunnel_cfg = tbparams[gw].get("tun", "false")
1077
1078            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1079            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1080
1081            # translate to lower case so the `hostname` hack for specifying
1082            # configuration files works.
1083            conf_file = conf_file.lower();
1084            remote_conf_file = remote_conf_file.lower();
1085
1086            if dtb == master:
1087                active = "false"
1088            elif gw == master:
1089                active = "true"
1090            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1091                active = "false"
1092            else:
1093                active_end['%s-%s' % (gw, dtb)] = 1
1094                active = "true"
1095
1096            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1097            print >>gwconfig, "Active: %s" % active
1098            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1099            print >>gwconfig, "BossName: %s" % boss
1100            print >>gwconfig, "FsName: %s" % fs
1101            print >>gwconfig, "EventServerName: %s" % event_server
1102            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1103            print >>gwconfig, "SeerControl: %s" % seer_control
1104            print >>gwconfig, "Type: %s" % type
1105            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1106            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1107                    local_script_dir
1108            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1109            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1110            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1111                    (remote_conf_dir, remote_conf_file)
1112            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1113            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1114            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1115            gwconfig.close()
1116
1117            return active == "true"
1118
1119        def __call__(self, line, allocated, tbparams):
1120            # Process gateways
1121            if not self.current_gateways:
1122                m = self.begin_gateways.match(line)
1123                if m:
1124                    self.current_gateways = m.group(1)
1125                    if allocated.has_key(self.current_gateways):
1126                        # This test should always succeed
1127                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1128                        if not os.path.exists(tb_dir):
1129                            try:
1130                                os.mkdir(tb_dir)
1131                            except IOError:
1132                                raise service_error(service_error.internal,
1133                                        "Cannot create %s" % tb_dir)
1134                    else:
1135                        # XXX
1136                        self.log.error("[gateways]: Ignoring gateways for " + \
1137                                "unknown testbed %s" % self.current_gateways)
1138                        self.current_gateways = None
1139                    return True
1140                else:
1141                    return False
1142            else:
1143                m = self.end_gateways.match(line)
1144                if m :
1145                    if m.group(1) != self.current_gateways:
1146                        raise service_error(service_error.internal,
1147                                "Mismatched gateway markers!?")
1148                    if self.control_gateway:
1149                        try:
1150                            cc = open("%s/%s/client.conf" %
1151                                    (self.tmpdir, self.current_gateways), 'w')
1152                            print >>cc, "ControlGateway: %s" % \
1153                                    self.control_gateway
1154                            if tbparams[self.master].has_key('smbshare'):
1155                                print >>cc, "SMBSHare: %s" % \
1156                                        tbparams[self.master]['smbshare']
1157                            print >>cc, "ProjectUser: %s" % \
1158                                    tbparams[self.master]['user']
1159                            print >>cc, "ProjectName: %s" % \
1160                                    tbparams[self.master]['project']
1161                            cc.close()
1162                        except IOError:
1163                            raise service_error(service_error.internal,
1164                                    "Error creating client config")
1165                        try:
1166                            cc = open("%s/%s/seer.conf" %
1167                                    (self.tmpdir, self.current_gateways),
1168                                    'w')
1169                            if self.current_gateways != self.master:
1170                                print >>cc, "ControlNode: %s" % \
1171                                        self.control_gateway
1172                            print >>cc, "ExperimentID: %s/%s" % \
1173                                    ( tbparams[self.master]['project'], \
1174                                    self.eid )
1175                            cc.close()
1176                        except IOError:
1177                            raise service_error(service_error.internal,
1178                                    "Error creating seer config")
1179                    else:
1180                        debug.error("[gateways]: No control gateway for %s" %\
1181                                    self.current_gateways)
1182                    self.current_gateways = None
1183                else:
1184                    dtb, myname, desthost, type = line.split(" ")
1185
1186                    if type == "control" or type == "both":
1187                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1188                                self.eid, 
1189                                tbparams[self.current_gateways]['project'],
1190                                tbparams[self.current_gateways]['domain'])
1191                    try:
1192                        active = self.gateway_conf_file(self.current_gateways,
1193                                self.master, self.eid, self.gw_pubkey_base,
1194                                self.gw_secretkey_base,
1195                                self.active_end, tbparams, dtb, myname,
1196                                desthost, type)
1197                    except IOError, e:
1198                        raise service_error(service_error.internal,
1199                                "Failed to write config file for %s" % \
1200                                        self.current_gateway)
1201           
1202                    gw_pubkey = "%s/keys/%s" % \
1203                            (self.tmpdir, self.gw_pubkey_base)
1204                    gw_secretkey = "%s/keys/%s" % \
1205                            (self.tmpdir, self.gw_secretkey_base)
1206
1207                    pkfile = "%s/%s/%s" % \
1208                            ( self.tmpdir, self.current_gateways, 
1209                                    self.gw_pubkey_base)
1210                    skfile = "%s/%s/%s" % \
1211                            ( self.tmpdir, self.current_gateways, 
1212                                    self.gw_secretkey_base)
1213
1214                    if not os.path.exists(pkfile):
1215                        try:
1216                            self.copy_file(gw_pubkey, pkfile)
1217                        except IOError:
1218                            service_error(service_error.internal,
1219                                    "Failed to copy pubkey file")
1220
1221                    if active and not os.path.exists(skfile):
1222                        try:
1223                            self.copy_file(gw_secretkey, skfile)
1224                        except IOError:
1225                            service_error(service_error.internal,
1226                                    "Failed to copy secretkey file")
1227                return True
1228
1229    class shunt_to_file:
1230        """
1231        Simple class to write data between two regexps to a file.
1232        """
1233        def __init__(self, begin, end, filename):
1234            """
1235            Begin shunting on a match of begin, stop on end, send data to
1236            filename.
1237            """
1238            self.begin = re.compile(begin)
1239            self.end = re.compile(end)
1240            self.in_shunt = False
1241            self.file = None
1242            self.filename = filename
1243
1244        def __call__(self, line):
1245            """
1246            Call this on each line in the input that may be shunted.
1247            """
1248            if not self.in_shunt:
1249                if self.begin.match(line):
1250                    self.in_shunt = True
1251                    try:
1252                        self.file = open(self.filename, "w")
1253                    except:
1254                        self.file = None
1255                        raise
1256                    return True
1257                else:
1258                    return False
1259            else:
1260                if self.end.match(line):
1261                    if self.file: 
1262                        self.file.close()
1263                        self.file = None
1264                    self.in_shunt = False
1265                else:
1266                    if self.file:
1267                        print >>self.file, line
1268                return True
1269
1270    class shunt_to_list:
1271        """
1272        Same interface as shunt_to_file.  Data collected in self.list, one list
1273        element per line.
1274        """
1275        def __init__(self, begin, end):
1276            self.begin = re.compile(begin)
1277            self.end = re.compile(end)
1278            self.in_shunt = False
1279            self.list = [ ]
1280       
1281        def __call__(self, line):
1282            if not self.in_shunt:
1283                if self.begin.match(line):
1284                    self.in_shunt = True
1285                    return True
1286                else:
1287                    return False
1288            else:
1289                if self.end.match(line):
1290                    self.in_shunt = False
1291                else:
1292                    self.list.append(line)
1293                return True
1294
1295    class shunt_to_string:
1296        """
1297        Same interface as shunt_to_file.  Data collected in self.str, all in
1298        one string.
1299        """
1300        def __init__(self, begin, end):
1301            self.begin = re.compile(begin)
1302            self.end = re.compile(end)
1303            self.in_shunt = False
1304            self.str = ""
1305       
1306        def __call__(self, line):
1307            if not self.in_shunt:
1308                if self.begin.match(line):
1309                    self.in_shunt = True
1310                    return True
1311                else:
1312                    return False
1313            else:
1314                if self.end.match(line):
1315                    self.in_shunt = False
1316                else:
1317                    self.str += line
1318                return True
1319
1320    def create_experiment(self, req, fid):
1321        """
1322        The external interface to experiment creation called from the
1323        dispatcher.
1324
1325        Creates a working directory, splits the incoming description using the
1326        splitter script and parses out the avrious subsections using the
1327        lcasses above.  Once each sub-experiment is created, use pooled threads
1328        to instantiate them and start it all up.
1329        """
1330        try:
1331            tmpdir = tempfile.mkdtemp(prefix="split-")
1332        except IOError:
1333            raise service_error(service_error.internal, "Cannot create tmp dir")
1334
1335        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1336        gw_secretkey_base = "fed.%s" % self.ssh_type
1337        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1338        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1339        tclfile = tmpdir + "/experiment.tcl"
1340        tbparams = { }
1341
1342        pid = "dummy"
1343        gid = "dummy"
1344        # XXX
1345        fail_soft = False
1346
1347        try:
1348            os.mkdir(tmpdir+"/keys")
1349        except OSError:
1350            raise service_error(service_error.internal,
1351                    "Can't make temporary dir")
1352
1353        req = req.get('CreateRequestBody', None)
1354        if not req:
1355            raise service_error(service_error.req,
1356                    "Bad request format (no CreateRequestBody)")
1357        # The tcl parser needs to read a file so put the content into that file
1358        descr=req.get('experimentdescription', None)
1359        if descr:
1360            file_content=descr.get('ns2description', None)
1361            if file_content:
1362                try:
1363                    f = open(tclfile, 'w')
1364                    f.write(file_content)
1365                    f.close()
1366                except IOError:
1367                    raise service_error(service_error.internal,
1368                            "Cannot write temp experiment description")
1369            else:
1370                raise service_error(service_error.req, 
1371                        "Only ns2descriptions supported")
1372        else:
1373            raise service_error(service_error.req, "No experiment description")
1374
1375        if req.has_key('experimentID') and \
1376                req['experimentID'].has_key('localname'):
1377            eid = req['experimentID']['localname']
1378            self.state_lock.acquire()
1379            while (self.state.has_key(eid)):
1380                eid += random.choice(string.ascii_letters)
1381            # To avoid another thread picking this localname
1382            self.state[eid] = "placeholder"
1383            self.state_lock.release()
1384        else:
1385            eid = self.exp_stem
1386            for i in range(0,5):
1387                eid += random.choice(string.ascii_letters)
1388            self.state_lock.acquire()
1389            while (self.state.has_key(eid)):
1390                eid = self.exp_stem
1391                for i in range(0,5):
1392                    eid += random.choice(string.ascii_letters)
1393            # To avoid another thread picking this localname
1394            self.state[eid] = "placeholder"
1395            self.state_lock.release()
1396
1397        try:
1398            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1399        except ValueError:
1400            raise service_error(service_error.server_config, 
1401                    "Bad key type (%s)" % self.ssh_type)
1402
1403        user = req.get('user', None)
1404        if user == None:
1405            raise service_error(service_error.req, "No user")
1406
1407        master = req.get('master', None)
1408        if master == None:
1409            raise service_error(service_error.req, "No master testbed label")
1410       
1411        if self.splitter_url:
1412            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1413            split_data = self.remote_splitter(self.splitter_url, file_content,
1414                    master)
1415        else:
1416            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1417                str(self.muxmax), '-m', master]
1418
1419            if self.fedkit:
1420                tclcmd.append('-k')
1421
1422            tclcmd.extend([pid, gid, eid, tclfile])
1423
1424            self.log.debug("running local splitter %s", " ".join(tclcmd))
1425            tclparser = Popen(tclcmd, stdout=PIPE)
1426            split_data = tclparser.stdout
1427
1428        allocated = { }     # Testbeds we can access
1429        started = { }       # Testbeds where a sub-experiment started
1430                            # successfully
1431
1432        # Objects to parse the splitter output (defined above)
1433        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1434        parse_allbeds = self.allbeds(self.get_access)
1435        parse_gateways = self.gateways(eid, master, tmpdir,
1436                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1437        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1438                    "^#\s+End\s+Vtopo")
1439        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1440                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1441        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1442                "^#\s+End\s+tarfiles")
1443        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1444                "^#\s+End\s+rpms")
1445
1446        # Worling on the split data
1447        for line in split_data:
1448            line = line.rstrip()
1449            if parse_current_testbed(line, master, allocated, tbparams):
1450                continue
1451            elif parse_allbeds(line, user, tbparams):
1452                continue
1453            elif parse_gateways(line, allocated, tbparams):
1454                continue
1455            elif parse_vtopo(line):
1456                continue
1457            elif parse_hostnames(line):
1458                continue
1459            elif parse_tarfiles(line):
1460                continue
1461            elif parse_rpms(line):
1462                continue
1463            else:
1464                raise service_error(service_error.internal, 
1465                        "Bad tcl parse? %s" % line)
1466
1467        # Virtual topology and visualization
1468        vtopo = self.gentopo(parse_vtopo.str)
1469        if not vtopo:
1470            raise service_error(service_error.internal, 
1471                    "Failed to generate virtual topology")
1472
1473        vis = self.genviz(vtopo)
1474        if not vis:
1475            raise service_error(service_error.internal, 
1476                    "Failed to generate visualization")
1477
1478        # save federant information
1479        for k in allocated.keys():
1480            tbparams[k]['federant'] = {\
1481                    'name': [ { 'localname' : eid} ],\
1482                    'emulab': tbparams[k]['emulab'],\
1483                    'allocID' : tbparams[k]['allocID'],\
1484                    'master' : k == master,\
1485                }
1486
1487
1488        # Copy tarfiles and rpms needed at remote sites into a staging area
1489        try:
1490            if self.fedkit:
1491                parse_tarfiles.list.append(self.fedkit)
1492            for t in parse_tarfiles.list:
1493                if not os.path.exists("%s/tarfiles" % tmpdir):
1494                    os.mkdir("%s/tarfiles" % tmpdir)
1495                self.copy_file(t, "%s/tarfiles/%s" % \
1496                        (tmpdir, os.path.basename(t)))
1497            for r in parse_rpms.list:
1498                if not os.path.exists("%s/rpms" % tmpdir):
1499                    os.mkdir("%s/rpms" % tmpdir)
1500                self.copy_file(r, "%s/rpms/%s" % \
1501                        (tmpdir, os.path.basename(r)))
1502        except IOError, e:
1503            raise service_error(service_error.internal, 
1504                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1505
1506        thread_pool_info = self.thread_pool()
1507        threads = [ ]
1508
1509        for tb in [ k for k in allocated.keys() if k != master]:
1510            # Wait until we have a free slot to start the next testbed load
1511            thread_pool_info.acquire()
1512            while thread_pool_info.started - \
1513                    thread_pool_info.terminated >= self.nthreads:
1514                thread_pool_info.wait()
1515            thread_pool_info.release()
1516
1517            # Create and start a thread to start the segment, and save it to
1518            # get the return value later
1519            t  = self.pooled_thread(target=self.start_segment, 
1520                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1521                    pdata=thread_pool_info, trace_file=self.trace_file)
1522            threads.append(t)
1523            t.start()
1524
1525        # Wait until all finish (the first clause of the while is to make sure
1526        # one starts)
1527        thread_pool_info.acquire()
1528        while thread_pool_info.started == 0 or \
1529                thread_pool_info.started > thread_pool_info.terminated:
1530            thread_pool_info.wait()
1531        thread_pool_info.release()
1532
1533        # If none failed, start the master
1534        failed = [ t.getName() for t in threads if not t.rv ]
1535
1536        if len(failed) == 0:
1537            if not self.start_segment(master, eid, tbparams, tmpdir):
1538                failed.append(master)
1539
1540        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1541        # If one failed clean up, unless fail_soft is set
1542        if failed:
1543            if not fail_soft:
1544                for tb in succeeded:
1545                    self.stop_segment(tb, eid, tbparams)
1546                # Remove the placeholder
1547                self.state_lock.acquire()
1548                del self.state[eid]
1549                self.state_lock.release()
1550
1551                raise service_error(service_error.federant,
1552                    "Swap in failed on %s" % ",".join(failed))
1553        else:
1554            self.log.info("[start_segment]: Experiment %s started" % eid)
1555
1556        # Generate an ID for the experiment (slice) and a certificate that the
1557        # allocator can use to prove they own it.  We'll ship it back through
1558        # the encrypted connection.
1559        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1560
1561        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1562
1563        # Walk up tmpdir, deleting as we go
1564        for path, dirs, files in os.walk(tmpdir, topdown=False):
1565            for f in files:
1566                os.remove(os.path.join(path, f))
1567            for d in dirs:
1568                os.rmdir(os.path.join(path, d))
1569        os.rmdir(tmpdir)
1570
1571        # The deepcopy prevents the allocation ID and other binaries from being
1572        # translated into other formats
1573        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1574                for tb in tbparams.keys() \
1575                    if tbparams[tb].has_key('federant') ],\
1576                    'vtopo': vtopo,\
1577                    'vis' : vis,
1578                    'experimentID' : [\
1579                            { 'fedid': copy.copy(expid) }, \
1580                            { 'localname': eid },\
1581                        ],\
1582                    'experimentAccess': { 'X509' : expcert },\
1583                }
1584
1585        # Insert the experiment into our state and update the disk copy
1586        self.state_lock.acquire()
1587        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1588                for tb in tbparams.keys() \
1589                    if tbparams[tb].has_key('federant') ],\
1590                    'vtopo': vtopo,\
1591                    'vis' : vis,
1592                    'experimentID' : [\
1593                            { 'fedid': expid }, { 'localname': eid },\
1594                        ],\
1595                }
1596        self.state[eid] = self.state[expid]
1597        if self.state_filename: self.write_state()
1598        self.state_lock.release()
1599
1600        if not failed:
1601            return resp
1602        else:
1603            raise service_error(service_error.partial, \
1604                    "Partial swap in on %s" % ",".join(succeeded))
1605
1606
1607    def get_vtopo(self, req, fid):
1608        """
1609        Return the stored virtual topology for this experiment
1610        """
1611        rv = None
1612
1613        req = req.get('VtopoRequestBody', None)
1614        if not req:
1615            raise service_error(service_error.req,
1616                    "Bad request format (no VtopoRequestBody)")
1617        exp = req.get('experiment', None)
1618        if exp:
1619            if exp.has_key('fedid'):
1620                key = exp['fedid']
1621                keytype = "fedid"
1622            elif exp.has_key('localname'):
1623                key = exp['localname']
1624                keytype = "localname"
1625            else:
1626                raise service_error(service_error.req, "Unknown lookup type")
1627        else:
1628            raise service_error(service_error.req, "No request?")
1629
1630        self.state_lock.acquire()
1631        if self.state.has_key(key):
1632            rv = { 'experiment' : {keytype: key },\
1633                    'vtopo': self.state[key]['vtopo'],\
1634                }
1635        self.state_lock.release()
1636
1637        if rv: return rv
1638        else: raise service_error(service_error.req, "No such experiment")
1639
1640    def get_vis(self, req, fid):
1641        """
1642        Return the stored visualization for this experiment
1643        """
1644        rv = None
1645
1646        req = req.get('VisRequestBody', None)
1647        if not req:
1648            raise service_error(service_error.req,
1649                    "Bad request format (no VisRequestBody)")
1650        exp = req.get('experiment', None)
1651        if exp:
1652            if exp.has_key('fedid'):
1653                key = exp['fedid']
1654                keytype = "fedid"
1655            elif exp.has_key('localname'):
1656                key = exp['localname']
1657                keytype = "localname"
1658            else:
1659                raise service_error(service_error.req, "Unknown lookup type")
1660        else:
1661            raise service_error(service_error.req, "No request?")
1662
1663        self.state_lock.acquire()
1664        if self.state.has_key(key):
1665            rv =  { 'experiment' : {keytype: key },\
1666                    'vis': self.state[key]['vis'],\
1667                    }
1668        self.state_lock.release()
1669
1670        if rv: return rv
1671        else: raise service_error(service_error.req, "No such experiment")
1672
1673    def get_info(self, req, fid):
1674        """
1675        Return all the stored info about this experiment
1676        """
1677        rv = None
1678
1679        req = req.get('InfoRequestBody', None)
1680        if not req:
1681            raise service_error(service_error.req,
1682                    "Bad request format (no VisRequestBody)")
1683        exp = req.get('experiment', None)
1684        if exp:
1685            if exp.has_key('fedid'):
1686                key = exp['fedid']
1687                keytype = "fedid"
1688            elif exp.has_key('localname'):
1689                key = exp['localname']
1690                keytype = "localname"
1691            else:
1692                raise service_error(service_error.req, "Unknown lookup type")
1693        else:
1694            raise service_error(service_error.req, "No request?")
1695
1696        # The state may be massaged by the service function that called
1697        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1698        # state.
1699        self.state_lock.acquire()
1700        if self.state.has_key(key):
1701            rv = copy.deepcopy(self.state[key])
1702        self.state_lock.release()
1703
1704        if rv: return rv
1705        else: raise service_error(service_error.req, "No such experiment")
1706
1707
1708    def terminate_experiment(self, req, fid):
1709        """
1710        Swap this experiment out on the federants and delete the shared
1711        information
1712        """
1713        tbparams = { }
1714        req = req.get('TerminateRequestBody', None)
1715        if not req:
1716            raise service_error(service_error.req,
1717                    "Bad request format (no TerminateRequestBody)")
1718        exp = req.get('experiment', None)
1719        if exp:
1720            if exp.has_key('fedid'):
1721                key = exp['fedid']
1722                keytype = "fedid"
1723            elif exp.has_key('localname'):
1724                key = exp['localname']
1725                keytype = "localname"
1726            else:
1727                raise service_error(service_error.req, "Unknown lookup type")
1728        else:
1729            raise service_error(service_error.req, "No request?")
1730
1731        self.state_lock.acquire()
1732        fed_exp = self.state.get(key, None)
1733
1734        if fed_exp:
1735            # This branch of the conditional holds the lock to generate a
1736            # consistent temporary tbparams variable to deallocate experiments.
1737            # It releases the lock to do the deallocations and reacquires it to
1738            # remove the experiment state when the termination is complete.
1739            ids = []
1740            #  experimentID is a list of dicts that are self-describing
1741            #  identifiers.  This finds all the fedids and localnames - the
1742            #  keys of self.state - and puts them into ids.
1743            for id in fed_exp.get('experimentID', []):
1744                if id.has_key('fedid'): ids.append(id['fedid'])
1745                if id.has_key('localname'): ids.append(id['localname'])
1746
1747            # Construct enough of the tbparams to make the stop_segment calls
1748            # work
1749            for fed in fed_exp['federant']:
1750                try:
1751                    for e in fed['name']:
1752                        eid = e.get('localname', None)
1753                        if eid: break
1754                    else:
1755                        continue
1756
1757                    p = fed['emulab']['project']
1758
1759                    project = p['name']['localname']
1760                    tb = p['testbed']['localname']
1761                    user = p['user'][0]['userID']['localname']
1762
1763                    domain = fed['emulab']['domain']
1764                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1765                    aid = fed['allocID']
1766                except KeyError, e:
1767                    continue
1768                tbparams[tb] = {\
1769                        'user': user,\
1770                        'domain': domain,\
1771                        'project': project,\
1772                        'host': host,\
1773                        'eid': eid,\
1774                        'aid': aid,\
1775                    }
1776            self.state_lock.release()
1777
1778            # Stop everyone.
1779            for tb in tbparams.keys():
1780                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1781
1782            # release the allocations
1783            for tb in tbparams.keys():
1784                self.release_access(tb, tbparams[tb]['aid'])
1785
1786            # Remove the terminated experiment
1787            self.state_lock.acquire()
1788            for id in ids:
1789                if self.state.has_key(id): del self.state[id]
1790
1791            if self.state_filename: self.write_state()
1792            self.state_lock.release()
1793
1794            return { 'experiment': exp }
1795        else:
1796            # Don't forget to release the lock
1797            self.state_lock.release()
1798            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.