source: fedd/fedd_create_experiment.py @ 3441fe3

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 3441fe3 was 3441fe3, checked in by Ted Faber <faber@…>, 16 years ago

closer to fedd integration

  • Property mode set to 100644
File size: 37.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18
19import traceback
20
21from threading import *
22
23from subprocess import *
24
25from fedd_services import *
26from fedd_internal_services import *
27from fedd_util import *
28import parse_detail
29from service_error import *
30
31class fedd_create_experiment_local:
32    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
33        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
34        "fed_evrepeater", "rc.accounts.patch"]
35   
36    def __init__(self, 
37            cert_file=None,
38            cert_pwd=None,
39            exp_stem="faber-splitter",
40            debug=False,
41            muxmax=2,
42            nthreads=2,
43            randomize_experiments=False,
44            scp_exec="/usr/bin/scp",
45            scripts_dir="./", 
46            splitter=None,
47            ssh_exec="/usr/bin/ssh",
48            ssh_identity_file=None,
49            ssh_keygen="/usr/bin/ssh-keygen",
50            ssh_pubkey_file=None,
51            ssh_type="rsa",
52            tbmap=None,
53            tclsh="/usr/local/bin/otclsh",
54            tcl_splitter="/usr/testbed/lib/ns2ir/parse.tcl",
55            trace_file=None,
56            trusted_certs=None,
57            ):
58        self.scripts = fedd_create_experiment_local.scripts
59        self.thread_with_rv = fedd_create_experiment_local.pooled_thread
60        self.thread_pool = fedd_create_experiment_local.thread_pool
61
62        self.cert_file = cert_file
63        self.cert_pwd = cert_pwd
64        self.exp_stem = exp_stem
65        self.debug = debug
66        self.muxmax = muxmax
67        self.nthreads = nthreads
68        self.randomize_experiments = randomize_experiments
69        self.scp_exec = scp_exec
70        self.scripts_dir = scripts_dir
71        self.splitter = splitter
72        self.ssh_exec=ssh_exec
73        self.ssh_keygen = ssh_keygen
74        self.ssh_identity_file = ssh_identity_file
75        self.ssh_type = ssh_type
76        self.tclsh = tclsh
77        self.tcl_splitter = tcl_splitter
78        self.tbmap = tbmap
79        self.trace_file = trace_file
80        self.trusted_certs=trusted_certs
81
82        self.def_expstart = \
83                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
84        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
85        self.def_gwstart = \
86                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
87        self.def_mgwstart = \
88                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
89        self.def_gwimage = "FBSD61-TUNNEL2";
90        self.def_gwtype = "pc";
91
92
93        if ssh_pubkey_file:
94            try:
95                f = open(ssh_pubkey_file, 'r')
96                self.ssh_pubkey = f.read()
97                f.close()
98            except IOError:
99                raise service_error(service_error.internal,
100                        "Cannot read sshpubkey")
101
102        # Confirm federation scripts in the right place
103        for s in self.scripts:
104            if not os.path.exists(self.scripts_dir + "/" + s):
105                raise service_error(service_error.server_config,
106                        "%s/%s not in local script dir" % (self.scripts_dir, s))
107    class thread_pool:
108        def __init__(self):
109            self.changed = Condition()
110            self.started = 0
111            self.terminated = 0
112
113        def acquire(self):
114            self.changed.acquire()
115
116        def release(self):
117            self.changed.release()
118
119        def wait(self, timeout = None):
120            self.changed.wait(timeout)
121
122        def start(self):
123            self.changed.acquire()
124            self.started += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def terminate(self):
129            self.changed.acquire()
130            self.terminated += 1
131            self.changed.notifyAll()
132            self.changed.release()
133
134        def clear(self):
135            self.changed.acquire()
136            self.started = 0
137            self.terminated =0
138            self.changed.notifyAll()
139            self.changed.release()
140
141
142
143    class pooled_thread(Thread):
144        def __init__(self, group=None, target=None, name=None, args=(), 
145                kwargs={}, pdata=None, trace_file=None):
146            Thread.__init__(self, group, target, name, args, kwargs)
147            self.rv = None
148            self.exception = None
149            self.target=target
150            self.args = args
151            self.kwargs = kwargs
152            self.pdata = pdata
153            self.trace_file = trace_file
154       
155        def run(self):
156            if self.pdata:
157                self.pdata.start()
158
159            if self.target:
160                try:
161                    self.rv = self.target(*self.args, **self.kwargs)
162                except service_error, s:
163                    self.exception = s
164                    if self.trace_file:
165                        print >>self.trace_file, "Thread exception: %s %s" % \
166                                (s.code_string(), s.desc)
167                   
168                except:
169                    self.exception = sys.exc_info()[1]
170                    if self.trace_file:
171                        print >>self.trace_file, \
172                                "Unexpected thread exception: %s" % \
173                                self.exception
174                        print >>self.trace_file, "Trace: %s" % \
175                                traceback.format_exc()
176            if self.pdata:
177                self.pdata.terminate()
178
179    def copy_file(self, src, dest, size=1024):
180        """
181        Exceedingly simple file copy.
182        """
183        s = open(src,'r')
184        d = open(dest, 'w')
185
186        buf = "x"
187        while buf != "":
188            buf = s.read(size)
189            d.write(buf)
190        s.close()
191        d.close()
192
193    def scp_file(self, file, user, host, dest=""):
194        """
195        scp a file to the remote host.
196        """
197
198        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
199
200        trace = self.trace_file
201        if not trace:
202            try:
203                trace = open("/dev/null", "w")
204            except IOError:
205                raise service_error(service_error.internal,
206                        "Cannot open /dev/null??");
207
208        if not self.debug:
209            rv = call(scp_cmd, stdout=trace, stderr=trace)
210        else:
211            if self.trace_file: 
212                print >>self.trace_file, "debug [scp_file]: %s" % \
213                        " ".join(scp_cmd)
214            rv = 0
215
216        return rv == 0
217
218    def ssh_cmd(self, user, host, cmd, wname=None):
219        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
220
221        trace = self.trace_file
222        if not trace:
223            try:
224                trace = open("/dev/null", "w")
225            except IOError:
226                raise service_error(service_error.internal,
227                        "Cannot open /dev/null??");
228
229        if not self.debug:
230            sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace)
231            return sub.wait() == 0
232        else:
233            if self.trace_file:
234                print >>self.trace_file,"debug [ssh_cmd]: %s" % sh_str
235            return True
236
237    def ship_scripts(self, host, user, dest_dir):
238        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
239            for s in self.scripts:
240                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
241                        user, host, dest_dir):
242                    return False
243            return True
244        else:
245            return False
246
247    def ship_configs(self, host, user, src_dir, dest_dir):
248        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
249            return False
250        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
251            return False
252
253        for f in os.listdir(src_dir):
254            if os.path.isdir(f):
255                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
256                        "%s/%s" % (dest_dir, f)):
257                    return False
258            else:
259                if not self.scp_file("%s/%s" % (src_dir, f), 
260                        user, host, dest_dir):
261                    return False
262        return True
263
264    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
265        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
266        user = tbparams[tb]['user']
267        pid = tbparams[tb]['project']
268        # XXX
269        base_confs = ( "hosts", "vtopo.xml", "viz.xml")
270        tclfile = "%s.%s.tcl" % (eid, tb)
271        expinfo_exec = "/usr/testbed/bin/expinfo"
272        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
273        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
274        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
275        state_re = re.compile("State:\s+(\w+)")
276        no_exp_re = re.compile("^No\s+such\s+experiment")
277        state = None
278        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
279
280
281        if self.trace_file:
282            print >>self.trace_file, "status request: %s" % " ".join(cmd)
283       
284        if not self.trace_file:
285            try:
286                st_file = open("/dev/null", "w")
287            except IOError:
288                raise service_error(service_error.internal, 
289                        "Cannot open /dev/null!?")
290        else:
291            st_file = self.trace_file
292
293        status = Popen(cmd, stdout=PIPE, stderr=st_file)
294        for line in status.stdout:
295            m = state_re.match(line)
296            if m: state = m.group(1)
297            else:
298                m = no_exp_re.match(line)
299                if m: state = "none"
300        rv = status.wait()
301        # No experiment returns a non-zero rv.  If we successfully parsed a
302        # "none" outcome, ignore teh return code.
303        if rv != 0 and state != "none":
304            raise service_error(service_error.internal,
305                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
306        if self.trace_file:
307            print >>self.trace_file, "%s: %s" % (tb, state)
308            print >>self.trace_file, "transferring experiment to %s" % tb
309
310        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
311            return False
312        # Clear and create the tarfiles and rpm directories
313        for d in (tarfiles_dir, rpms_dir):
314            if not self.ssh_cmd(user, host, 
315                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
316                return False
317            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
318                    "create tarfiles"):
319                return False
320       
321        if state == 'active':
322            # Remote experiment is active.  Modify it.
323            for f in base_confs:
324                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
325                        "%s/%s" % (proj_dir, f)):
326                    return False
327            if not self.ship_scripts(host, user, proj_dir):
328                return False
329            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
330                    proj_dir):
331                return False
332            if os.path.isdir("%s/tarfiles" % tmpdir):
333                if not self.ship_configs(host, user,
334                        "%s/tarfiles" % tmpdir, tarfiles_dir):
335                    return False
336            if os.path.isdir("%s/rpms" % tmpdir):
337                if not self.ship_configs(host, user,
338                        "%s/rpms" % tmpdir, tarfiles_dir):
339                    return False
340            if self.trace_file:
341                print >>self.trace_file, "Modifying %s on %s" % (eid, tb)
342            if not self.ssh_cmd(user, host,
343                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
344                            (pid, eid, tclfile), "modexp"):
345                return False
346            return True
347        elif state == "swapped":
348            # Remote experiment swapped out.  Modify it and swap it in.
349            for f in base_confs:
350                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
351                        "%s/%s" % (proj_dir, f)):
352                    return False
353            if not self.ship_scripts(host, user, proj_dir):
354                return False
355            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
356                    proj_dir):
357                return False
358            if os.path.isdir("%s/tarfiles" % tmpdir):
359                if not self.ship_configs(host, user,
360                        "%s/tarfiles" % tmpdir, tarfiles_dir):
361                    return False
362            if os.path.isdir("%s/rpms" % tmpdir):
363                if not self.ship_configs(host, user,
364                        "%s/rpms" % tmpdir, tarfiles_dir):
365                    return False
366            if self.trace_file:
367                print >>self.trace_file, "Modifying %s on %s" % (eid, tb)
368            if not self.ssh_cmd(user, host,
369                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
370                    "modexp"):
371                return False
372            if self.trace_file:
373                print >>self.trace_file, "Swapping %s in on %s" % (eid, tb)
374            if not self.ssh_cmd(user, host,
375                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
376                    "swapexp"):
377                return False
378            return True
379        elif state == "none":
380            # No remote experiment.  Create one.  We do this in 2 steps so we
381            # can put the configuration files and scripts into the new
382            # experiment directories.
383
384            # Tarfiles must be present for creation to work
385            if os.path.isdir("%s/tarfiles" % tmpdir):
386                if not self.ship_configs(host, user,
387                        "%s/tarfiles" % tmpdir, tarfiles_dir):
388                    return False
389            if os.path.isdir("%s/rpms" % tmpdir):
390                if not self.ship_configs(host, user,
391                        "%s/rpms" % tmpdir, tarfiles_dir):
392                    return False
393            if self.trace_file:
394                print >>self.trace_file, "Creating %s on %s" % (eid, tb)
395            if not self.ssh_cmd(user, host,
396                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
397                            (pid, eid, tclfile), "startexp"):
398                return False
399            # After startexp the per-experiment directories exist
400            for f in base_confs:
401                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
402                        "%s/%s" % (proj_dir, f)):
403                    return False
404            if not self.ship_scripts(host, user, proj_dir):
405                return False
406            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
407                    proj_dir):
408                return False
409            if self.trace_file:
410                print >>self.trace_file, "Swapping %s in on %s" % (eid, tb)
411            if not self.ssh_cmd(user, host,
412                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
413                    "swapexp"):
414                return False
415            return True
416        else:
417            if self.trace_file:
418                print >>self.trace_file, "unknown state %s" % state
419            return False
420
421    def stop_segment(self, tb, eid, tbparams):
422        user = tbparams[tb]['user']
423        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
424        pid = tbparams[tb]['project']
425
426        if self.trace_file:
427            print >>self.trace_file, "Stopping %s on %s" % (eid, tb)
428        return self.ssh_cmd(user, host,
429                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
430
431       
432    def generate_ssh_keys(self, dest, type="rsa" ):
433        """
434        Generate a set of keys for the gateways to use to talk.
435
436        Keys are of type type and are stored in the required dest file.
437        """
438        valid_types = ("rsa", "dsa")
439        t = type.lower();
440        if t not in valid_types: raise ValueError
441
442        trace = self.trace_file
443        if not trace:
444            try:
445                trace = open("/dev/null", "w")
446            except IOError:
447                raise service_error(service_error.internal,
448                        "Cannot open /dev/null??");
449
450        # May raise CalledProcessError
451        rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest],
452                stdout=trace, stderr=trace)
453        if rv != 0:
454            raise service_error(service_error.internal, 
455                    "Cannot generate nonce ssh keys.  %s return code %d" \
456                            % (self.ssh_keygen, rv))
457
458    def genviz(self, topo_file, viz_file):
459        """
460        Generate the visualization file from the topology file
461        """
462
463        class topo_parse:
464            """
465            Parse the vtopo file into a set of lans, links and nodes.
466            """
467            def __init__(self):
468                self.links = {}         # Each link is a list of at most 2 nodes
469                self.lans = {}          # Lans have more than 2 nodes
470                self.nodes = {}         # The nodes in the experiment
471                self.lan = {}           # The current link/len being collected
472                self.in_lan = False     # Is the conatining element lan?
473                self.in_node = False    # Is the conatining element node?
474                self.chars = None       # Last set of marked-up chars
475
476            def start_element(self, name, attrs):
477                """
478                New element started.  Set flags and clear lan
479                """
480                if name == "node":
481                    self.in_node = True
482                elif name == "lan":
483                    self.in_lan = True
484                    self.lan.clear()
485           
486            def end_element(self, name):
487                """
488                End of element.  Collect data if appropriate
489
490                If a node or lan is ending, create an entry in nodes or
491                lans/links.  If a vname/vnode in a node or lan is ending,
492                capture its name.  If a lan is ending, add the node to the
493                evolving link or lan.
494                """
495                if name == "node":
496                    self.in_node = False
497                if self.in_node and name == "vname":
498                    self.nodes[self.chars] = "node"
499                if self.in_lan:
500                    if name != "lan":
501                        if name == 'vname' or name == 'vnode':
502                            self.lan[name] = self.chars
503                    else:
504                        self.in_lan = False
505                        vname = self.lan['vname']
506                        links = self.links.get(vname, [])
507                        if len(links) == 2:
508                            # This link needs to be a lan instead
509                            self.nodes[vname] = "lan"
510                            self.lans[vname] = \
511                                    [ l for l in links, self.lan['vnode'] ]
512                            del self.links[vname]
513                            self.lan = {}
514                            return
515                        lans = self.lans.get(vname, [])
516                        if len(lans) > 0:
517                            lans.append(self.lan['vnode'])
518                            self.lan = {}
519                            return
520                        if not vname in self.links:
521                            self.links[vname] = []
522                        self.links[vname].append(self.lan['vnode'])
523                        self.lan = {}
524                        return
525
526            def found_chars(self, data):
527                """
528                Capture marked up chars for later
529                """
530                self.chars = data
531
532        neato = "/usr/local/bin/neato"
533        # These are used to parse neato output and to create the visualization
534        # file.
535        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
536        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
537                "%s</type></node>" 
538        try:
539            df, dotname = tempfile.mkstemp()
540            dotfile = os.fdopen(df, 'w')
541            infile = open(topo_file, 'r')
542            out = open(viz_file, "w")
543        except IOError:
544            raise service_error(service_error.internal,
545                    "Failed to open file in genviz")
546
547        # Parse the topology file using XML tools
548        tp = topo_parse();
549        parser = xml.parsers.expat.ParserCreate()
550        parser.StartElementHandler = tp.start_element
551        parser.EndElementHandler = tp.end_element
552        parser.CharacterDataHandler = tp.found_chars
553
554        parser.ParseFile(infile)
555
556        # Generate a dot/neato input file from the links, nodes and lans
557        try:
558            print >>dotfile, "graph G {"
559            for n in tp.nodes.keys():
560                print >>dotfile, '\t"%s"' % n
561            for l in tp.links.keys():
562                print >>dotfile, " -- ".join(tp.links[l])
563            for l in tp.lans.keys():
564                for n in tp.lans[l]:
565                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
566            print >>dotfile, "}"
567            dotfile.close()
568        except IOError:
569            raise service_error(service_error.internal, "Cannot write dot file")
570
571        # Use dot to create a visualization
572        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
573                '-Gpack=true', dotname], stdout=PIPE)
574
575        # Translate dot to emulab format
576        try:
577            print >>out, "<vis>"
578            for line in dot.stdout:
579                m = vis_re.match(line)
580                if m:
581                    n, x, y = (m.group(1), m.group(2), m.group(3))
582                    if tp.nodes.has_key(n):
583                        print >>out, vis_fmt % (n, x, y, tp.nodes[n])
584            print >>out, "</vis>"
585            out.close()
586        except IOError:
587            raise service_error(service_error.internal,
588                    "Failed to write visualization file")
589        rv = dot.wait()
590        os.remove(dotname)
591        return rv == 0
592
593
594    def get_access(self, tb, nodes, user, tbparam):
595        """
596        Get access to testbed through fedd and set the parameters for that tb
597        """
598
599        translate_attr = {
600            'slavenodestartcmd': 'expstart',
601            'slaveconnectorstartcmd': 'gwstart',
602            'masternodestartcmd': 'mexpstart',
603            'masterconnectorstartcmd': 'mgwstart',
604            'connectorimage': 'gwimage',
605            'connectortype': 'gwtype',
606            'tunnelcfg': 'tun',
607            'smbshare': 'smbshare',
608        }
609
610        # XXX multi-level access
611        uri = self.tbmap.get(tb, None)
612        if not uri:
613            raise service_error(serice_error.server_config, 
614                    "Unknown testbed: %s" % tb)
615
616        # The basic request
617        req = {\
618                'destinationTestbed' : { 'uri' : uri },
619                'user':  user,
620                'allocID' : { 'username': 'test' },
621                'access' : [ { 'sshPubkey' : self.ssh_pubkey } ]
622            }
623       
624        # node resources if any
625        if nodes != None and len(nodes) > 0:
626            rnodes = [ ]
627            for n in nodes:
628                rn = { }
629                image, hw, count = n.split(":")
630                if image: rn['image'] = [ image ]
631                if hw: rn['hardware'] = [ hw ]
632                if count: rn['count'] = int(count)
633                rnodes.append(rn)
634            req['resources']= { }
635            req['resources']['node'] = rnodes
636
637        # No retry loop here.  Proxy servers must correctly authenticate
638        # themselves without help
639        try:
640            ctx = fedd_ssl_context(self.cert_file, 
641                    self.trusted_certs, password=self.cert_pwd)
642        except SSL.SSLError:
643            raise service_error(service_error.server_config, 
644                    "Server certificates misconfigured")
645
646        loc = feddServiceLocator();
647        port = loc.getfeddPortType(uri,
648                transport=M2Crypto.httpslib.HTTPSConnection, 
649                transdict={ 'ssl_context' : ctx })
650
651        # Reconstruct the full request message
652        msg = RequestAccessRequestMessage()
653        msg.set_element_RequestAccessRequestBody(
654                pack_soap(msg, "RequestAccessRequestBody", req))
655        try:
656            resp = port.RequestAccess(msg)
657        except ZSI.ParseException, e:
658            raise service_error(service_error.req,
659                    "Bad format message (XMLRPC??): %s" %
660                    str(e))
661        r = unpack_soap(resp)
662
663        if r.has_key('RequestAccessResponseBody'):
664            r = r['RequestAccessResponseBody']
665        else:
666            raise service_error(service_error.proxy,
667                    "Bad proxy response")
668
669
670        e = r['emulab']
671        p = e['project']
672        tbparam[tb] = { 
673                "boss": e['boss'],
674                "host": e['ops'],
675                "domain": e['domain'],
676                "fs": e['fileServer'],
677                "eventserver": e['eventServer'],
678                "project": unpack_id(p['name']),
679                "emulab" : e
680                }
681
682        for u in p['user']:
683            tbparam[tb]['user'] = unpack_id(u['userID'])
684
685        for a in e['fedAttr']:
686            if a['attribute']:
687                key = translate_attr.get(a['attribute'].lower(), None)
688                if key:
689                    tbparam[tb][key]= a['value']
690       
691    class current_testbed:
692        def __init__(self, eid, tmpdir):
693            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
694            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
695            self.current_testbed = None
696            self.testbed_file = None
697
698            self.def_expstart = \
699                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
700            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
701            self.def_gwstart = \
702                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
703            self.def_mgwstart = \
704                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
705            self.def_gwimage = "FBSD61-TUNNEL2";
706            self.def_gwtype = "pc";
707
708            self.eid = eid
709            self.tmpdir = tmpdir
710
711        def __call__(self, line, master, allocated, tbparams):
712            # Capture testbed topology descriptions
713            if self.current_testbed == None:
714                m = self.begin_testbed.match(line)
715                if m != None:
716                    self.current_testbed = m.group(1)
717                    if self.current_testbed == None:
718                        raise service_error(service_error.req,
719                                "Bad request format (unnamed testbed)")
720                    allocated[self.current_testbed] = \
721                            allocated.get(self.current_testbed,0) + 1
722                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
723                    if not os.path.exists(tb_dir):
724                        try:
725                            os.mkdir(tb_dir)
726                        except IOError:
727                            raise service_error(service_error.internal,
728                                    "Cannot create %s" % tb_dir)
729                    try:
730                        self.testbed_file = open("%s/%s.%s.tcl" %
731                                (tb_dir, self.eid, self.current_testbed), 'w')
732                    except IOError:
733                        self.testbed_file = None
734                    return True
735                else: return False
736            else:
737                m = self.end_testbed.match(line)
738                if m != None:
739                    if m.group(1) != self.current_testbed:
740                        raise service_error(service_error.internal, 
741                                "Mismatched testbed markers!?")
742                    if self.testbed_file != None: 
743                        self.testbed_file.close()
744                        self.testbed_file = None
745                    self.current_testbed = None
746                elif self.testbed_file:
747                    # Substitute variables and put the line into the local
748                    # testbed file.
749                    gwtype = tbparams[self.current_testbed].get('gwtype', 
750                            self.def_gwtype)
751                    gwimage = tbparams[self.current_testbed].get('gwimage', 
752                            self.def_gwimage)
753                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
754                            self.def_mgwstart)
755                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
756                            self.def_mexpstart)
757                    gwstart = tbparams[self.current_testbed].get('gwstart', 
758                            self.def_gwstart)
759                    expstart = tbparams[self.current_testbed].get('expstart', 
760                            self.def_expstart)
761                    project = tbparams[self.current_testbed].get('project')
762                    line = re.sub("GWTYPE", gwtype, line)
763                    line = re.sub("GWIMAGE", gwimage, line)
764                    if self.current_testbed == master:
765                        line = re.sub("GWSTART", mgwstart, line)
766                        line = re.sub("EXPSTART", mexpstart, line)
767                    else:
768                        line = re.sub("GWSTART", gwstart, line)
769                        line = re.sub("EXPSTART", expstart, line)
770                    # XXX: does `` embed without doing enything else?
771                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
772                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
773                    line = re.sub("EID", self.eid, line)
774                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
775                            (project, self.eid), line)
776                    print >>self.testbed_file, line
777                return True
778
779    class allbeds:
780        def __init__(self, get_access):
781            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
782            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
783            self.in_allbeds = False
784            self.get_access = get_access
785
786        def __call__(self, line, user, tbparams):
787            # Testbed access parameters
788            if not self.in_allbeds:
789                if self.begin_allbeds.match(line):
790                    self.in_allbeds = True
791                    return True
792                else:
793                    return False
794            else:
795                if self.end_allbeds.match(line):
796                    self.in_allbeds = False
797                else:
798                    nodes = line.split('|')
799                    tb = nodes.pop(0)
800                    self.get_access(tb, nodes, user, tbparams)
801                return True
802
803    class gateways:
804        def __init__(self, eid, master, tmpdir, gw_pubkey,
805                gw_secretkey, copy_file):
806            self.begin_gateways = \
807                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
808            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
809            self.current_gateways = None
810            self.control_gateway = None
811            self.active_end = { }
812
813            self.eid = eid
814            self.master = master
815            self.tmpdir = tmpdir
816            self.gw_pubkey_base = gw_pubkey
817            self.gw_secretkey_base = gw_secretkey
818
819            self.copy_file = copy_file
820
821
822        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
823                active_end, tbparams, dtb, myname, desthost, type):
824            """
825            Produce a gateway configuration file from a gateways line.
826            """
827
828            sproject = tbparams[gw].get('project', 'project')
829            dproject = tbparams[dtb].get('project', 'project')
830            sdomain = ".%s.%s%s" % (eid, sproject,
831                    tbparams[gw].get('domain', ".example.com"))
832            ddomain = ".%s.%s%s" % (eid, dproject,
833                    tbparams[dtb].get('domain', ".example.com"))
834            boss = tbparams[master].get('boss', "boss")
835            fs = tbparams[master].get('fs', "fs")
836            event_server = "%s%s" % \
837                    (tbparams[master].get('eventserver', "event_server"),
838                            tbparams[master].get('domain', "example.com"))
839            remote_event_server = "%s%s" % \
840                    (tbparams[dtb].get('eventserver', "event_server"),
841                            tbparams[dtb].get('domain', "example.com"))
842
843            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
844            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
845            tunnel_cfg = tbparams[gw].get("tun", "false")
846
847            conf_file = "%s%s.gw.conf" % (myname, sdomain)
848            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
849
850            # translate to lower case so the `hostname` hack for specifying
851            # configuration files works.
852            conf_file = conf_file.lower();
853            remote_conf_file = remote_conf_file.lower();
854
855            if dtb == master:
856                active = "false"
857            elif gw == master:
858                active = "true"
859            elif active_end.has_key['%s-%s' % (dtb, gw)]:
860                active = "false"
861            else:
862                active_end['%s-%s' % (gw, dtb)] = 1
863                active = "true"
864
865            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
866            print >>gwconfig, "Active: %s" % active
867            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
868            print >>gwconfig, "BossName: %s" % boss
869            print >>gwconfig, "FsName: %s" % fs
870            print >>gwconfig, "EventServerName: %s" % event_server
871            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
872            print >>gwconfig, "Type: %s" % type
873            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
874            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
875                    local_script_dir
876            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
877            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
878            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
879                    (remote_script_dir, remote_conf_file)
880            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
881            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
882            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
883            gwconfig.close()
884
885            return active == "true"
886
887        def __call__(self, line, allocated, tbparams):
888            # Process gateways
889            if not self.current_gateways:
890                m = self.begin_gateways.match(line)
891                if m:
892                    self.current_gateways = m.group(1)
893                    if allocated.has_key(self.current_gateways):
894                        # This test should always succeed
895                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
896                        if not os.path.exists(tb_dir):
897                            try:
898                                os.mkdir(tb_dir)
899                            except IOError:
900                                raise service_error(service_error.internal,
901                                        "Cannot create %s" % tb_dir)
902                    else:
903                        # XXX
904                        print >>sys.stderr, \
905                            "Ignoring gateways for unknown testbed %s" \
906                                    % self.current_gateways
907                        self.current_gateways = None
908                    return True
909                else:
910                    return False
911            else:
912                m = self.end_gateways.match(line)
913                if m :
914                    if m.group(1) != self.current_gateways:
915                        raise service_error(service_error.internal,
916                                "Mismatched gateway markers!?")
917                    if self.control_gateway:
918                        try:
919                            cc = open("%s/%s/client.conf" %
920                                    (self.tmpdir, self.current_gateways), 'w')
921                            print >>cc, "ControlGateway: %s" % \
922                                    self.control_gateway
923                            if tbparams[self.master].has_key('smbshare'):
924                                print >>cc, "SMBSHare: %s" % \
925                                        tbparams[self.master]['smbshare']
926                            print >>cc, "ProjectUser: %s" % \
927                                    tbparams[self.master]['user']
928                            print >>cc, "ProjectName: %s" % \
929                                    tbparams[self.master]['project']
930                            cc.close()
931                        except IOError:
932                            raise service_error(service_error.internal,
933                                    "Error creating client config")
934                    else:
935                        if self.trace_file:
936                            print >>sys.stderr, "No control gateway for %s" %\
937                                    self.current_gateways
938                    self.current_gateways = None
939                else:
940                    dtb, myname, desthost, type = line.split(" ")
941
942                    if type == "control" or type == "both":
943                        self.control_gateway = "%s.%s.%s%s" % (myname, 
944                                self.eid, 
945                                tbparams[self.current_gateways]['project'],
946                                tbparams[self.current_gateways]['domain'])
947                    try:
948                        active = self.gateway_conf_file(self.current_gateways,
949                                self.master, self.eid, self.gw_pubkey_base,
950                                self.gw_secretkey_base,
951                                self.active_end, tbparams, dtb, myname,
952                                desthost, type)
953                    except IOError, e:
954                        raise service_error(service_error.internal,
955                                "Failed to write config file for %s" % \
956                                        self.current_gateway)
957           
958                    gw_pubkey = "%s/keys/%s" % \
959                            (self.tmpdir, self.gw_pubkey_base)
960                    gw_secretkey = "%s/keys/%s" % \
961                            (self.tmpdir, self.gw_secretkey_base)
962
963                    pkfile = "%s/%s/%s" % \
964                            ( self.tmpdir, self.current_gateways, 
965                                    self.gw_pubkey_base)
966                    skfile = "%s/%s/%s" % \
967                            ( self.tmpdir, self.current_gateways, 
968                                    self.gw_secretkey_base)
969
970                    if not os.path.exists(pkfile):
971                        try:
972                            self.copy_file(gw_pubkey, pkfile)
973                        except IOError:
974                            service_error(service_error.internal,
975                                    "Failed to copy pubkey file")
976
977                    if active and not os.path.exists(skfile):
978                        try:
979                            self.copy_file(gw_secretkey, skfile)
980                        except IOError:
981                            service_error(service_error.internal,
982                                    "Failed to copy secretkey file")
983                return True
984
985    class shunt_to_file:
986        def __init__(self, begin, end, filename):
987            self.begin = re.compile(begin)
988            self.end = re.compile(end)
989            self.in_shunt = False
990            self.file = None
991            self.filename = filename
992
993        def __call__(self, line):
994            if not self.in_shunt:
995                if self.begin.match(line):
996                    self.in_shunt = True
997                    try:
998                        self.file = open(self.filename, "w")
999                    except:
1000                        self.file = None
1001                        raise
1002                    return True
1003                else:
1004                    return False
1005            else:
1006                if self.end.match(line):
1007                    if self.file: 
1008                        self.file.close()
1009                        self.file = None
1010                    self.in_shunt = False
1011                else:
1012                    if self.file:
1013                        print >>self.file, line
1014                return True
1015
1016    class shunt_to_list:
1017        def __init__(self, begin, end):
1018            self.begin = re.compile(begin)
1019            self.end = re.compile(end)
1020            self.in_shunt = False
1021            self.list = [ ]
1022       
1023        def __call__(self, line):
1024            if not self.in_shunt:
1025                if self.begin.match(line):
1026                    self.in_shunt = True
1027                    return True
1028                else:
1029                    return False
1030            else:
1031                if self.end.match(line):
1032                    self.in_shunt = False
1033                else:
1034                    self.list.append(line)
1035                return True
1036
1037    def create_experiment(self, req, fid):
1038        try:
1039            tmpdir = tempfile.mkdtemp(prefix="split-")
1040        except IOError:
1041            raise service_error(service_error.internal, "Cannot create tmp dir")
1042
1043        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1044        gw_secretkey_base = "fed.%s" % self.ssh_type
1045        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1046        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1047        tclfile = tmpdir + "/experiment.tcl"
1048        tbparams = { }
1049
1050        pid = "dummy"
1051        gid = "dummy"
1052        eid = self.exp_stem
1053        if self.randomize_experiments:
1054            for i in range(0,5):
1055                eid += random.choice(string.ascii_letters)
1056        # XXX
1057        fail_soft = False
1058
1059        try:
1060            os.mkdir(tmpdir+"/keys")
1061        except OSError:
1062            raise service_error(service_error.internal,
1063                    "Can't make temporary dir")
1064
1065        # The tcl parser needs to read a file so put the content into that file
1066        file_content=req.get('experimentdescription', None)
1067        if file_content != None:
1068            try:
1069                f = open(tclfile, 'w')
1070                f.write(file_content)
1071                f.close()
1072            except IOError:
1073                raise service_error(service_error.internal,
1074                        "Cannot write temp experiment description")
1075        else:
1076            raise service_error(service_error.req, "No experiment description")
1077
1078        try:
1079            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1080        except ValueError:
1081            raise service_error(service_error.server_config, 
1082                    "Bad key type (%s)" % self.ssh_type)
1083
1084        user = req.get('user', None)
1085        if user == None:
1086            raise service_error(service_error.req, "No user")
1087
1088        master = req.get('master', None)
1089        if master == None:
1090            raise service_error(service_error.req, "No master testbed label")
1091       
1092       
1093        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1094            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1095        tclparser = Popen(tclcmd, stdout=PIPE)
1096
1097        allocated = { }
1098        started = { }
1099
1100        parse_current_testbed = self.current_testbed(eid, tmpdir)
1101        parse_allbeds = self.allbeds(self.get_access)
1102        parse_gateways = self.gateways(eid, master, tmpdir,
1103                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1104        parse_vtopo = self.shunt_to_file("^#\s+Begin\s+Vtopo",
1105                    "^#\s+End\s+Vtopo", tmpdir + "/vtopo.xml")
1106        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1107                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1108        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1109                "^#\s+End\s+tarfiles")
1110        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1111                "^#\s+End\s+rpms")
1112
1113        for line in tclparser.stdout:
1114            line = line.rstrip()
1115            if parse_current_testbed(line, master, allocated, tbparams):
1116                continue
1117            elif parse_allbeds(line, user, tbparams):
1118                continue
1119            elif parse_gateways(line, allocated, tbparams):
1120                continue
1121            elif parse_vtopo(line):
1122                continue
1123            elif parse_hostnames(line):
1124                continue
1125            elif parse_tarfiles(line):
1126                continue
1127            elif parse_rpms(line):
1128                continue
1129            else:
1130                raise service_error(service_error.internal, 
1131                        "Bad tcl parse? %s" % line)
1132
1133        self.genviz(tmpdir + "/vtopo.xml", tmpdir + "/viz.xml")
1134
1135        # Copy tarfiles and rpms needed at remote sites into a staging area
1136        try:
1137            for t in parse_tarfiles.list:
1138                if not os.path.exists("%s/tarfiles" % tmpdir):
1139                    os.mkdir("%s/tarfiles" % tmpdir)
1140                self.copy_file(t, "%s/tarfiles/%s" % \
1141                        (tmpdir, os.path.basename(t)))
1142            for r in parse_rpms.list:
1143                if not os.path.exists("%s/rpms" % tmpdir):
1144                    os.mkdir("%s/rpms" % tmpdir)
1145                self.copy_file(r, "%s/rpms/%s" % \
1146                        (tmpdir, os.path.basename(r)))
1147        except IOError, e:
1148            raise service_error(service_error.internal, 
1149                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1150
1151        thread_pool_info = self.thread_pool()
1152        threads = [ ]
1153
1154        for tb in [ k for k in allocated.keys() if k != master]:
1155            # Wait until we have a free slot to start the next testbed load
1156            thread_pool_info.acquire()
1157            while thread_pool_info.started - \
1158                    thread_pool_info.terminated >= self.nthreads:
1159                thread_pool_info.wait()
1160            thread_pool_info.release()
1161
1162            # Create and start a thread to start the segment, and save it to
1163            # get the return value later
1164            t  = self.pooled_thread(target=self.start_segment, 
1165                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1166                    pdata=thread_pool_info, trace_file=self.trace_file)
1167            threads.append(t)
1168            t.start()
1169
1170        # Wait until all finish (the first clause of the while is to make sure
1171        # one starts)
1172        thread_pool_info.acquire()
1173        while thread_pool_info.started == 0 or \
1174                thread_pool_info.started > thread_pool_info.terminated:
1175            thread_pool_info.wait()
1176        thread_pool_info.release()
1177
1178        # If none failed, start the master
1179        failed = [ t.getName() for t in threads if not t.rv ]
1180
1181        if len(failed) == 0:
1182            if not self.start_segment(master, eid, tbparams, tmpdir):
1183                failed.append(master)
1184
1185        # If one failed clean up
1186        if len(failed) > 0:
1187            succeeded = [tb for tb in allocated.keys() if tb not in failed]
1188            if fail_soft:
1189                raise service_error(service_error.partial, \
1190                        "Partial swap in on %s" % ",".join(succeeded))
1191            else:
1192                for tb in succeeded:
1193                    self.stop_segment(tb, eid, tbparams)
1194                raise service_error(service_error.federant,
1195                    "Swap in failed on %s" % ",".join(failed))
1196        else:
1197            if self.trace_file:
1198                print >>self.trace_file, "Experiment started"
1199
1200        return { 'emulab' : [ tbparams[tb]['emulab'] \
1201                for tb in tbparams.keys() \
1202                    if tbparams[tb].has_key('emulab') ] }
1203
1204if __name__ == '__main__':
1205    from optparse import OptionParser
1206   
1207    parser = OptionParser()
1208    parser.add_option('-d', '--debug', dest='debug', default=False,
1209            action='store_true', help='print actions rather than take them')
1210    parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse')
1211    parser.add_option('-m', '--master', dest='master', 
1212            help='testbed label for matster testbd')
1213    parser.add_option('-t', '--trace', dest='trace', default=None, 
1214            help='file to print intermediate messages to')
1215    parser.add_option('-T', '--trace-stderr', dest='trace', 
1216            action='store_const',const=sys.stderr,
1217            help='file to print intermediate messages to')
1218    opts, args  = parser.parse_args()
1219
1220    trace_file = None
1221    if opts.trace:
1222        try:
1223            trace_file = open(opts.trace, 'w')
1224        except IOError:
1225            print >>sys.stderr, "Can't open trace file"
1226
1227    if opts.debug:
1228        if not trace_file:
1229            trace_file = sys.stderr
1230
1231    if opts.tcl != None:
1232        try:
1233            f = open(opts.tcl, 'r')
1234            content = ''.join(f)
1235            f.close()
1236        except IOError, e:
1237            sys.exit("Can't read %s: %s" % (opts.tcl, e))
1238    else:
1239        sys.exit("Must specify a file name")
1240
1241    if not opts.master:
1242        sys.exit("Must supply master tb label (--master)");
1243
1244    obj = fedd_create_experiment_local(
1245            debug=opts.debug,
1246            scripts_dir="/users/faber/testbed/federation",
1247            cert_file="./fedd_client.pem", cert_pwd="faber", 
1248            ssh_pubkey_file='/users/faber/.ssh/id_rsa.pub',
1249            trusted_certs="./cacert.pem",
1250            tbmap = { 
1251                'deter':'https://users.isi.deterlab.net:23235',
1252                'emulab':'https://users.isi.deterlab.net:23236',
1253                'ucb':'https://users.isi.deterlab.net:23237',
1254                },
1255            trace_file=trace_file
1256        ) 
1257    rv = obj.create_experiment( {\
1258            'experimentdescription' : content, 
1259            'master' : opts.master, 
1260            'user': [ {'userID' : { 'username' : 'faber' } } ],
1261            },
1262            None)
1263
1264    print rv
Note: See TracBrowser for help on using the repository browser.