source: fedd/fedd_experiment_control.py @ abb87eb

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since abb87eb was abb87eb, checked in by Ted Faber <faber@…>, 16 years ago

split acces to service and experiment access

  • Property mode set to 100644
File size: 52.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
48        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
49        "fed_evrepeater", "rc.accounts.patch"]
50   
51    class thread_pool:
52        """
53        A class to keep track of a set of threads all invoked for the same
54        task.  Manages the mutual exclusion of the states.
55        """
56        def __init__(self):
57            """
58            Start a pool.
59            """
60            self.changed = Condition()
61            self.started = 0
62            self.terminated = 0
63
64        def acquire(self):
65            """
66            Get the pool's lock.
67            """
68            self.changed.acquire()
69
70        def release(self):
71            """
72            Release the pool's lock.
73            """
74            self.changed.release()
75
76        def wait(self, timeout = None):
77            """
78            Wait for a pool thread to start or stop.
79            """
80            self.changed.wait(timeout)
81
82        def start(self):
83            """
84            Called by a pool thread to report starting.
85            """
86            self.changed.acquire()
87            self.started += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def terminate(self):
92            """
93            Called by a pool thread to report finishing.
94            """
95            self.changed.acquire()
96            self.terminated += 1
97            self.changed.notifyAll()
98            self.changed.release()
99
100        def clear(self):
101            """
102            Clear all pool data.
103            """
104            self.changed.acquire()
105            self.started = 0
106            self.terminated =0
107            self.changed.notifyAll()
108            self.changed.release()
109
110    class pooled_thread(Thread):
111        """
112        One of a set of threads dedicated to a specific task.  Uses the
113        thread_pool class above for coordination.
114        """
115        def __init__(self, group=None, target=None, name=None, args=(), 
116                kwargs={}, pdata=None, trace_file=None):
117            Thread.__init__(self, group, target, name, args, kwargs)
118            self.rv = None          # Return value of the ops in this thread
119            self.exception = None   # Exception that terminated this thread
120            self.target=target      # Target function to run on start()
121            self.args = args        # Args to pass to target
122            self.kwargs = kwargs    # Additional kw args
123            self.pdata = pdata      # thread_pool for this class
124            # Logger for this thread
125            self.log = logging.getLogger("fedd.experiment_control")
126       
127        def run(self):
128            """
129            Emulate Thread.run, except add pool data manipulation and error
130            logging.
131            """
132            if self.pdata:
133                self.pdata.start()
134
135            if self.target:
136                try:
137                    self.rv = self.target(*self.args, **self.kwargs)
138                except service_error, s:
139                    self.exception = s
140                    self.log.error("Thread exception: %s %s" % \
141                            (s.code_string(), s.desc))
142                except:
143                    self.exception = sys.exc_info()[1]
144                    self.log.error(("Unexpected thread exception: %s" +\
145                            "Trace %s") % (self.exception,\
146                                traceback.format_exc()))
147            if self.pdata:
148                self.pdata.terminate()
149
150    def __init__(self, config=None):
151        """
152        Intialize the various attributes, most from the config object
153        """
154        self.scripts = fedd_experiment_control_local.scripts
155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
156        self.thread_pool = fedd_experiment_control_local.thread_pool
157
158        self.cert_file = None
159        self.cert_pwd = None
160        self.trusted_certs = None
161
162        # Walk through the various relevant certificat specifying config
163        # attributes until the local certificate attributes can be resolved.
164        # The walk is from most specific to most general specification.
165        for p in ("create_experiment_", "proxy_", ""):
166            filen = "%scert_file" % p
167            pwn = "%scert_pwd" % p
168            trustn = "%strusted_certs" % p
169
170            if getattr(config, filen, None):
171                if not self.cert_file:
172                    self.cert_file = getattr(config, filen, None)
173                    self.cert_pwd = getattr(config, pwn, None)
174
175            if getattr(config, trustn, None):
176                if not self.trusted_certs:
177                    self.trusted_certs = getattr(config, trustn, None)
178
179        self.exp_stem = "fed-stem"
180        self.debug = config.create_debug
181        self.log = logging.getLogger("fedd.experiment_control")
182        self.muxmax = 2
183        self.nthreads = 2
184        self.randomize_experiments = False
185        self.scp_exec = "/usr/bin/scp"
186        self.scripts_dir = "/users/faber/testbed/federation"
187        self.splitter = None
188        self.ssh_exec="/usr/bin/ssh"
189        self.ssh_keygen = "/usr/bin/ssh-keygen"
190        self.ssh_identity_file = None
191        # XXX
192        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
193        self.ssh_type = "rsa"
194        self.state = { }
195        self.state_filename = config.experiment_state_file
196        self.state_lock = Lock()
197        self.tclsh = "/usr/local/bin/otclsh"
198        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
199        self.tbmap = { 
200                'deter':'https://users.isi.deterlab.net:23235',
201                'emulab':'https://users.isi.deterlab.net:23236',
202                'ucb':'https://users.isi.deterlab.net:23237',
203                }
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
208        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
209        self.def_gwstart = \
210                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
211        self.def_mgwstart = \
212                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
213        self.def_gwimage = "FBSD61-TUNNEL2";
214        self.def_gwtype = "pc";
215
216
217        if self.ssh_pubkey_file:
218            try:
219                f = open(self.ssh_pubkey_file, 'r')
220                self.ssh_pubkey = f.read()
221                f.close()
222            except IOError:
223                raise service_error(service_error.internal,
224                        "Cannot read sshpubkey")
225
226        # Set the logging level to the value passed in.  The getattr slieght of
227        # hand finds the logging level constant corrersponding to the string.
228        # We're a little paranoid to avoid user mayhem.
229        if config.experiment_log:
230            try:
231                level = int(getattr(logging, config.experiment_log.upper(),-1))
232
233                if  logging.DEBUG <= level <= logging.CRITICAL:
234                    self.log.setLevel(level)
235                else:
236                    self.log.error("Bad experiment_log value: %s" % \
237                            config.experiment_log)
238
239            except ValueError:
240                self.log.error("Bad experiment_log value: %s" % \
241                        config.experiment_log)
242
243        # Grab saved state.  OK to do this w/o locking because it's read only
244        # and only one thread should be in existence that can see self.state at
245        # this point.
246        if self.state_filename:
247            self.read_state()
248
249        # Confirm federation scripts in the right place
250        for s in self.scripts:
251            if not os.path.exists(self.scripts_dir + "/" + s):
252                raise service_error(service_error.server_config,
253                        "%s/%s not in local script dir" % (self.scripts_dir, s))
254
255        # Dispatch tables
256        self.soap_services = {\
257                'Create': make_soap_handler(\
258                        CreateRequestMessage.typecode,
259                        getattr(self, "create_experiment"), 
260                        CreateResponseMessage,
261                        "CreateResponseBody"),
262                'Vtopo': make_soap_handler(\
263                        VtopoRequestMessage.typecode,
264                        getattr(self, "get_vtopo"),
265                        VtopoResponseMessage,
266                        "VtopoResponseBody"),
267                'Vis': make_soap_handler(\
268                        VisRequestMessage.typecode,
269                        getattr(self, "get_vis"),
270                        VisResponseMessage,
271                        "VisResponseBody"),
272                'Info': make_soap_handler(\
273                        InfoRequestMessage.typecode,
274                        getattr(self, "get_info"),
275                        InfoResponseMessage,
276                        "InfoResponseBody"),
277                'Terminate': make_soap_handler(\
278                        TerminateRequestMessage.typecode,
279                        getattr(self, "terminate_experiment"),
280                        TerminateResponseMessage,
281                        "TerminateResponseBody"),
282        }
283
284        self.xmlrpc_services = {\
285                'Create': make_xmlrpc_handler(\
286                        getattr(self, "create_experiment"), 
287                        "CreateResponseBody"),
288                'Vtopo': make_xmlrpc_handler(\
289                        getattr(self, "get_vtopo"),
290                        "VtopoResponseBody"),
291                'Vis': make_xmlrpc_handler(\
292                        getattr(self, "get_vis"),
293                        "VisResponseBody"),
294                'Info': make_xmlrpc_handler(\
295                        getattr(self, "get_info"),
296                        "InfoResponseBody"),
297                'Terminate': make_xmlrpc_handler(\
298                        getattr(self, "terminate_experiment"),
299                        "TerminateResponseBody"),
300        }
301
302    def copy_file(self, src, dest, size=1024):
303        """
304        Exceedingly simple file copy.
305        """
306        s = open(src,'r')
307        d = open(dest, 'w')
308
309        buf = "x"
310        while buf != "":
311            buf = s.read(size)
312            d.write(buf)
313        s.close()
314        d.close()
315
316    # Call while holding self.state_lock
317    def write_state(self):
318        """
319        Write a new copy of experiment state after copying the existing state
320        to a backup.
321
322        State format is a simple pickling of the state dictionary.
323        """
324        if os.access(self.state_filename, os.W_OK):
325            self.copy_file(self.state_filename, \
326                    "%s.bak" % self.state_filename)
327        try:
328            f = open(self.state_filename, 'w')
329            pickle.dump(self.state, f)
330        except IOError, e:
331            self.log.error("Can't write file %s: %s" % \
332                    (self.state_filename, e))
333        except pickle.PicklingError, e:
334            self.log.error("Pickling problem: %s" % e)
335
336    # Call while holding self.state_lock
337    def read_state(self):
338        """
339        Read a new copy of experiment state.  Old state is overwritten.
340
341        State format is a simple pickling of the state dictionary.
342        """
343        try:
344            f = open(self.state_filename, "r")
345            self.state = pickle.load(f)
346            self.log.debug("[read_state]: Read state from %s" % \
347                    self.state_filename)
348        except IOError, e:
349            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
350                    % (self.state_filename, e))
351        except pickle.UnpicklingError, e:
352            self.log.warning(("[read_state]: No saved state: " + \
353                    "Unpickling failed: %s") % e)
354
355    def scp_file(self, file, user, host, dest=""):
356        """
357        scp a file to the remote host.  If debug is set the action is only
358        logged.
359        """
360
361        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
362        rv = 0
363
364        try:
365            dnull = open("/dev/null", "r")
366        except IOError:
367            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
368            dnull = Null
369
370        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
371        if not self.debug:
372            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
373            else: rv = call(scp_cmd)
374
375        return rv == 0
376
377    def ssh_cmd(self, user, host, cmd, wname=None):
378        """
379        Run a remote command on host as user.  If debug is set, the action is
380        only logged.
381        """
382        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
383
384        try:
385            dnull = open("/dev/null", "r")
386        except IOError:
387            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
388            dnull = Null
389
390        self.log.debug("[ssh_cmd]: %s" % sh_str)
391        if not self.debug:
392            if dnull:
393                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
394            else:
395                sub = Popen(sh_str, shell=True)
396            return sub.wait() == 0
397        else:
398            return True
399
400    def ship_scripts(self, host, user, dest_dir):
401        """
402        Copy the federation scripts (fedkit) to the a federant.
403        """
404        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
405            for s in self.scripts:
406                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
407                        user, host, dest_dir):
408                    return False
409            return True
410        else:
411            return False
412
413    def ship_configs(self, host, user, src_dir, dest_dir):
414        """
415        Copy federant-specific configuration files to the federant.
416        """
417        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
418            return False
419        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
420            return False
421
422        for f in os.listdir(src_dir):
423            if os.path.isdir(f):
424                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
425                        "%s/%s" % (dest_dir, f)):
426                    return False
427            else:
428                if not self.scp_file("%s/%s" % (src_dir, f), 
429                        user, host, dest_dir):
430                    return False
431        return True
432
433    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
434        """
435        Start a sub-experiment on a federant.
436
437        Get the current state, modify or create as appropriate, ship data and
438        configs and start the experiment.  There are small ordering differences
439        based on the initial state of the sub-experiment.
440        """
441        # ops node in the federant
442        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
443        user = tbparams[tb]['user']     # federant user
444        pid = tbparams[tb]['project']   # federant project
445        # XXX
446        base_confs = ( "hosts",)
447        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
448        # command to test experiment state
449        expinfo_exec = "/usr/testbed/bin/expinfo" 
450        # Configuration directories on the remote machine
451        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
452        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
453        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
454        # Regular expressions to parse the expinfo response
455        state_re = re.compile("State:\s+(\w+)")
456        no_exp_re = re.compile("^No\s+such\s+experiment")
457        state = None    # Experiment state parsed from expinfo
458        # The expinfo ssh command
459        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
460
461        # Get status
462        self.log.debug("[start_segment]: %s"% " ".join(cmd))
463        dev_null = None
464        try:
465            dev_null = open("/dev/null", "a")
466        except IOError, e:
467            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
468
469        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
470        for line in status.stdout:
471            m = state_re.match(line)
472            if m: state = m.group(1)
473            else:
474                m = no_exp_re.match(line)
475                if m: state = "none"
476        rv = status.wait()
477
478        # If the experiment is not present the subcommand returns a non-zero
479        # return value.  If we successfully parsed a "none" outcome, ignore the
480        # return code.
481        if rv != 0 and state != "none":
482            raise service_error(service_error.internal,
483                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
484
485        self.log.debug("[start_segment]: %s: %s" % (tb, state))
486        self.log.info("[start_segment]:transferring experiment to %s" % tb)
487
488        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
489            return False
490        # Clear the federation files
491        if not self.ssh_cmd(user, host, 
492                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
493            return False
494        if not self.ssh_cmd(user, host, 
495                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
496            return False
497        # Clear and create the tarfiles and rpm directories
498        for d in (tarfiles_dir, rpms_dir):
499            if not self.ssh_cmd(user, host, 
500                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
501                return False
502            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
503                    "create tarfiles"):
504                return False
505       
506        if state == 'active':
507            # Remote experiment is active.  Modify it.
508            for f in base_confs:
509                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
510                        "%s/%s" % (proj_dir, f)):
511                    return False
512            if not self.ship_scripts(host, user, proj_dir):
513                return False
514            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
515                    proj_dir):
516                return False
517            if os.path.isdir("%s/tarfiles" % tmpdir):
518                if not self.ship_configs(host, user,
519                        "%s/tarfiles" % tmpdir, tarfiles_dir):
520                    return False
521            if os.path.isdir("%s/rpms" % tmpdir):
522                if not self.ship_configs(host, user,
523                        "%s/rpms" % tmpdir, tarfiles_dir):
524                    return False
525            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
526            if not self.ssh_cmd(user, host,
527                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
528                            (pid, eid, tclfile), "modexp"):
529                return False
530            return True
531        elif state == "swapped":
532            # Remote experiment swapped out.  Modify it and swap it in.
533            for f in base_confs:
534                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
535                        "%s/%s" % (proj_dir, f)):
536                    return False
537            if not self.ship_scripts(host, user, proj_dir):
538                return False
539            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
540                    proj_dir):
541                return False
542            if os.path.isdir("%s/tarfiles" % tmpdir):
543                if not self.ship_configs(host, user,
544                        "%s/tarfiles" % tmpdir, tarfiles_dir):
545                    return False
546            if os.path.isdir("%s/rpms" % tmpdir):
547                if not self.ship_configs(host, user,
548                        "%s/rpms" % tmpdir, tarfiles_dir):
549                    return False
550            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
551            if not self.ssh_cmd(user, host,
552                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
553                    "modexp"):
554                return False
555            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
556            if not self.ssh_cmd(user, host,
557                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
558                    "swapexp"):
559                return False
560            return True
561        elif state == "none":
562            # No remote experiment.  Create one.  We do this in 2 steps so we
563            # can put the configuration files and scripts into the new
564            # experiment directories.
565
566            # Tarfiles must be present for creation to work
567            if os.path.isdir("%s/tarfiles" % tmpdir):
568                if not self.ship_configs(host, user,
569                        "%s/tarfiles" % tmpdir, tarfiles_dir):
570                    return False
571            if os.path.isdir("%s/rpms" % tmpdir):
572                if not self.ship_configs(host, user,
573                        "%s/rpms" % tmpdir, tarfiles_dir):
574                    return False
575            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
576            if not self.ssh_cmd(user, host,
577                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
578                            (pid, eid, tclfile), "startexp"):
579                return False
580            # After startexp the per-experiment directories exist
581            for f in base_confs:
582                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
583                        "%s/%s" % (proj_dir, f)):
584                    return False
585            if not self.ship_scripts(host, user, proj_dir):
586                return False
587            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
588                    proj_dir):
589                return False
590            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
591            if not self.ssh_cmd(user, host,
592                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
593                    "swapexp"):
594                return False
595            return True
596        else:
597            self.log.debug("[start_segment]:unknown state %s" % state)
598            return False
599
600    def stop_segment(self, tb, eid, tbparams):
601        """
602        Stop a sub experiment by calling swapexp on the federant
603        """
604        user = tbparams[tb]['user']
605        host = tbparams[tb]['host']
606        pid = tbparams[tb]['project']
607
608        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
609        return self.ssh_cmd(user, host,
610                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
611
612       
613    def generate_ssh_keys(self, dest, type="rsa" ):
614        """
615        Generate a set of keys for the gateways to use to talk.
616
617        Keys are of type type and are stored in the required dest file.
618        """
619        valid_types = ("rsa", "dsa")
620        t = type.lower();
621        if t not in valid_types: raise ValueError
622        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
623
624        try:
625            trace = open("/dev/null", "w")
626        except IOError:
627            raise service_error(service_error.internal,
628                    "Cannot open /dev/null??");
629
630        # May raise CalledProcessError
631        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
632        rv = call(cmd, stdout=trace, stderr=trace)
633        if rv != 0:
634            raise service_error(service_error.internal, 
635                    "Cannot generate nonce ssh keys.  %s return code %d" \
636                            % (self.ssh_keygen, rv))
637
638    def gentopo(self, str):
639        """
640        Generate the topology dtat structure from the splitter's XML
641        representation of it.
642
643        The topology XML looks like:
644            <experiment>
645                <nodes>
646                    <node><vname></vname><ips>ip1:ip2</ips></node>
647                </nodes>
648                <lans>
649                    <lan>
650                        <vname></vname><vnode></vnode><ip></ip>
651                        <bandwidth></bandwidth><member>node:port</member>
652                    </lan>
653                </lans>
654        """
655        class topo_parse:
656            """
657            Parse the topology XML and create the dats structure.
658            """
659            def __init__(self):
660                # Typing of the subelements for data conversion
661                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
662                self.int_subelements = ( 'bandwidth',)
663                self.float_subelements = ( 'delay',)
664                # The final data structure
665                self.nodes = [ ]
666                self.lans =  [ ]
667                self.topo = { \
668                        'node': self.nodes,\
669                        'lan' : self.lans,\
670                    }
671                self.element = { }  # Current element being created
672                self.chars = ""     # Last text seen
673
674            def end_element(self, name):
675                # After each sub element the contents is added to the current
676                # element or to the appropriate list.
677                if name == 'node':
678                    self.nodes.append(self.element)
679                    self.element = { }
680                elif name == 'lan':
681                    self.lans.append(self.element)
682                    self.element = { }
683                elif name in self.str_subelements:
684                    self.element[name] = self.chars
685                    self.chars = ""
686                elif name in self.int_subelements:
687                    self.element[name] = int(self.chars)
688                    self.chars = ""
689                elif name in self.float_subelements:
690                    self.element[name] = float(self.chars)
691                    self.chars = ""
692
693            def found_chars(self, data):
694                self.chars += data.rstrip()
695
696
697        tp = topo_parse();
698        parser = xml.parsers.expat.ParserCreate()
699        parser.EndElementHandler = tp.end_element
700        parser.CharacterDataHandler = tp.found_chars
701
702        parser.Parse(str)
703
704        return tp.topo
705       
706
707    def genviz(self, topo):
708        """
709        Generate the visualization the virtual topology
710        """
711
712        neato = "/usr/local/bin/neato"
713        # These are used to parse neato output and to create the visualization
714        # file.
715        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
716        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
717                "%s</type></node>"
718
719        try:
720            # Node names
721            nodes = [ n['vname'] for n in topo['node'] ]
722            topo_lans = topo['lan']
723        except KeyError:
724            raise service_error(service_error.internal, "Bad topology")
725
726        lans = { }
727        links = { }
728
729        # Walk through the virtual topology, organizing the connections into
730        # 2-node connections (links) and more-than-2-node connections (lans).
731        # When a lan is created, it's added to the list of nodes (there's a
732        # node in the visualization for the lan).
733        for l in topo_lans:
734            if links.has_key(l['vname']):
735                if len(links[l['vname']]) < 2:
736                    links[l['vname']].append(l['vnode'])
737                else:
738                    nodes.append(l['vname'])
739                    lans[l['vname']] = links[l['vname']]
740                    del links[l['vname']]
741                    lans[l['vname']].append(l['vnode'])
742            elif lans.has_key(l['vname']):
743                lans[l['vname']].append(l['vnode'])
744            else:
745                links[l['vname']] = [ l['vnode'] ]
746
747
748        # Open up a temporary file for dot to turn into a visualization
749        try:
750            df, dotname = tempfile.mkstemp()
751            dotfile = os.fdopen(df, 'w')
752        except IOError:
753            raise service_error(service_error.internal,
754                    "Failed to open file in genviz")
755
756        # Generate a dot/neato input file from the links, nodes and lans
757        try:
758            print >>dotfile, "graph G {"
759            for n in nodes:
760                print >>dotfile, '\t"%s"' % n
761            for l in links.keys():
762                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
763            for l in lans.keys():
764                for n in lans[l]:
765                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
766            print >>dotfile, "}"
767            dotfile.close()
768        except TypeError:
769            raise service_error(service_error.internal,
770                    "Single endpoint link in vtopo")
771        except IOError:
772            raise service_error(service_error.internal, "Cannot write dot file")
773
774        # Use dot to create a visualization
775        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
776                '-Gpack=true', dotname], stdout=PIPE)
777
778        # Translate dot to vis format
779        vis_nodes = [ ]
780        vis = { 'node': vis_nodes }
781        for line in dot.stdout:
782            m = vis_re.match(line)
783            if m:
784                vn = m.group(1)
785                vis_node = {'name': vn, \
786                        'x': float(m.group(2)),\
787                        'y' : float(m.group(3)),\
788                    }
789                if vn in links.keys() or vn in lans.keys():
790                    vis_node['type'] = 'lan'
791                else:
792                    vis_node['type'] = 'node'
793                vis_nodes.append(vis_node)
794        rv = dot.wait()
795
796        os.remove(dotname)
797        if rv == 0 : return vis
798        else: return None
799
800
801    def get_access(self, tb, nodes, user, tbparam):
802        """
803        Get access to testbed through fedd and set the parameters for that tb
804        """
805
806        translate_attr = {
807            'slavenodestartcmd': 'expstart',
808            'slaveconnectorstartcmd': 'gwstart',
809            'masternodestartcmd': 'mexpstart',
810            'masterconnectorstartcmd': 'mgwstart',
811            'connectorimage': 'gwimage',
812            'connectortype': 'gwtype',
813            'tunnelcfg': 'tun',
814            'smbshare': 'smbshare',
815        }
816
817        uri = self.tbmap.get(tb, None)
818        if not uri:
819            raise service_error(serice_error.server_config, 
820                    "Unknown testbed: %s" % tb)
821
822        # The basic request
823        req = {\
824                'destinationTestbed' : { 'uri' : uri },
825                'user':  user,
826                'allocID' : { 'localname': 'test' },
827                # XXX: need to get service access stright
828                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
829                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
830            }
831       
832        # node resources if any
833        if nodes != None and len(nodes) > 0:
834            rnodes = [ ]
835            for n in nodes:
836                rn = { }
837                image, hw, count = n.split(":")
838                if image: rn['image'] = [ image ]
839                if hw: rn['hardware'] = [ hw ]
840                if count: rn['count'] = int(count)
841                rnodes.append(rn)
842            req['resources']= { }
843            req['resources']['node'] = rnodes
844
845        # No retry loop here.  Proxy servers must correctly authenticate
846        # themselves without help
847
848        try:
849            ctx = fedd_ssl_context(self.cert_file, 
850                    self.trusted_certs, password=self.cert_pwd)
851        except SSL.SSLError:
852            raise service_error(service_error.server_config, 
853                    "Server certificates misconfigured")
854
855        loc = feddServiceLocator();
856        port = loc.getfeddPortType(uri,
857                transport=M2Crypto.httpslib.HTTPSConnection, 
858                transdict={ 'ssl_context' : ctx })
859
860        # Reconstruct the full request message
861        msg = RequestAccessRequestMessage()
862        msg.set_element_RequestAccessRequestBody(
863                pack_soap(msg, "RequestAccessRequestBody", req))
864
865        try:
866            resp = port.RequestAccess(msg)
867        except ZSI.ParseException, e:
868            raise service_error(service_error.req,
869                    "Bad format message (XMLRPC??): %s" %
870                    str(e))
871        r = unpack_soap(resp)
872
873        if r.has_key('RequestAccessResponseBody'):
874            r = r['RequestAccessResponseBody']
875        else:
876            raise service_error(service_error.proxy,
877                    "Bad proxy response")
878
879
880        e = r['emulab']
881        p = e['project']
882        tbparam[tb] = { 
883                "boss": e['boss'],
884                "host": e['ops'],
885                "domain": e['domain'],
886                "fs": e['fileServer'],
887                "eventserver": e['eventServer'],
888                "project": unpack_id(p['name']),
889                "emulab" : e
890                }
891        # Make the testbed name be the label the user applied
892        p['testbed'] = {'localname': tb }
893
894        for u in p['user']:
895            tbparam[tb]['user'] = unpack_id(u['userID'])
896
897        for a in e['fedAttr']:
898            if a['attribute']:
899                key = translate_attr.get(a['attribute'].lower(), None)
900                if key:
901                    tbparam[tb][key]= a['value']
902       
903    class current_testbed:
904        """
905        Object for collecting the current testbed description.  The testbed
906        description is saved to a file with the local testbed variables
907        subsittuted line by line.
908        """
909        def __init__(self, eid, tmpdir):
910            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
911            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
912            self.current_testbed = None
913            self.testbed_file = None
914
915            self.def_expstart = \
916                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
917            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
918            self.def_gwstart = \
919                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
920            self.def_mgwstart = \
921                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
922            self.def_gwimage = "FBSD61-TUNNEL2";
923            self.def_gwtype = "pc";
924
925            self.eid = eid
926            self.tmpdir = tmpdir
927
928        def __call__(self, line, master, allocated, tbparams):
929            # Capture testbed topology descriptions
930            if self.current_testbed == None:
931                m = self.begin_testbed.match(line)
932                if m != None:
933                    self.current_testbed = m.group(1)
934                    if self.current_testbed == None:
935                        raise service_error(service_error.req,
936                                "Bad request format (unnamed testbed)")
937                    allocated[self.current_testbed] = \
938                            allocated.get(self.current_testbed,0) + 1
939                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
940                    if not os.path.exists(tb_dir):
941                        try:
942                            os.mkdir(tb_dir)
943                        except IOError:
944                            raise service_error(service_error.internal,
945                                    "Cannot create %s" % tb_dir)
946                    try:
947                        self.testbed_file = open("%s/%s.%s.tcl" %
948                                (tb_dir, self.eid, self.current_testbed), 'w')
949                    except IOError:
950                        self.testbed_file = None
951                    return True
952                else: return False
953            else:
954                m = self.end_testbed.match(line)
955                if m != None:
956                    if m.group(1) != self.current_testbed:
957                        raise service_error(service_error.internal, 
958                                "Mismatched testbed markers!?")
959                    if self.testbed_file != None: 
960                        self.testbed_file.close()
961                        self.testbed_file = None
962                    self.current_testbed = None
963                elif self.testbed_file:
964                    # Substitute variables and put the line into the local
965                    # testbed file.
966                    gwtype = tbparams[self.current_testbed].get('gwtype', 
967                            self.def_gwtype)
968                    gwimage = tbparams[self.current_testbed].get('gwimage', 
969                            self.def_gwimage)
970                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
971                            self.def_mgwstart)
972                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
973                            self.def_mexpstart)
974                    gwstart = tbparams[self.current_testbed].get('gwstart', 
975                            self.def_gwstart)
976                    expstart = tbparams[self.current_testbed].get('expstart', 
977                            self.def_expstart)
978                    project = tbparams[self.current_testbed].get('project')
979                    line = re.sub("GWTYPE", gwtype, line)
980                    line = re.sub("GWIMAGE", gwimage, line)
981                    if self.current_testbed == master:
982                        line = re.sub("GWSTART", mgwstart, line)
983                        line = re.sub("EXPSTART", mexpstart, line)
984                    else:
985                        line = re.sub("GWSTART", gwstart, line)
986                        line = re.sub("EXPSTART", expstart, line)
987                    # XXX: does `` embed without doing enything else?
988                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
989                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
990                    line = re.sub("EID", self.eid, line)
991                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
992                            (project, self.eid), line)
993                    print >>self.testbed_file, line
994                return True
995
996    class allbeds:
997        """
998        Process the Allbeds section.  Get access to each federant and save the
999        parameters in tbparams
1000        """
1001        def __init__(self, get_access):
1002            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1003            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1004            self.in_allbeds = False
1005            self.get_access = get_access
1006
1007        def __call__(self, line, user, tbparams):
1008            # Testbed access parameters
1009            if not self.in_allbeds:
1010                if self.begin_allbeds.match(line):
1011                    self.in_allbeds = True
1012                    return True
1013                else:
1014                    return False
1015            else:
1016                if self.end_allbeds.match(line):
1017                    self.in_allbeds = False
1018                else:
1019                    nodes = line.split('|')
1020                    tb = nodes.pop(0)
1021                    self.get_access(tb, nodes, user, tbparams)
1022                return True
1023
1024    class gateways:
1025        def __init__(self, eid, master, tmpdir, gw_pubkey,
1026                gw_secretkey, copy_file):
1027            self.begin_gateways = \
1028                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1029            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1030            self.current_gateways = None
1031            self.control_gateway = None
1032            self.active_end = { }
1033
1034            self.eid = eid
1035            self.master = master
1036            self.tmpdir = tmpdir
1037            self.gw_pubkey_base = gw_pubkey
1038            self.gw_secretkey_base = gw_secretkey
1039
1040            self.copy_file = copy_file
1041
1042
1043        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1044                active_end, tbparams, dtb, myname, desthost, type):
1045            """
1046            Produce a gateway configuration file from a gateways line.
1047            """
1048
1049            sproject = tbparams[gw].get('project', 'project')
1050            dproject = tbparams[dtb].get('project', 'project')
1051            sdomain = ".%s.%s%s" % (eid, sproject,
1052                    tbparams[gw].get('domain', ".example.com"))
1053            ddomain = ".%s.%s%s" % (eid, dproject,
1054                    tbparams[dtb].get('domain', ".example.com"))
1055            boss = tbparams[master].get('boss', "boss")
1056            fs = tbparams[master].get('fs', "fs")
1057            event_server = "%s%s" % \
1058                    (tbparams[gw].get('eventserver', "event_server"),
1059                            tbparams[gw].get('domain', "example.com"))
1060            remote_event_server = "%s%s" % \
1061                    (tbparams[dtb].get('eventserver', "event_server"),
1062                            tbparams[dtb].get('domain', "example.com"))
1063            seer_control = "%s%s" % \
1064                    (tbparams[gw].get('control', "control"), sdomain)
1065            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1066            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1067            tunnel_cfg = tbparams[gw].get("tun", "false")
1068
1069            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1070            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1071
1072            # translate to lower case so the `hostname` hack for specifying
1073            # configuration files works.
1074            conf_file = conf_file.lower();
1075            remote_conf_file = remote_conf_file.lower();
1076
1077            if dtb == master:
1078                active = "false"
1079            elif gw == master:
1080                active = "true"
1081            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1082                active = "false"
1083            else:
1084                active_end['%s-%s' % (gw, dtb)] = 1
1085                active = "true"
1086
1087            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1088            print >>gwconfig, "Active: %s" % active
1089            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1090            print >>gwconfig, "BossName: %s" % boss
1091            print >>gwconfig, "FsName: %s" % fs
1092            print >>gwconfig, "EventServerName: %s" % event_server
1093            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1094            print >>gwconfig, "SeerControl: %s" % seer_control
1095            print >>gwconfig, "Type: %s" % type
1096            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1097            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1098                    local_script_dir
1099            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1100            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1101            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1102                    (remote_script_dir, remote_conf_file)
1103            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1104            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
1105            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
1106            gwconfig.close()
1107
1108            return active == "true"
1109
1110        def __call__(self, line, allocated, tbparams):
1111            # Process gateways
1112            if not self.current_gateways:
1113                m = self.begin_gateways.match(line)
1114                if m:
1115                    self.current_gateways = m.group(1)
1116                    if allocated.has_key(self.current_gateways):
1117                        # This test should always succeed
1118                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1119                        if not os.path.exists(tb_dir):
1120                            try:
1121                                os.mkdir(tb_dir)
1122                            except IOError:
1123                                raise service_error(service_error.internal,
1124                                        "Cannot create %s" % tb_dir)
1125                    else:
1126                        # XXX
1127                        self.log.error("[gateways]: Ignoring gateways for " + \
1128                                "unknown testbed %s" % self.current_gateways)
1129                        self.current_gateways = None
1130                    return True
1131                else:
1132                    return False
1133            else:
1134                m = self.end_gateways.match(line)
1135                if m :
1136                    if m.group(1) != self.current_gateways:
1137                        raise service_error(service_error.internal,
1138                                "Mismatched gateway markers!?")
1139                    if self.control_gateway:
1140                        try:
1141                            cc = open("%s/%s/client.conf" %
1142                                    (self.tmpdir, self.current_gateways), 'w')
1143                            print >>cc, "ControlGateway: %s" % \
1144                                    self.control_gateway
1145                            if tbparams[self.master].has_key('smbshare'):
1146                                print >>cc, "SMBSHare: %s" % \
1147                                        tbparams[self.master]['smbshare']
1148                            print >>cc, "ProjectUser: %s" % \
1149                                    tbparams[self.master]['user']
1150                            print >>cc, "ProjectName: %s" % \
1151                                    tbparams[self.master]['project']
1152                            cc.close()
1153                        except IOError:
1154                            raise service_error(service_error.internal,
1155                                    "Error creating client config")
1156                        try:
1157                            cc = open("%s/%s/seer.conf" %
1158                                    (self.tmpdir, self.current_gateways),
1159                                    'w')
1160                            if self.current_gateways != self.master:
1161                                print >>cc, "ControlNode: %s" % \
1162                                        self.control_gateway
1163                            print >>cc, "ExperimentID: %s/%s" % \
1164                                    ( tbparams[self.master]['project'], \
1165                                    self.eid )
1166                            cc.close()
1167                        except IOError:
1168                            raise service_error(service_error.internal,
1169                                    "Error creating seer config")
1170                    else:
1171                        debug.error("[gateways]: No control gateway for %s" %\
1172                                    self.current_gateways)
1173                    self.current_gateways = None
1174                else:
1175                    dtb, myname, desthost, type = line.split(" ")
1176
1177                    if type == "control" or type == "both":
1178                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1179                                self.eid, 
1180                                tbparams[self.current_gateways]['project'],
1181                                tbparams[self.current_gateways]['domain'])
1182                    try:
1183                        active = self.gateway_conf_file(self.current_gateways,
1184                                self.master, self.eid, self.gw_pubkey_base,
1185                                self.gw_secretkey_base,
1186                                self.active_end, tbparams, dtb, myname,
1187                                desthost, type)
1188                    except IOError, e:
1189                        raise service_error(service_error.internal,
1190                                "Failed to write config file for %s" % \
1191                                        self.current_gateway)
1192           
1193                    gw_pubkey = "%s/keys/%s" % \
1194                            (self.tmpdir, self.gw_pubkey_base)
1195                    gw_secretkey = "%s/keys/%s" % \
1196                            (self.tmpdir, self.gw_secretkey_base)
1197
1198                    pkfile = "%s/%s/%s" % \
1199                            ( self.tmpdir, self.current_gateways, 
1200                                    self.gw_pubkey_base)
1201                    skfile = "%s/%s/%s" % \
1202                            ( self.tmpdir, self.current_gateways, 
1203                                    self.gw_secretkey_base)
1204
1205                    if not os.path.exists(pkfile):
1206                        try:
1207                            self.copy_file(gw_pubkey, pkfile)
1208                        except IOError:
1209                            service_error(service_error.internal,
1210                                    "Failed to copy pubkey file")
1211
1212                    if active and not os.path.exists(skfile):
1213                        try:
1214                            self.copy_file(gw_secretkey, skfile)
1215                        except IOError:
1216                            service_error(service_error.internal,
1217                                    "Failed to copy secretkey file")
1218                return True
1219
1220    class shunt_to_file:
1221        """
1222        Simple class to write data between two regexps to a file.
1223        """
1224        def __init__(self, begin, end, filename):
1225            """
1226            Begin shunting on a match of begin, stop on end, send data to
1227            filename.
1228            """
1229            self.begin = re.compile(begin)
1230            self.end = re.compile(end)
1231            self.in_shunt = False
1232            self.file = None
1233            self.filename = filename
1234
1235        def __call__(self, line):
1236            """
1237            Call this on each line in the input that may be shunted.
1238            """
1239            if not self.in_shunt:
1240                if self.begin.match(line):
1241                    self.in_shunt = True
1242                    try:
1243                        self.file = open(self.filename, "w")
1244                    except:
1245                        self.file = None
1246                        raise
1247                    return True
1248                else:
1249                    return False
1250            else:
1251                if self.end.match(line):
1252                    if self.file: 
1253                        self.file.close()
1254                        self.file = None
1255                    self.in_shunt = False
1256                else:
1257                    if self.file:
1258                        print >>self.file, line
1259                return True
1260
1261    class shunt_to_list:
1262        """
1263        Same interface as shunt_to_file.  Data collected in self.list, one list
1264        element per line.
1265        """
1266        def __init__(self, begin, end):
1267            self.begin = re.compile(begin)
1268            self.end = re.compile(end)
1269            self.in_shunt = False
1270            self.list = [ ]
1271       
1272        def __call__(self, line):
1273            if not self.in_shunt:
1274                if self.begin.match(line):
1275                    self.in_shunt = True
1276                    return True
1277                else:
1278                    return False
1279            else:
1280                if self.end.match(line):
1281                    self.in_shunt = False
1282                else:
1283                    self.list.append(line)
1284                return True
1285
1286    class shunt_to_string:
1287        """
1288        Same interface as shunt_to_file.  Data collected in self.str, all in
1289        one string.
1290        """
1291        def __init__(self, begin, end):
1292            self.begin = re.compile(begin)
1293            self.end = re.compile(end)
1294            self.in_shunt = False
1295            self.str = ""
1296       
1297        def __call__(self, line):
1298            if not self.in_shunt:
1299                if self.begin.match(line):
1300                    self.in_shunt = True
1301                    return True
1302                else:
1303                    return False
1304            else:
1305                if self.end.match(line):
1306                    self.in_shunt = False
1307                else:
1308                    self.str += line
1309                return True
1310
1311    def create_experiment(self, req, fid):
1312        """
1313        The external interface to experiment creation called from the
1314        dispatcher.
1315
1316        Creates a working directory, splits the incoming description using the
1317        splitter script and parses out the avrious subsections using the
1318        lcasses above.  Once each sub-experiment is created, use pooled threads
1319        to instantiate them and start it all up.
1320        """
1321        try:
1322            tmpdir = tempfile.mkdtemp(prefix="split-")
1323        except IOError:
1324            raise service_error(service_error.internal, "Cannot create tmp dir")
1325
1326        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1327        gw_secretkey_base = "fed.%s" % self.ssh_type
1328        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1329        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1330        tclfile = tmpdir + "/experiment.tcl"
1331        tbparams = { }
1332
1333        pid = "dummy"
1334        gid = "dummy"
1335        # XXX
1336        fail_soft = False
1337
1338        try:
1339            os.mkdir(tmpdir+"/keys")
1340        except OSError:
1341            raise service_error(service_error.internal,
1342                    "Can't make temporary dir")
1343
1344        req = req.get('CreateRequestBody', None)
1345        if not req:
1346            raise service_error(service_error.req,
1347                    "Bad request format (no CreateRequestBody)")
1348        # The tcl parser needs to read a file so put the content into that file
1349        descr=req.get('experimentdescription', None)
1350        if descr:
1351            file_content=descr.get('ns2description', None)
1352            if file_content:
1353                try:
1354                    f = open(tclfile, 'w')
1355                    f.write(file_content)
1356                    f.close()
1357                except IOError:
1358                    raise service_error(service_error.internal,
1359                            "Cannot write temp experiment description")
1360            else:
1361                raise service_error(service_error.req, 
1362                        "Only ns2descriptions supported")
1363        else:
1364            raise service_error(service_error.req, "No experiment description")
1365
1366        if req.has_key('experimentID') and \
1367                req['experimentID'].has_key('localname'):
1368            eid = req['experimentID']['localname']
1369            self.state_lock.acquire()
1370            while (self.state.has_key(eid)):
1371                eid += random.choice(string.ascii_letters)
1372            # To avoid another thread picking this localname
1373            self.state[eid] = "placeholder"
1374            self.state_lock.release()
1375        else:
1376            eid = self.exp_stem
1377            for i in range(0,5):
1378                eid += random.choice(string.ascii_letters)
1379            self.state_lock.acquire()
1380            while (self.state.has_key(eid)):
1381                eid = self.exp_stem
1382                for i in range(0,5):
1383                    eid += random.choice(string.ascii_letters)
1384            # To avoid another thread picking this localname
1385            self.state[eid] = "placeholder"
1386            self.state_lock.release()
1387
1388        try:
1389            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1390        except ValueError:
1391            raise service_error(service_error.server_config, 
1392                    "Bad key type (%s)" % self.ssh_type)
1393
1394        user = req.get('user', None)
1395        if user == None:
1396            raise service_error(service_error.req, "No user")
1397
1398        master = req.get('master', None)
1399        if master == None:
1400            raise service_error(service_error.req, "No master testbed label")
1401       
1402       
1403        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1404            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1405        tclparser = Popen(tclcmd, stdout=PIPE)
1406
1407        allocated = { }     # Testbeds we can access
1408        started = { }       # Testbeds where a sub-experiment started
1409                            # successfully
1410
1411        # Objects to parse the splitter output (defined above)
1412        parse_current_testbed = self.current_testbed(eid, tmpdir)
1413        parse_allbeds = self.allbeds(self.get_access)
1414        parse_gateways = self.gateways(eid, master, tmpdir,
1415                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1416        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1417                    "^#\s+End\s+Vtopo")
1418        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1419                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1420        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1421                "^#\s+End\s+tarfiles")
1422        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1423                "^#\s+End\s+rpms")
1424
1425        # Worling on the split data
1426        for line in tclparser.stdout:
1427            line = line.rstrip()
1428            if parse_current_testbed(line, master, allocated, tbparams):
1429                continue
1430            elif parse_allbeds(line, user, tbparams):
1431                continue
1432            elif parse_gateways(line, allocated, tbparams):
1433                continue
1434            elif parse_vtopo(line):
1435                continue
1436            elif parse_hostnames(line):
1437                continue
1438            elif parse_tarfiles(line):
1439                continue
1440            elif parse_rpms(line):
1441                continue
1442            else:
1443                raise service_error(service_error.internal, 
1444                        "Bad tcl parse? %s" % line)
1445
1446        # Virtual topology and visualization
1447        vtopo = self.gentopo(parse_vtopo.str)
1448        if not vtopo:
1449            raise service_error(service_error.internal, 
1450                    "Failed to generate virtual topology")
1451
1452        vis = self.genviz(vtopo)
1453        if not vis:
1454            raise service_error(service_error.internal, 
1455                    "Failed to generate visualization")
1456
1457        # save federant information
1458        for k in allocated.keys():
1459            tbparams[k]['federant'] = {\
1460                    'name': [ { 'localname' : eid} ],\
1461                    'emulab': tbparams[k]['emulab'],\
1462                    'master' : k == master,\
1463                }
1464
1465
1466        # Copy tarfiles and rpms needed at remote sites into a staging area
1467        try:
1468            for t in parse_tarfiles.list:
1469                if not os.path.exists("%s/tarfiles" % tmpdir):
1470                    os.mkdir("%s/tarfiles" % tmpdir)
1471                self.copy_file(t, "%s/tarfiles/%s" % \
1472                        (tmpdir, os.path.basename(t)))
1473            for r in parse_rpms.list:
1474                if not os.path.exists("%s/rpms" % tmpdir):
1475                    os.mkdir("%s/rpms" % tmpdir)
1476                self.copy_file(r, "%s/rpms/%s" % \
1477                        (tmpdir, os.path.basename(r)))
1478        except IOError, e:
1479            raise service_error(service_error.internal, 
1480                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1481
1482        thread_pool_info = self.thread_pool()
1483        threads = [ ]
1484
1485        for tb in [ k for k in allocated.keys() if k != master]:
1486            # Wait until we have a free slot to start the next testbed load
1487            thread_pool_info.acquire()
1488            while thread_pool_info.started - \
1489                    thread_pool_info.terminated >= self.nthreads:
1490                thread_pool_info.wait()
1491            thread_pool_info.release()
1492
1493            # Create and start a thread to start the segment, and save it to
1494            # get the return value later
1495            t  = self.pooled_thread(target=self.start_segment, 
1496                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1497                    pdata=thread_pool_info, trace_file=self.trace_file)
1498            threads.append(t)
1499            t.start()
1500
1501        # Wait until all finish (the first clause of the while is to make sure
1502        # one starts)
1503        thread_pool_info.acquire()
1504        while thread_pool_info.started == 0 or \
1505                thread_pool_info.started > thread_pool_info.terminated:
1506            thread_pool_info.wait()
1507        thread_pool_info.release()
1508
1509        # If none failed, start the master
1510        failed = [ t.getName() for t in threads if not t.rv ]
1511
1512        if len(failed) == 0:
1513            if not self.start_segment(master, eid, tbparams, tmpdir):
1514                failed.append(master)
1515
1516        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1517        # If one failed clean up, unless fail_soft is set
1518        if failed:
1519            if not fail_soft:
1520                for tb in succeeded:
1521                    self.stop_segment(tb, eid, tbparams)
1522                # Remove the placeholder
1523                self.state_lock.acquire()
1524                del self.state[eid]
1525                self.state_lock.release()
1526
1527                raise service_error(service_error.federant,
1528                    "Swap in failed on %s" % ",".join(failed))
1529        else:
1530            self.log.info("[start_segment]: Experiment %s started" % eid)
1531
1532        # Generate an ID for the experiment (slice) and a certificate that the
1533        # allocator can use to prove they own it.  We'll ship it back through
1534        # the encrypted connection.
1535        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1536
1537        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1538
1539        # Walk up tmpdir, deleting as we go
1540        for path, dirs, files in os.walk(tmpdir, topdown=False):
1541            for f in files:
1542                os.remove(os.path.join(path, f))
1543            for d in dirs:
1544                os.rmdir(os.path.join(path, d))
1545        os.rmdir(tmpdir)
1546
1547        resp = { 'federant' : [ tbparams[tb]['federant'] \
1548                for tb in tbparams.keys() \
1549                    if tbparams[tb].has_key('federant') ],\
1550                    'vtopo': vtopo,\
1551                    'vis' : vis,
1552                    'experimentID' : [\
1553                            { 'fedid': copy.copy(expid) }, \
1554                            { 'localname': eid },\
1555                        ],\
1556                    'experimentAccess': { 'X509' : expcert },\
1557                }
1558
1559        # Insert the experiment into our state and update the disk copy
1560        self.state_lock.acquire()
1561        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1562                for tb in tbparams.keys() \
1563                    if tbparams[tb].has_key('federant') ],\
1564                    'vtopo': vtopo,\
1565                    'vis' : vis,
1566                    'experimentID' : [\
1567                            { 'fedid': expid }, { 'localname': eid },\
1568                        ],\
1569                }
1570        self.state[eid] = self.state[expid]
1571        if self.state_filename: self.write_state()
1572        self.state_lock.release()
1573
1574        if not failed:
1575            return resp
1576        else:
1577            raise service_error(service_error.partial, \
1578                    "Partial swap in on %s" % ",".join(succeeded))
1579
1580
1581    def get_vtopo(self, req, fid):
1582        """
1583        Return the stored virtual topology for this experiment
1584        """
1585        rv = None
1586
1587        req = req.get('VtopoRequestBody', None)
1588        if not req:
1589            raise service_error(service_error.req,
1590                    "Bad request format (no VtopoRequestBody)")
1591        exp = req.get('experiment', None)
1592        if exp:
1593            if exp.has_key('fedid'):
1594                key = fedid(bits=exp['fedid'])
1595                keytype = "fedid"
1596            elif exp.has_key('localname'):
1597                key = exp['localname']
1598                keytype = "localname"
1599            else:
1600                raise service_error(service_error.req, "Unknown lookup type")
1601        else:
1602            raise service_error(service_error.req, "No request?")
1603
1604        self.state_lock.acquire()
1605        if self.state.has_key(key):
1606            rv = { 'experiment' : {keytype: key },\
1607                    'vtopo': self.state[key]['vtopo'],\
1608                }
1609        self.state_lock.release()
1610
1611        if rv: return rv
1612        else: raise service_error(service_error.req, "No such experiment")
1613
1614    def get_vis(self, req, fid):
1615        """
1616        Return the stored visualization for this experiment
1617        """
1618        rv = None
1619
1620        req = req.get('VisRequestBody', None)
1621        if not req:
1622            raise service_error(service_error.req,
1623                    "Bad request format (no VisRequestBody)")
1624        exp = req.get('experiment', None)
1625        if exp:
1626            if exp.has_key('fedid'):
1627                key = fedid(bits=exp['fedid'])
1628                keytype = "fedid"
1629            elif exp.has_key('localname'):
1630                key = exp['localname']
1631                keytype = "localname"
1632            else:
1633                raise service_error(service_error.req, "Unknown lookup type")
1634        else:
1635            raise service_error(service_error.req, "No request?")
1636
1637        self.state_lock.acquire()
1638        if self.state.has_key(key):
1639            rv =  { 'experiment' : {keytype: key },\
1640                    'vis': self.state[key]['vis'],\
1641                    }
1642        self.state_lock.release()
1643
1644        if rv: return rv
1645        else: raise service_error(service_error.req, "No such experiment")
1646
1647    def get_info(self, req, fid):
1648        """
1649        Return all the stored info about this experiment
1650        """
1651        rv = None
1652
1653        req = req.get('InfoRequestBody', None)
1654        if not req:
1655            raise service_error(service_error.req,
1656                    "Bad request format (no VisRequestBody)")
1657        exp = req.get('experiment', None)
1658        if exp:
1659            if exp.has_key('fedid'):
1660                key = fedid(bits=exp['fedid'])
1661                keytype = "fedid"
1662            elif exp.has_key('localname'):
1663                key = exp['localname']
1664                keytype = "localname"
1665            else:
1666                raise service_error(service_error.req, "Unknown lookup type")
1667        else:
1668            raise service_error(service_error.req, "No request?")
1669
1670        # The state may be massaged by the service function that called
1671        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1672        # state.
1673        self.state_lock.acquire()
1674        if self.state.has_key(key):
1675            rv = copy.deepcopy(self.state[key])
1676        self.state_lock.release()
1677
1678        if rv: return rv
1679        else: raise service_error(service_error.req, "No such experiment")
1680
1681
1682    def terminate_experiment(self, req, fid):
1683        """
1684        Swap this experiment out on the federants and delete the shared
1685        information
1686        """
1687        tbparams = { }
1688        req = req.get('TerminateRequestBody', None)
1689        if not req:
1690            raise service_error(service_error.req,
1691                    "Bad request format (no TerminateRequestBody)")
1692        exp = req.get('experiment', None)
1693        if exp:
1694            if exp.has_key('fedid'):
1695                key = fedid(bits=exp['fedid'])
1696                keytype = "fedid"
1697            elif exp.has_key('localname'):
1698                key = exp['localname']
1699                keytype = "localname"
1700            else:
1701                raise service_error(service_error.req, "Unknown lookup type")
1702        else:
1703            raise service_error(service_error.req, "No request?")
1704
1705        self.state_lock.acquire()
1706        fed_exp = self.state.get(key, None)
1707
1708        if fed_exp:
1709            # This branch of the conditional holds the lock to generate a
1710            # consistent temporary tbparams variable to deallocate experiments.
1711            # It releases the lock to do the deallocations and reacquires it to
1712            # remove the experiment state when the termination is complete.
1713            ids = []
1714            #  experimentID is a list of dicts that are self-describing
1715            #  identifiers.  This finds all the fedids and localnames - the
1716            #  keys of self.state - and puts them into ids.
1717            for id in fed_exp.get('experimentID', []):
1718                if id.has_key('fedid'): ids.append(id['fedid'])
1719                if id.has_key('localname'): ids.append(id['localname'])
1720
1721            # Construct enough of the tbparams to make the stop_segment calls
1722            # work
1723            for fed in fed_exp['federant']:
1724                try:
1725                    for e in fed['name']:
1726                        eid = e.get('localname', None)
1727                        if eid: break
1728                    else:
1729                        continue
1730
1731                    p = fed['emulab']['project']
1732
1733                    project = p['name']['localname']
1734                    tb = p['testbed']['localname']
1735                    user = p['user'][0]['userID']['localname']
1736
1737                    domain = fed['emulab']['domain']
1738                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1739                except KeyError, e:
1740                    continue
1741                tbparams[tb] = {\
1742                        'user': user,\
1743                        'domain': domain,\
1744                        'project': project,\
1745                        'host': host,\
1746                        'eid': eid,\
1747                    }
1748            self.state_lock.release()
1749
1750            # Stop everyone.
1751            for tb in tbparams.keys():
1752                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1753
1754            # Remove teh terminated experiment
1755            self.state_lock.acquire()
1756            for id in ids:
1757                if self.state.has_key(id): del self.state[id]
1758
1759            if self.state_filename: self.write_state()
1760            self.state_lock.release()
1761
1762            return { 'experiment': exp }
1763        else:
1764            # Don't forget to release the lock
1765            self.state_lock.release()
1766            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.