source: fedd/fedd_experiment_control.py @ 51cc9df

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 51cc9df was 51cc9df, checked in by Ted Faber <faber@…>, 16 years ago

split fedid out

  • Property mode set to 100644
File size: 52.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29from fedid import fedid, generate_fedid
30from remote_service import xmlrpc_handler, soap_handler, service_caller
31import parse_detail
32from service_error import *
33
34import logging
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class fedd_experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49   
50    class thread_pool:
51        """
52        A class to keep track of a set of threads all invoked for the same
53        task.  Manages the mutual exclusion of the states.
54        """
55        def __init__(self):
56            """
57            Start a pool.
58            """
59            self.changed = Condition()
60            self.started = 0
61            self.terminated = 0
62
63        def acquire(self):
64            """
65            Get the pool's lock.
66            """
67            self.changed.acquire()
68
69        def release(self):
70            """
71            Release the pool's lock.
72            """
73            self.changed.release()
74
75        def wait(self, timeout = None):
76            """
77            Wait for a pool thread to start or stop.
78            """
79            self.changed.wait(timeout)
80
81        def start(self):
82            """
83            Called by a pool thread to report starting.
84            """
85            self.changed.acquire()
86            self.started += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def terminate(self):
91            """
92            Called by a pool thread to report finishing.
93            """
94            self.changed.acquire()
95            self.terminated += 1
96            self.changed.notifyAll()
97            self.changed.release()
98
99        def clear(self):
100            """
101            Clear all pool data.
102            """
103            self.changed.acquire()
104            self.started = 0
105            self.terminated =0
106            self.changed.notifyAll()
107            self.changed.release()
108
109    class pooled_thread(Thread):
110        """
111        One of a set of threads dedicated to a specific task.  Uses the
112        thread_pool class above for coordination.
113        """
114        def __init__(self, group=None, target=None, name=None, args=(), 
115                kwargs={}, pdata=None, trace_file=None):
116            Thread.__init__(self, group, target, name, args, kwargs)
117            self.rv = None          # Return value of the ops in this thread
118            self.exception = None   # Exception that terminated this thread
119            self.target=target      # Target function to run on start()
120            self.args = args        # Args to pass to target
121            self.kwargs = kwargs    # Additional kw args
122            self.pdata = pdata      # thread_pool for this class
123            # Logger for this thread
124            self.log = logging.getLogger("fedd.experiment_control")
125       
126        def run(self):
127            """
128            Emulate Thread.run, except add pool data manipulation and error
129            logging.
130            """
131            if self.pdata:
132                self.pdata.start()
133
134            if self.target:
135                try:
136                    self.rv = self.target(*self.args, **self.kwargs)
137                except service_error, s:
138                    self.exception = s
139                    self.log.error("Thread exception: %s %s" % \
140                            (s.code_string(), s.desc))
141                except:
142                    self.exception = sys.exc_info()[1]
143                    self.log.error(("Unexpected thread exception: %s" +\
144                            "Trace %s") % (self.exception,\
145                                traceback.format_exc()))
146            if self.pdata:
147                self.pdata.terminate()
148
149    call_RequestAccess = service_caller('RequestAccess',
150            'getfeddPortType', feddServiceLocator, 
151            RequestAccessRequestMessage, 'RequestAccessRequestBody')
152
153    call_ReleaseAccess = service_caller('ReleaseAccess',
154            'getfeddPortType', feddServiceLocator, 
155            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
156
157    call_Ns2Split = service_caller('Ns2Split',
158            'getfeddInternalPortType', feddInternalServiceLocator, 
159            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
160
161    def __init__(self, config=None):
162        """
163        Intialize the various attributes, most from the config object
164        """
165        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
166        self.thread_pool = fedd_experiment_control_local.thread_pool
167
168        self.cert_file = None
169        self.cert_pwd = None
170        self.trusted_certs = None
171
172        # Walk through the various relevant certificat specifying config
173        # attributes until the local certificate attributes can be resolved.
174        # The walk is from most specific to most general specification.
175        for s in ("experiment_control", "globals"):
176            if config.has_section(s): 
177                if config.has_option(s, "cert_file"):
178                    if not self.cert_file:
179                        self.cert_file = config.get(s, "cert_file")
180                        self.cert_pwd = config.get(s, "cert_pwd")
181
182                if config.has_option(s, "trusted_certs"):
183                    if not self.trusted_certs:
184                        self.trusted_certs = config.get(s, "trusted_certs")
185
186        self.exp_stem = "fed-stem"
187        self.log = logging.getLogger("fedd.experiment_control")
188        self.muxmax = 2
189        self.nthreads = 2
190        self.randomize_experiments = False
191
192        self.scp_exec = "/usr/bin/scp"
193        self.splitter = None
194        self.ssh_exec="/usr/bin/ssh"
195        self.ssh_keygen = "/usr/bin/ssh-keygen"
196        self.ssh_identity_file = None
197
198        if config.has_section("experiment_control"):
199            self.debug = config.get("experiment_control", "create_debug")
200            self.state_filename = config.get("experiment_control", 
201                    "experiment_state_file")
202            self.splitter_url = config.get("experiment_control", "splitter_url")
203            self.fedkit = config.get("experiment_control", "fedkit")
204        else:
205            self.debug = False
206            self.state_filename = None
207            self.splitter_url = None
208            self.fedkit = None
209
210        # XXX
211        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
212        self.ssh_type = "rsa"
213        self.state = { }
214        self.state_lock = Lock()
215        self.tclsh = "/usr/local/bin/otclsh"
216        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
217        self.tbmap = { 
218                'deter':'https://users.isi.deterlab.net:23235',
219                'emulab':'https://users.isi.deterlab.net:23236',
220                'ucb':'https://users.isi.deterlab.net:23237',
221                }
222        self.trace_file = sys.stderr
223
224        self.def_expstart = \
225                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
226                "/tmp/federate";
227        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
228                "FEDDIR/hosts";
229        self.def_gwstart = \
230                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
231                "/tmp/bridge.log";
232        self.def_mgwstart = \
233                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
234                "/tmp/bridge.log";
235        self.def_gwimage = "FBSD61-TUNNEL2";
236        self.def_gwtype = "pc";
237
238
239        if self.ssh_pubkey_file:
240            try:
241                f = open(self.ssh_pubkey_file, 'r')
242                self.ssh_pubkey = f.read()
243                f.close()
244            except IOError:
245                raise service_error(service_error.internal,
246                        "Cannot read sshpubkey")
247
248        set_log_level(config, "experiment_control", self.log)
249
250        # Grab saved state.  OK to do this w/o locking because it's read only
251        # and only one thread should be in existence that can see self.state at
252        # this point.
253        if self.state_filename:
254            self.read_state()
255
256        # Dispatch tables
257        self.soap_services = {\
258                'Create': soap_handler(\
259                        CreateRequestMessage.typecode,
260                        getattr(self, "create_experiment"), 
261                        CreateResponseMessage,
262                        "CreateResponseBody"),
263                'Vtopo': soap_handler(\
264                        VtopoRequestMessage.typecode,
265                        getattr(self, "get_vtopo"),
266                        VtopoResponseMessage,
267                        "VtopoResponseBody"),
268                'Vis': soap_handler(\
269                        VisRequestMessage.typecode,
270                        getattr(self, "get_vis"),
271                        VisResponseMessage,
272                        "VisResponseBody"),
273                'Info': soap_handler(\
274                        InfoRequestMessage.typecode,
275                        getattr(self, "get_info"),
276                        InfoResponseMessage,
277                        "InfoResponseBody"),
278                'Terminate': soap_handler(\
279                        TerminateRequestMessage.typecode,
280                        getattr(self, "terminate_experiment"),
281                        TerminateResponseMessage,
282                        "TerminateResponseBody"),
283        }
284
285        self.xmlrpc_services = {\
286                'Create': xmlrpc_handler(\
287                        getattr(self, "create_experiment"), 
288                        "CreateResponseBody"),
289                'Vtopo': xmlrpc_handler(\
290                        getattr(self, "get_vtopo"),
291                        "VtopoResponseBody"),
292                'Vis': xmlrpc_handler(\
293                        getattr(self, "get_vis"),
294                        "VisResponseBody"),
295                'Info': xmlrpc_handler(\
296                        getattr(self, "get_info"),
297                        "InfoResponseBody"),
298                'Terminate': xmlrpc_handler(\
299                        getattr(self, "terminate_experiment"),
300                        "TerminateResponseBody"),
301        }
302
303    def copy_file(self, src, dest, size=1024):
304        """
305        Exceedingly simple file copy.
306        """
307        s = open(src,'r')
308        d = open(dest, 'w')
309
310        buf = "x"
311        while buf != "":
312            buf = s.read(size)
313            d.write(buf)
314        s.close()
315        d.close()
316
317    # Call while holding self.state_lock
318    def write_state(self):
319        """
320        Write a new copy of experiment state after copying the existing state
321        to a backup.
322
323        State format is a simple pickling of the state dictionary.
324        """
325        if os.access(self.state_filename, os.W_OK):
326            self.copy_file(self.state_filename, \
327                    "%s.bak" % self.state_filename)
328        try:
329            f = open(self.state_filename, 'w')
330            pickle.dump(self.state, f)
331        except IOError, e:
332            self.log.error("Can't write file %s: %s" % \
333                    (self.state_filename, e))
334        except pickle.PicklingError, e:
335            self.log.error("Pickling problem: %s" % e)
336        except TypeError, e:
337            self.log.error("Pickling problem (TypeError): %s" % e)
338
339    # Call while holding self.state_lock
340    def read_state(self):
341        """
342        Read a new copy of experiment state.  Old state is overwritten.
343
344        State format is a simple pickling of the state dictionary.
345        """
346        try:
347            f = open(self.state_filename, "r")
348            self.state = pickle.load(f)
349            self.log.debug("[read_state]: Read state from %s" % \
350                    self.state_filename)
351        except IOError, e:
352            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
353                    % (self.state_filename, e))
354        except pickle.UnpicklingError, e:
355            self.log.warning(("[read_state]: No saved state: " + \
356                    "Unpickling failed: %s") % e)
357
358    def scp_file(self, file, user, host, dest=""):
359        """
360        scp a file to the remote host.  If debug is set the action is only
361        logged.
362        """
363
364        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
365        rv = 0
366
367        try:
368            dnull = open("/dev/null", "r")
369        except IOError:
370            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
371            dnull = Null
372
373        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
374        if not self.debug:
375            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
376            else: rv = call(scp_cmd)
377
378        return rv == 0
379
380    def ssh_cmd(self, user, host, cmd, wname=None):
381        """
382        Run a remote command on host as user.  If debug is set, the action is
383        only logged.
384        """
385        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
386
387        try:
388            dnull = open("/dev/null", "r")
389        except IOError:
390            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
391            dnull = Null
392
393        self.log.debug("[ssh_cmd]: %s" % sh_str)
394        if not self.debug:
395            if dnull:
396                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
397            else:
398                sub = Popen(sh_str, shell=True)
399            return sub.wait() == 0
400        else:
401            return True
402
403    def ship_configs(self, host, user, src_dir, dest_dir):
404        """
405        Copy federant-specific configuration files to the federant.
406        """
407        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
408            return False
409        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
410            return False
411
412        for f in os.listdir(src_dir):
413            if os.path.isdir(f):
414                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
415                        "%s/%s" % (dest_dir, f)):
416                    return False
417            else:
418                if not self.scp_file("%s/%s" % (src_dir, f), 
419                        user, host, dest_dir):
420                    return False
421        return True
422
423    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
424        """
425        Start a sub-experiment on a federant.
426
427        Get the current state, modify or create as appropriate, ship data and
428        configs and start the experiment.  There are small ordering differences
429        based on the initial state of the sub-experiment.
430        """
431        # ops node in the federant
432        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
433        user = tbparams[tb]['user']     # federant user
434        pid = tbparams[tb]['project']   # federant project
435        # XXX
436        base_confs = ( "hosts",)
437        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
438        # command to test experiment state
439        expinfo_exec = "/usr/testbed/bin/expinfo" 
440        # Configuration directories on the remote machine
441        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
442        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
443        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
444        # Regular expressions to parse the expinfo response
445        state_re = re.compile("State:\s+(\w+)")
446        no_exp_re = re.compile("^No\s+such\s+experiment")
447        state = None    # Experiment state parsed from expinfo
448        # The expinfo ssh command
449        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
450
451        # Get status
452        self.log.debug("[start_segment]: %s"% " ".join(cmd))
453        dev_null = None
454        try:
455            dev_null = open("/dev/null", "a")
456        except IOError, e:
457            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
458
459        if self.debug:
460            state = 'swapped'
461            rv = 0
462        else:
463            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
464            for line in status.stdout:
465                m = state_re.match(line)
466                if m: state = m.group(1)
467                else:
468                    m = no_exp_re.match(line)
469                    if m: state = "none"
470            rv = status.wait()
471
472        # If the experiment is not present the subcommand returns a non-zero
473        # return value.  If we successfully parsed a "none" outcome, ignore the
474        # return code.
475        if rv != 0 and state != "none":
476            raise service_error(service_error.internal,
477                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
478
479        self.log.debug("[start_segment]: %s: %s" % (tb, state))
480        self.log.info("[start_segment]:transferring experiment to %s" % tb)
481
482        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
483            return False
484        # Clear the federation files
485        if not self.ssh_cmd(user, host, 
486                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
487            return False
488        if not self.ssh_cmd(user, host, 
489                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
490            return False
491        # Clear and create the tarfiles and rpm directories
492        for d in (tarfiles_dir, rpms_dir):
493            if not self.ssh_cmd(user, host, 
494                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
495                return False
496            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
497                    "create tarfiles"):
498                return False
499       
500        if state == 'active':
501            # Remote experiment is active.  Modify it.
502            for f in base_confs:
503                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
504                        "%s/%s" % (proj_dir, f)):
505                    return False
506            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
507                    proj_dir):
508                return False
509            if os.path.isdir("%s/tarfiles" % tmpdir):
510                if not self.ship_configs(host, user,
511                        "%s/tarfiles" % tmpdir, tarfiles_dir):
512                    return False
513            if os.path.isdir("%s/rpms" % tmpdir):
514                if not self.ship_configs(host, user,
515                        "%s/rpms" % tmpdir, tarfiles_dir):
516                    return False
517            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
518            if not self.ssh_cmd(user, host,
519                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
520                            (pid, eid, tclfile), "modexp"):
521                return False
522            return True
523        elif state == "swapped":
524            # Remote experiment swapped out.  Modify it and swap it in.
525            for f in base_confs:
526                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
527                        "%s/%s" % (proj_dir, f)):
528                    return False
529            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
530                    proj_dir):
531                return False
532            if os.path.isdir("%s/tarfiles" % tmpdir):
533                if not self.ship_configs(host, user,
534                        "%s/tarfiles" % tmpdir, tarfiles_dir):
535                    return False
536            if os.path.isdir("%s/rpms" % tmpdir):
537                if not self.ship_configs(host, user,
538                        "%s/rpms" % tmpdir, tarfiles_dir):
539                    return False
540            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
541            if not self.ssh_cmd(user, host,
542                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
543                    "modexp"):
544                return False
545            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
546            if not self.ssh_cmd(user, host,
547                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
548                    "swapexp"):
549                return False
550            return True
551        elif state == "none":
552            # No remote experiment.  Create one.  We do this in 2 steps so we
553            # can put the configuration files and scripts into the new
554            # experiment directories.
555
556            # Tarfiles must be present for creation to work
557            if os.path.isdir("%s/tarfiles" % tmpdir):
558                if not self.ship_configs(host, user,
559                        "%s/tarfiles" % tmpdir, tarfiles_dir):
560                    return False
561            if os.path.isdir("%s/rpms" % tmpdir):
562                if not self.ship_configs(host, user,
563                        "%s/rpms" % tmpdir, tarfiles_dir):
564                    return False
565            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
566            if not self.ssh_cmd(user, host,
567                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
568                            (pid, eid, tclfile), "startexp"):
569                return False
570            # After startexp the per-experiment directories exist
571            for f in base_confs:
572                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
573                        "%s/%s" % (proj_dir, f)):
574                    return False
575            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
576                    proj_dir):
577                return False
578            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
579            if not self.ssh_cmd(user, host,
580                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
581                    "swapexp"):
582                return False
583            return True
584        else:
585            self.log.debug("[start_segment]:unknown state %s" % state)
586            return False
587
588    def stop_segment(self, tb, eid, tbparams):
589        """
590        Stop a sub experiment by calling swapexp on the federant
591        """
592        user = tbparams[tb]['user']
593        host = tbparams[tb]['host']
594        pid = tbparams[tb]['project']
595
596        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
597        return self.ssh_cmd(user, host,
598                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
599
600       
601    def generate_ssh_keys(self, dest, type="rsa" ):
602        """
603        Generate a set of keys for the gateways to use to talk.
604
605        Keys are of type type and are stored in the required dest file.
606        """
607        valid_types = ("rsa", "dsa")
608        t = type.lower();
609        if t not in valid_types: raise ValueError
610        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
611
612        try:
613            trace = open("/dev/null", "w")
614        except IOError:
615            raise service_error(service_error.internal,
616                    "Cannot open /dev/null??");
617
618        # May raise CalledProcessError
619        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
620        rv = call(cmd, stdout=trace, stderr=trace)
621        if rv != 0:
622            raise service_error(service_error.internal, 
623                    "Cannot generate nonce ssh keys.  %s return code %d" \
624                            % (self.ssh_keygen, rv))
625
626    def gentopo(self, str):
627        """
628        Generate the topology dtat structure from the splitter's XML
629        representation of it.
630
631        The topology XML looks like:
632            <experiment>
633                <nodes>
634                    <node><vname></vname><ips>ip1:ip2</ips></node>
635                </nodes>
636                <lans>
637                    <lan>
638                        <vname></vname><vnode></vnode><ip></ip>
639                        <bandwidth></bandwidth><member>node:port</member>
640                    </lan>
641                </lans>
642        """
643        class topo_parse:
644            """
645            Parse the topology XML and create the dats structure.
646            """
647            def __init__(self):
648                # Typing of the subelements for data conversion
649                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
650                self.int_subelements = ( 'bandwidth',)
651                self.float_subelements = ( 'delay',)
652                # The final data structure
653                self.nodes = [ ]
654                self.lans =  [ ]
655                self.topo = { \
656                        'node': self.nodes,\
657                        'lan' : self.lans,\
658                    }
659                self.element = { }  # Current element being created
660                self.chars = ""     # Last text seen
661
662            def end_element(self, name):
663                # After each sub element the contents is added to the current
664                # element or to the appropriate list.
665                if name == 'node':
666                    self.nodes.append(self.element)
667                    self.element = { }
668                elif name == 'lan':
669                    self.lans.append(self.element)
670                    self.element = { }
671                elif name in self.str_subelements:
672                    self.element[name] = self.chars
673                    self.chars = ""
674                elif name in self.int_subelements:
675                    self.element[name] = int(self.chars)
676                    self.chars = ""
677                elif name in self.float_subelements:
678                    self.element[name] = float(self.chars)
679                    self.chars = ""
680
681            def found_chars(self, data):
682                self.chars += data.rstrip()
683
684
685        tp = topo_parse();
686        parser = xml.parsers.expat.ParserCreate()
687        parser.EndElementHandler = tp.end_element
688        parser.CharacterDataHandler = tp.found_chars
689
690        parser.Parse(str)
691
692        return tp.topo
693       
694
695    def genviz(self, topo):
696        """
697        Generate the visualization the virtual topology
698        """
699
700        neato = "/usr/local/bin/neato"
701        # These are used to parse neato output and to create the visualization
702        # file.
703        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
704        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
705                "%s</type></node>"
706
707        try:
708            # Node names
709            nodes = [ n['vname'] for n in topo['node'] ]
710            topo_lans = topo['lan']
711        except KeyError:
712            raise service_error(service_error.internal, "Bad topology")
713
714        lans = { }
715        links = { }
716
717        # Walk through the virtual topology, organizing the connections into
718        # 2-node connections (links) and more-than-2-node connections (lans).
719        # When a lan is created, it's added to the list of nodes (there's a
720        # node in the visualization for the lan).
721        for l in topo_lans:
722            if links.has_key(l['vname']):
723                if len(links[l['vname']]) < 2:
724                    links[l['vname']].append(l['vnode'])
725                else:
726                    nodes.append(l['vname'])
727                    lans[l['vname']] = links[l['vname']]
728                    del links[l['vname']]
729                    lans[l['vname']].append(l['vnode'])
730            elif lans.has_key(l['vname']):
731                lans[l['vname']].append(l['vnode'])
732            else:
733                links[l['vname']] = [ l['vnode'] ]
734
735
736        # Open up a temporary file for dot to turn into a visualization
737        try:
738            df, dotname = tempfile.mkstemp()
739            dotfile = os.fdopen(df, 'w')
740        except IOError:
741            raise service_error(service_error.internal,
742                    "Failed to open file in genviz")
743
744        # Generate a dot/neato input file from the links, nodes and lans
745        try:
746            print >>dotfile, "graph G {"
747            for n in nodes:
748                print >>dotfile, '\t"%s"' % n
749            for l in links.keys():
750                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
751            for l in lans.keys():
752                for n in lans[l]:
753                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
754            print >>dotfile, "}"
755            dotfile.close()
756        except TypeError:
757            raise service_error(service_error.internal,
758                    "Single endpoint link in vtopo")
759        except IOError:
760            raise service_error(service_error.internal, "Cannot write dot file")
761
762        # Use dot to create a visualization
763        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
764                '-Gpack=true', dotname], stdout=PIPE)
765
766        # Translate dot to vis format
767        vis_nodes = [ ]
768        vis = { 'node': vis_nodes }
769        for line in dot.stdout:
770            m = vis_re.match(line)
771            if m:
772                vn = m.group(1)
773                vis_node = {'name': vn, \
774                        'x': float(m.group(2)),\
775                        'y' : float(m.group(3)),\
776                    }
777                if vn in links.keys() or vn in lans.keys():
778                    vis_node['type'] = 'lan'
779                else:
780                    vis_node['type'] = 'node'
781                vis_nodes.append(vis_node)
782        rv = dot.wait()
783
784        os.remove(dotname)
785        if rv == 0 : return vis
786        else: return None
787
788    def get_access(self, tb, nodes, user, tbparam):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792
793        translate_attr = {
794            'slavenodestartcmd': 'expstart',
795            'slaveconnectorstartcmd': 'gwstart',
796            'masternodestartcmd': 'mexpstart',
797            'masterconnectorstartcmd': 'mgwstart',
798            'connectorimage': 'gwimage',
799            'connectortype': 'gwtype',
800            'tunnelcfg': 'tun',
801            'smbshare': 'smbshare',
802        }
803
804        uri = self.tbmap.get(tb, None)
805        if not uri:
806            raise service_error(serice_error.server_config, 
807                    "Unknown testbed: %s" % tb)
808
809        # The basic request
810        req = {\
811                'destinationTestbed' : { 'uri' : uri },
812                'user':  user,
813                'allocID' : { 'localname': 'test' },
814                # XXX: need to get service access stright
815                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
816                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
817            }
818       
819        # node resources if any
820        if nodes != None and len(nodes) > 0:
821            rnodes = [ ]
822            for n in nodes:
823                rn = { }
824                image, hw, count = n.split(":")
825                if image: rn['image'] = [ image ]
826                if hw: rn['hardware'] = [ hw ]
827                if count: rn['count'] = int(count)
828                rnodes.append(rn)
829            req['resources']= { }
830            req['resources']['node'] = rnodes
831
832        r = self.call_RequestAccess(uri, req, 
833                self.cert_file, self.cert_pwd, self.trusted_certs)
834
835        if r.has_key('RequestAccessResponseBody'):
836            r = r['RequestAccessResponseBody']
837        else:
838            raise service_error(service_error.protocol,
839                    "Bad proxy response")
840
841        e = r['emulab']
842        p = e['project']
843        tbparam[tb] = { 
844                "boss": e['boss'],
845                "host": e['ops'],
846                "domain": e['domain'],
847                "fs": e['fileServer'],
848                "eventserver": e['eventServer'],
849                "project": unpack_id(p['name']),
850                "emulab" : e,
851                "allocID" : r['allocID'],
852                }
853        # Make the testbed name be the label the user applied
854        p['testbed'] = {'localname': tb }
855
856        for u in p['user']:
857            tbparam[tb]['user'] = unpack_id(u['userID'])
858
859        for a in e['fedAttr']:
860            if a['attribute']:
861                key = translate_attr.get(a['attribute'].lower(), None)
862                if key:
863                    tbparam[tb][key]= a['value']
864       
865    def release_access(self, tb, aid):
866        """
867        Release access to testbed through fedd
868        """
869
870        uri = self.tbmap.get(tb, None)
871        if not uri:
872            raise service_error(serice_error.server_config, 
873                    "Unknown testbed: %s" % tb)
874
875        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
876                self.cert_file, self.cert_pwd, self.trusted_certs)
877
878        # better error coding
879
880    def remote_splitter(self, uri, desc, master):
881
882        req = {
883                'description' : { 'ns2description': desc },
884                'master': master,
885                'include_fedkit': bool(self.fedkit)
886            }
887
888        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
889                self.trusted_certs)
890
891        if r.has_key('Ns2SplitResponseBody'):
892            r = r['Ns2SplitResponseBody']
893            if r.has_key('output'):
894                return r['output'].splitlines()
895            else:
896                raise service_error(service_error.protocol, 
897                        "Bad splitter response (no output)")
898        else:
899            raise service_error(service_error.protocol, "Bad splitter response")
900       
901    class current_testbed:
902        """
903        Object for collecting the current testbed description.  The testbed
904        description is saved to a file with the local testbed variables
905        subsittuted line by line.
906        """
907        def __init__(self, eid, tmpdir, fedkit):
908            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
909            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
910            self.current_testbed = None
911            self.testbed_file = None
912
913            self.def_expstart = \
914                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
915            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
916            self.def_gwstart = \
917                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
918            self.def_mgwstart = \
919                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
920            self.def_gwimage = "FBSD61-TUNNEL2";
921            self.def_gwtype = "pc";
922
923            self.eid = eid
924            self.tmpdir = tmpdir
925            self.fedkit = fedkit
926
927        def __call__(self, line, master, allocated, tbparams):
928            # Capture testbed topology descriptions
929            if self.current_testbed == None:
930                m = self.begin_testbed.match(line)
931                if m != None:
932                    self.current_testbed = m.group(1)
933                    if self.current_testbed == None:
934                        raise service_error(service_error.req,
935                                "Bad request format (unnamed testbed)")
936                    allocated[self.current_testbed] = \
937                            allocated.get(self.current_testbed,0) + 1
938                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
939                    if not os.path.exists(tb_dir):
940                        try:
941                            os.mkdir(tb_dir)
942                        except IOError:
943                            raise service_error(service_error.internal,
944                                    "Cannot create %s" % tb_dir)
945                    try:
946                        self.testbed_file = open("%s/%s.%s.tcl" %
947                                (tb_dir, self.eid, self.current_testbed), 'w')
948                    except IOError:
949                        self.testbed_file = None
950                    return True
951                else: return False
952            else:
953                m = self.end_testbed.match(line)
954                if m != None:
955                    if m.group(1) != self.current_testbed:
956                        raise service_error(service_error.internal, 
957                                "Mismatched testbed markers!?")
958                    if self.testbed_file != None: 
959                        self.testbed_file.close()
960                        self.testbed_file = None
961                    self.current_testbed = None
962                elif self.testbed_file:
963                    # Substitute variables and put the line into the local
964                    # testbed file.
965                    gwtype = tbparams[self.current_testbed].get('gwtype', 
966                            self.def_gwtype)
967                    gwimage = tbparams[self.current_testbed].get('gwimage', 
968                            self.def_gwimage)
969                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
970                            self.def_mgwstart)
971                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
972                            self.def_mexpstart)
973                    gwstart = tbparams[self.current_testbed].get('gwstart', 
974                            self.def_gwstart)
975                    expstart = tbparams[self.current_testbed].get('expstart', 
976                            self.def_expstart)
977                    project = tbparams[self.current_testbed].get('project')
978                    line = re.sub("GWTYPE", gwtype, line)
979                    line = re.sub("GWIMAGE", gwimage, line)
980                    if self.current_testbed == master:
981                        line = re.sub("GWSTART", mgwstart, line)
982                        line = re.sub("EXPSTART", mexpstart, line)
983                    else:
984                        line = re.sub("GWSTART", gwstart, line)
985                        line = re.sub("EXPSTART", expstart, line)
986                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
987                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
988                    line = re.sub("EID", self.eid, line)
989                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
990                            (project, self.eid), line)
991                    if self.fedkit:
992                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
993                                line)
994                    print >>self.testbed_file, line
995                return True
996
997    class allbeds:
998        """
999        Process the Allbeds section.  Get access to each federant and save the
1000        parameters in tbparams
1001        """
1002        def __init__(self, get_access):
1003            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1004            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1005            self.in_allbeds = False
1006            self.get_access = get_access
1007
1008        def __call__(self, line, user, tbparams):
1009            # Testbed access parameters
1010            if not self.in_allbeds:
1011                if self.begin_allbeds.match(line):
1012                    self.in_allbeds = True
1013                    return True
1014                else:
1015                    return False
1016            else:
1017                if self.end_allbeds.match(line):
1018                    self.in_allbeds = False
1019                else:
1020                    nodes = line.split('|')
1021                    tb = nodes.pop(0)
1022                    self.get_access(tb, nodes, user, tbparams)
1023                return True
1024
1025    class gateways:
1026        def __init__(self, eid, master, tmpdir, gw_pubkey,
1027                gw_secretkey, copy_file, fedkit):
1028            self.begin_gateways = \
1029                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1030            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1031            self.current_gateways = None
1032            self.control_gateway = None
1033            self.active_end = { }
1034
1035            self.eid = eid
1036            self.master = master
1037            self.tmpdir = tmpdir
1038            self.gw_pubkey_base = gw_pubkey
1039            self.gw_secretkey_base = gw_secretkey
1040
1041            self.copy_file = copy_file
1042            self.fedkit = fedkit
1043
1044
1045        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1046                active_end, tbparams, dtb, myname, desthost, type):
1047            """
1048            Produce a gateway configuration file from a gateways line.
1049            """
1050
1051            sproject = tbparams[gw].get('project', 'project')
1052            dproject = tbparams[dtb].get('project', 'project')
1053            sdomain = ".%s.%s%s" % (eid, sproject,
1054                    tbparams[gw].get('domain', ".example.com"))
1055            ddomain = ".%s.%s%s" % (eid, dproject,
1056                    tbparams[dtb].get('domain', ".example.com"))
1057            boss = tbparams[master].get('boss', "boss")
1058            fs = tbparams[master].get('fs', "fs")
1059            event_server = "%s%s" % \
1060                    (tbparams[gw].get('eventserver', "event_server"),
1061                            tbparams[gw].get('domain', "example.com"))
1062            remote_event_server = "%s%s" % \
1063                    (tbparams[dtb].get('eventserver', "event_server"),
1064                            tbparams[dtb].get('domain', "example.com"))
1065            seer_control = "%s%s" % \
1066                    (tbparams[gw].get('control', "control"), sdomain)
1067
1068            if self.fedkit:
1069                remote_script_dir = "/usr/local/federation/bin"
1070                local_script_dir = "/usr/local/federation/bin"
1071            else:
1072                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1073                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1074
1075            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1076            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1077            tunnel_cfg = tbparams[gw].get("tun", "false")
1078
1079            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1080            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1081
1082            # translate to lower case so the `hostname` hack for specifying
1083            # configuration files works.
1084            conf_file = conf_file.lower();
1085            remote_conf_file = remote_conf_file.lower();
1086
1087            if dtb == master:
1088                active = "false"
1089            elif gw == master:
1090                active = "true"
1091            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1092                active = "false"
1093            else:
1094                active_end['%s-%s' % (gw, dtb)] = 1
1095                active = "true"
1096
1097            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1098            print >>gwconfig, "Active: %s" % active
1099            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1100            print >>gwconfig, "BossName: %s" % boss
1101            print >>gwconfig, "FsName: %s" % fs
1102            print >>gwconfig, "EventServerName: %s" % event_server
1103            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1104            print >>gwconfig, "SeerControl: %s" % seer_control
1105            print >>gwconfig, "Type: %s" % type
1106            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1107            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1108                    local_script_dir
1109            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1110            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1111            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1112                    (remote_conf_dir, remote_conf_file)
1113            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1114            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1115            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1116            gwconfig.close()
1117
1118            return active == "true"
1119
1120        def __call__(self, line, allocated, tbparams):
1121            # Process gateways
1122            if not self.current_gateways:
1123                m = self.begin_gateways.match(line)
1124                if m:
1125                    self.current_gateways = m.group(1)
1126                    if allocated.has_key(self.current_gateways):
1127                        # This test should always succeed
1128                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1129                        if not os.path.exists(tb_dir):
1130                            try:
1131                                os.mkdir(tb_dir)
1132                            except IOError:
1133                                raise service_error(service_error.internal,
1134                                        "Cannot create %s" % tb_dir)
1135                    else:
1136                        # XXX
1137                        self.log.error("[gateways]: Ignoring gateways for " + \
1138                                "unknown testbed %s" % self.current_gateways)
1139                        self.current_gateways = None
1140                    return True
1141                else:
1142                    return False
1143            else:
1144                m = self.end_gateways.match(line)
1145                if m :
1146                    if m.group(1) != self.current_gateways:
1147                        raise service_error(service_error.internal,
1148                                "Mismatched gateway markers!?")
1149                    if self.control_gateway:
1150                        try:
1151                            cc = open("%s/%s/client.conf" %
1152                                    (self.tmpdir, self.current_gateways), 'w')
1153                            print >>cc, "ControlGateway: %s" % \
1154                                    self.control_gateway
1155                            if tbparams[self.master].has_key('smbshare'):
1156                                print >>cc, "SMBSHare: %s" % \
1157                                        tbparams[self.master]['smbshare']
1158                            print >>cc, "ProjectUser: %s" % \
1159                                    tbparams[self.master]['user']
1160                            print >>cc, "ProjectName: %s" % \
1161                                    tbparams[self.master]['project']
1162                            cc.close()
1163                        except IOError:
1164                            raise service_error(service_error.internal,
1165                                    "Error creating client config")
1166                        try:
1167                            cc = open("%s/%s/seer.conf" %
1168                                    (self.tmpdir, self.current_gateways),
1169                                    'w')
1170                            if self.current_gateways != self.master:
1171                                print >>cc, "ControlNode: %s" % \
1172                                        self.control_gateway
1173                            print >>cc, "ExperimentID: %s/%s" % \
1174                                    ( tbparams[self.master]['project'], \
1175                                    self.eid )
1176                            cc.close()
1177                        except IOError:
1178                            raise service_error(service_error.internal,
1179                                    "Error creating seer config")
1180                    else:
1181                        debug.error("[gateways]: No control gateway for %s" %\
1182                                    self.current_gateways)
1183                    self.current_gateways = None
1184                else:
1185                    dtb, myname, desthost, type = line.split(" ")
1186
1187                    if type == "control" or type == "both":
1188                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1189                                self.eid, 
1190                                tbparams[self.current_gateways]['project'],
1191                                tbparams[self.current_gateways]['domain'])
1192                    try:
1193                        active = self.gateway_conf_file(self.current_gateways,
1194                                self.master, self.eid, self.gw_pubkey_base,
1195                                self.gw_secretkey_base,
1196                                self.active_end, tbparams, dtb, myname,
1197                                desthost, type)
1198                    except IOError, e:
1199                        raise service_error(service_error.internal,
1200                                "Failed to write config file for %s" % \
1201                                        self.current_gateway)
1202           
1203                    gw_pubkey = "%s/keys/%s" % \
1204                            (self.tmpdir, self.gw_pubkey_base)
1205                    gw_secretkey = "%s/keys/%s" % \
1206                            (self.tmpdir, self.gw_secretkey_base)
1207
1208                    pkfile = "%s/%s/%s" % \
1209                            ( self.tmpdir, self.current_gateways, 
1210                                    self.gw_pubkey_base)
1211                    skfile = "%s/%s/%s" % \
1212                            ( self.tmpdir, self.current_gateways, 
1213                                    self.gw_secretkey_base)
1214
1215                    if not os.path.exists(pkfile):
1216                        try:
1217                            self.copy_file(gw_pubkey, pkfile)
1218                        except IOError:
1219                            service_error(service_error.internal,
1220                                    "Failed to copy pubkey file")
1221
1222                    if active and not os.path.exists(skfile):
1223                        try:
1224                            self.copy_file(gw_secretkey, skfile)
1225                        except IOError:
1226                            service_error(service_error.internal,
1227                                    "Failed to copy secretkey file")
1228                return True
1229
1230    class shunt_to_file:
1231        """
1232        Simple class to write data between two regexps to a file.
1233        """
1234        def __init__(self, begin, end, filename):
1235            """
1236            Begin shunting on a match of begin, stop on end, send data to
1237            filename.
1238            """
1239            self.begin = re.compile(begin)
1240            self.end = re.compile(end)
1241            self.in_shunt = False
1242            self.file = None
1243            self.filename = filename
1244
1245        def __call__(self, line):
1246            """
1247            Call this on each line in the input that may be shunted.
1248            """
1249            if not self.in_shunt:
1250                if self.begin.match(line):
1251                    self.in_shunt = True
1252                    try:
1253                        self.file = open(self.filename, "w")
1254                    except:
1255                        self.file = None
1256                        raise
1257                    return True
1258                else:
1259                    return False
1260            else:
1261                if self.end.match(line):
1262                    if self.file: 
1263                        self.file.close()
1264                        self.file = None
1265                    self.in_shunt = False
1266                else:
1267                    if self.file:
1268                        print >>self.file, line
1269                return True
1270
1271    class shunt_to_list:
1272        """
1273        Same interface as shunt_to_file.  Data collected in self.list, one list
1274        element per line.
1275        """
1276        def __init__(self, begin, end):
1277            self.begin = re.compile(begin)
1278            self.end = re.compile(end)
1279            self.in_shunt = False
1280            self.list = [ ]
1281       
1282        def __call__(self, line):
1283            if not self.in_shunt:
1284                if self.begin.match(line):
1285                    self.in_shunt = True
1286                    return True
1287                else:
1288                    return False
1289            else:
1290                if self.end.match(line):
1291                    self.in_shunt = False
1292                else:
1293                    self.list.append(line)
1294                return True
1295
1296    class shunt_to_string:
1297        """
1298        Same interface as shunt_to_file.  Data collected in self.str, all in
1299        one string.
1300        """
1301        def __init__(self, begin, end):
1302            self.begin = re.compile(begin)
1303            self.end = re.compile(end)
1304            self.in_shunt = False
1305            self.str = ""
1306       
1307        def __call__(self, line):
1308            if not self.in_shunt:
1309                if self.begin.match(line):
1310                    self.in_shunt = True
1311                    return True
1312                else:
1313                    return False
1314            else:
1315                if self.end.match(line):
1316                    self.in_shunt = False
1317                else:
1318                    self.str += line
1319                return True
1320
1321    def create_experiment(self, req, fid):
1322        """
1323        The external interface to experiment creation called from the
1324        dispatcher.
1325
1326        Creates a working directory, splits the incoming description using the
1327        splitter script and parses out the avrious subsections using the
1328        lcasses above.  Once each sub-experiment is created, use pooled threads
1329        to instantiate them and start it all up.
1330        """
1331        try:
1332            tmpdir = tempfile.mkdtemp(prefix="split-")
1333        except IOError:
1334            raise service_error(service_error.internal, "Cannot create tmp dir")
1335
1336        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1337        gw_secretkey_base = "fed.%s" % self.ssh_type
1338        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1339        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1340        tclfile = tmpdir + "/experiment.tcl"
1341        tbparams = { }
1342
1343        pid = "dummy"
1344        gid = "dummy"
1345        # XXX
1346        fail_soft = False
1347
1348        try:
1349            os.mkdir(tmpdir+"/keys")
1350        except OSError:
1351            raise service_error(service_error.internal,
1352                    "Can't make temporary dir")
1353
1354        req = req.get('CreateRequestBody', None)
1355        if not req:
1356            raise service_error(service_error.req,
1357                    "Bad request format (no CreateRequestBody)")
1358        # The tcl parser needs to read a file so put the content into that file
1359        descr=req.get('experimentdescription', None)
1360        if descr:
1361            file_content=descr.get('ns2description', None)
1362            if file_content:
1363                try:
1364                    f = open(tclfile, 'w')
1365                    f.write(file_content)
1366                    f.close()
1367                except IOError:
1368                    raise service_error(service_error.internal,
1369                            "Cannot write temp experiment description")
1370            else:
1371                raise service_error(service_error.req, 
1372                        "Only ns2descriptions supported")
1373        else:
1374            raise service_error(service_error.req, "No experiment description")
1375
1376        if req.has_key('experimentID') and \
1377                req['experimentID'].has_key('localname'):
1378            eid = req['experimentID']['localname']
1379            self.state_lock.acquire()
1380            while (self.state.has_key(eid)):
1381                eid += random.choice(string.ascii_letters)
1382            # To avoid another thread picking this localname
1383            self.state[eid] = "placeholder"
1384            self.state_lock.release()
1385        else:
1386            eid = self.exp_stem
1387            for i in range(0,5):
1388                eid += random.choice(string.ascii_letters)
1389            self.state_lock.acquire()
1390            while (self.state.has_key(eid)):
1391                eid = self.exp_stem
1392                for i in range(0,5):
1393                    eid += random.choice(string.ascii_letters)
1394            # To avoid another thread picking this localname
1395            self.state[eid] = "placeholder"
1396            self.state_lock.release()
1397
1398        try:
1399            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1400        except ValueError:
1401            raise service_error(service_error.server_config, 
1402                    "Bad key type (%s)" % self.ssh_type)
1403
1404        user = req.get('user', None)
1405        if user == None:
1406            raise service_error(service_error.req, "No user")
1407
1408        master = req.get('master', None)
1409        if master == None:
1410            raise service_error(service_error.req, "No master testbed label")
1411       
1412        if self.splitter_url:
1413            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1414            split_data = self.remote_splitter(self.splitter_url, file_content,
1415                    master)
1416        else:
1417            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1418                str(self.muxmax), '-m', master]
1419
1420            if self.fedkit:
1421                tclcmd.append('-k')
1422
1423            tclcmd.extend([pid, gid, eid, tclfile])
1424
1425            self.log.debug("running local splitter %s", " ".join(tclcmd))
1426            tclparser = Popen(tclcmd, stdout=PIPE)
1427            split_data = tclparser.stdout
1428
1429        allocated = { }     # Testbeds we can access
1430        started = { }       # Testbeds where a sub-experiment started
1431                            # successfully
1432
1433        # Objects to parse the splitter output (defined above)
1434        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1435        parse_allbeds = self.allbeds(self.get_access)
1436        parse_gateways = self.gateways(eid, master, tmpdir,
1437                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1438        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1439                    "^#\s+End\s+Vtopo")
1440        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1441                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1442        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1443                "^#\s+End\s+tarfiles")
1444        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1445                "^#\s+End\s+rpms")
1446
1447        # Worling on the split data
1448        for line in split_data:
1449            line = line.rstrip()
1450            if parse_current_testbed(line, master, allocated, tbparams):
1451                continue
1452            elif parse_allbeds(line, user, tbparams):
1453                continue
1454            elif parse_gateways(line, allocated, tbparams):
1455                continue
1456            elif parse_vtopo(line):
1457                continue
1458            elif parse_hostnames(line):
1459                continue
1460            elif parse_tarfiles(line):
1461                continue
1462            elif parse_rpms(line):
1463                continue
1464            else:
1465                raise service_error(service_error.internal, 
1466                        "Bad tcl parse? %s" % line)
1467
1468        # Virtual topology and visualization
1469        vtopo = self.gentopo(parse_vtopo.str)
1470        if not vtopo:
1471            raise service_error(service_error.internal, 
1472                    "Failed to generate virtual topology")
1473
1474        vis = self.genviz(vtopo)
1475        if not vis:
1476            raise service_error(service_error.internal, 
1477                    "Failed to generate visualization")
1478
1479        # save federant information
1480        for k in allocated.keys():
1481            tbparams[k]['federant'] = {\
1482                    'name': [ { 'localname' : eid} ],\
1483                    'emulab': tbparams[k]['emulab'],\
1484                    'allocID' : tbparams[k]['allocID'],\
1485                    'master' : k == master,\
1486                }
1487
1488
1489        # Copy tarfiles and rpms needed at remote sites into a staging area
1490        try:
1491            if self.fedkit:
1492                parse_tarfiles.list.append(self.fedkit)
1493            for t in parse_tarfiles.list:
1494                if not os.path.exists("%s/tarfiles" % tmpdir):
1495                    os.mkdir("%s/tarfiles" % tmpdir)
1496                self.copy_file(t, "%s/tarfiles/%s" % \
1497                        (tmpdir, os.path.basename(t)))
1498            for r in parse_rpms.list:
1499                if not os.path.exists("%s/rpms" % tmpdir):
1500                    os.mkdir("%s/rpms" % tmpdir)
1501                self.copy_file(r, "%s/rpms/%s" % \
1502                        (tmpdir, os.path.basename(r)))
1503        except IOError, e:
1504            raise service_error(service_error.internal, 
1505                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1506
1507        thread_pool_info = self.thread_pool()
1508        threads = [ ]
1509
1510        for tb in [ k for k in allocated.keys() if k != master]:
1511            # Wait until we have a free slot to start the next testbed load
1512            thread_pool_info.acquire()
1513            while thread_pool_info.started - \
1514                    thread_pool_info.terminated >= self.nthreads:
1515                thread_pool_info.wait()
1516            thread_pool_info.release()
1517
1518            # Create and start a thread to start the segment, and save it to
1519            # get the return value later
1520            t  = self.pooled_thread(target=self.start_segment, 
1521                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1522                    pdata=thread_pool_info, trace_file=self.trace_file)
1523            threads.append(t)
1524            t.start()
1525
1526        # Wait until all finish (the first clause of the while is to make sure
1527        # one starts)
1528        thread_pool_info.acquire()
1529        while thread_pool_info.started == 0 or \
1530                thread_pool_info.started > thread_pool_info.terminated:
1531            thread_pool_info.wait()
1532        thread_pool_info.release()
1533
1534        # If none failed, start the master
1535        failed = [ t.getName() for t in threads if not t.rv ]
1536
1537        if len(failed) == 0:
1538            if not self.start_segment(master, eid, tbparams, tmpdir):
1539                failed.append(master)
1540
1541        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1542        # If one failed clean up, unless fail_soft is set
1543        if failed:
1544            if not fail_soft:
1545                for tb in succeeded:
1546                    self.stop_segment(tb, eid, tbparams)
1547                # Remove the placeholder
1548                self.state_lock.acquire()
1549                del self.state[eid]
1550                self.state_lock.release()
1551
1552                raise service_error(service_error.federant,
1553                    "Swap in failed on %s" % ",".join(failed))
1554        else:
1555            self.log.info("[start_segment]: Experiment %s started" % eid)
1556
1557        # Generate an ID for the experiment (slice) and a certificate that the
1558        # allocator can use to prove they own it.  We'll ship it back through
1559        # the encrypted connection.
1560        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1561
1562        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1563
1564        # Walk up tmpdir, deleting as we go
1565        for path, dirs, files in os.walk(tmpdir, topdown=False):
1566            for f in files:
1567                os.remove(os.path.join(path, f))
1568            for d in dirs:
1569                os.rmdir(os.path.join(path, d))
1570        os.rmdir(tmpdir)
1571
1572        # The deepcopy prevents the allocation ID and other binaries from being
1573        # translated into other formats
1574        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1575                for tb in tbparams.keys() \
1576                    if tbparams[tb].has_key('federant') ],\
1577                    'vtopo': vtopo,\
1578                    'vis' : vis,
1579                    'experimentID' : [\
1580                            { 'fedid': copy.copy(expid) }, \
1581                            { 'localname': eid },\
1582                        ],\
1583                    'experimentAccess': { 'X509' : expcert },\
1584                }
1585
1586        # Insert the experiment into our state and update the disk copy
1587        self.state_lock.acquire()
1588        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1589                for tb in tbparams.keys() \
1590                    if tbparams[tb].has_key('federant') ],\
1591                    'vtopo': vtopo,\
1592                    'vis' : vis,
1593                    'experimentID' : [\
1594                            { 'fedid': expid }, { 'localname': eid },\
1595                        ],\
1596                }
1597        self.state[eid] = self.state[expid]
1598        if self.state_filename: self.write_state()
1599        self.state_lock.release()
1600
1601        if not failed:
1602            return resp
1603        else:
1604            raise service_error(service_error.partial, \
1605                    "Partial swap in on %s" % ",".join(succeeded))
1606
1607
1608    def get_vtopo(self, req, fid):
1609        """
1610        Return the stored virtual topology for this experiment
1611        """
1612        rv = None
1613
1614        req = req.get('VtopoRequestBody', None)
1615        if not req:
1616            raise service_error(service_error.req,
1617                    "Bad request format (no VtopoRequestBody)")
1618        exp = req.get('experiment', None)
1619        if exp:
1620            if exp.has_key('fedid'):
1621                key = exp['fedid']
1622                keytype = "fedid"
1623            elif exp.has_key('localname'):
1624                key = exp['localname']
1625                keytype = "localname"
1626            else:
1627                raise service_error(service_error.req, "Unknown lookup type")
1628        else:
1629            raise service_error(service_error.req, "No request?")
1630
1631        self.state_lock.acquire()
1632        if self.state.has_key(key):
1633            rv = { 'experiment' : {keytype: key },\
1634                    'vtopo': self.state[key]['vtopo'],\
1635                }
1636        self.state_lock.release()
1637
1638        if rv: return rv
1639        else: raise service_error(service_error.req, "No such experiment")
1640
1641    def get_vis(self, req, fid):
1642        """
1643        Return the stored visualization for this experiment
1644        """
1645        rv = None
1646
1647        req = req.get('VisRequestBody', None)
1648        if not req:
1649            raise service_error(service_error.req,
1650                    "Bad request format (no VisRequestBody)")
1651        exp = req.get('experiment', None)
1652        if exp:
1653            if exp.has_key('fedid'):
1654                key = exp['fedid']
1655                keytype = "fedid"
1656            elif exp.has_key('localname'):
1657                key = exp['localname']
1658                keytype = "localname"
1659            else:
1660                raise service_error(service_error.req, "Unknown lookup type")
1661        else:
1662            raise service_error(service_error.req, "No request?")
1663
1664        self.state_lock.acquire()
1665        if self.state.has_key(key):
1666            rv =  { 'experiment' : {keytype: key },\
1667                    'vis': self.state[key]['vis'],\
1668                    }
1669        self.state_lock.release()
1670
1671        if rv: return rv
1672        else: raise service_error(service_error.req, "No such experiment")
1673
1674    def get_info(self, req, fid):
1675        """
1676        Return all the stored info about this experiment
1677        """
1678        rv = None
1679
1680        req = req.get('InfoRequestBody', None)
1681        if not req:
1682            raise service_error(service_error.req,
1683                    "Bad request format (no VisRequestBody)")
1684        exp = req.get('experiment', None)
1685        if exp:
1686            if exp.has_key('fedid'):
1687                key = exp['fedid']
1688                keytype = "fedid"
1689            elif exp.has_key('localname'):
1690                key = exp['localname']
1691                keytype = "localname"
1692            else:
1693                raise service_error(service_error.req, "Unknown lookup type")
1694        else:
1695            raise service_error(service_error.req, "No request?")
1696
1697        # The state may be massaged by the service function that called
1698        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1699        # state.
1700        self.state_lock.acquire()
1701        if self.state.has_key(key):
1702            rv = copy.deepcopy(self.state[key])
1703        self.state_lock.release()
1704
1705        if rv: return rv
1706        else: raise service_error(service_error.req, "No such experiment")
1707
1708
1709    def terminate_experiment(self, req, fid):
1710        """
1711        Swap this experiment out on the federants and delete the shared
1712        information
1713        """
1714        tbparams = { }
1715        req = req.get('TerminateRequestBody', None)
1716        if not req:
1717            raise service_error(service_error.req,
1718                    "Bad request format (no TerminateRequestBody)")
1719        exp = req.get('experiment', None)
1720        if exp:
1721            if exp.has_key('fedid'):
1722                key = exp['fedid']
1723                keytype = "fedid"
1724            elif exp.has_key('localname'):
1725                key = exp['localname']
1726                keytype = "localname"
1727            else:
1728                raise service_error(service_error.req, "Unknown lookup type")
1729        else:
1730            raise service_error(service_error.req, "No request?")
1731
1732        self.state_lock.acquire()
1733        fed_exp = self.state.get(key, None)
1734
1735        if fed_exp:
1736            # This branch of the conditional holds the lock to generate a
1737            # consistent temporary tbparams variable to deallocate experiments.
1738            # It releases the lock to do the deallocations and reacquires it to
1739            # remove the experiment state when the termination is complete.
1740            ids = []
1741            #  experimentID is a list of dicts that are self-describing
1742            #  identifiers.  This finds all the fedids and localnames - the
1743            #  keys of self.state - and puts them into ids.
1744            for id in fed_exp.get('experimentID', []):
1745                if id.has_key('fedid'): ids.append(id['fedid'])
1746                if id.has_key('localname'): ids.append(id['localname'])
1747
1748            # Construct enough of the tbparams to make the stop_segment calls
1749            # work
1750            for fed in fed_exp['federant']:
1751                try:
1752                    for e in fed['name']:
1753                        eid = e.get('localname', None)
1754                        if eid: break
1755                    else:
1756                        continue
1757
1758                    p = fed['emulab']['project']
1759
1760                    project = p['name']['localname']
1761                    tb = p['testbed']['localname']
1762                    user = p['user'][0]['userID']['localname']
1763
1764                    domain = fed['emulab']['domain']
1765                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1766                    aid = fed['allocID']
1767                except KeyError, e:
1768                    continue
1769                tbparams[tb] = {\
1770                        'user': user,\
1771                        'domain': domain,\
1772                        'project': project,\
1773                        'host': host,\
1774                        'eid': eid,\
1775                        'aid': aid,\
1776                    }
1777            self.state_lock.release()
1778
1779            # Stop everyone.
1780            for tb in tbparams.keys():
1781                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1782
1783            # release the allocations
1784            for tb in tbparams.keys():
1785                self.release_access(tb, tbparams[tb]['aid'])
1786
1787            # Remove the terminated experiment
1788            self.state_lock.acquire()
1789            for id in ids:
1790                if self.state.has_key(id): del self.state[id]
1791
1792            if self.state_filename: self.write_state()
1793            self.state_lock.release()
1794
1795            return { 'experiment': exp }
1796        else:
1797            # Don't forget to release the lock
1798            self.state_lock.release()
1799            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.