source: fedd/fedd_create_experiment.py @ 6679c122

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 6679c122 was 6679c122, checked in by Ted Faber <faber@…>, 16 years ago

beginnings of federated creation interfaces

  • Property mode set to 100644
File size: 27.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import re
11import random
12import string
13import subprocess
14import tempfile
15import copy
16
17from subprocess import *
18
19from fedd_services import *
20from fedd_internal_services import *
21from fedd_util import *
22import parse_detail
23from service_error import *
24
25class fedd_create_experiment_local:
26    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
27        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed_evrepeater", 
28        "rc.accounts.patch"]
29   
30    def __init__(self, 
31            cert_file=None,
32            cert_pwd=None,
33            muxmax=2,
34            project_user = "faber",
35            # scp_exec="/bin/echo",
36            scp_exec="/usr/bin/scp",
37            scripts_dir="./", 
38            splitter=None,
39            # ssh_exec="/bin/echo",
40            ssh_exec="/usr/bin/ssh",
41            ssh_identity_file=None,
42            ssh_keygen="/usr/bin/ssh-keygen",
43            ssh_pubkey_file=None,
44            ssh_type="rsa",
45            tbmap=None,
46            tclsh="/usr/local/bin/otclsh",
47            tcl_splitter="/usr/testbed/lib/ns2ir/parse.tcl",
48            tmpdir="/tmp",
49            trusted_certs=None,
50            ):
51        self.scripts = fedd_create_experiment_local.scripts
52
53        self.cert_file = cert_file
54        self.cert_pwd = cert_pwd
55        self.muxmax = muxmax
56        self.project_user = project_user
57        self.scp_exec = scp_exec
58        self.scripts_dir = scripts_dir
59        self.splitter = splitter
60        self.ssh_exec=ssh_exec
61        self.ssh_keygen = ssh_keygen
62        self.ssh_identity_file = ssh_identity_file
63        self.ssh_type = ssh_type
64        self.tclsh = tclsh
65        self.tcl_splitter = tcl_splitter
66        self.tbmap = tbmap
67        self.tmpdir = tmpdir
68        self.tmpdir += "/split%d/" % os.getpid()
69        self.trusted_certs=trusted_certs
70
71        self.def_expstart = \
72                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
73        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
74        self.def_gwstart = \
75                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
76        self.def_mgwstart = \
77                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
78        self.def_gwimage = "FBSD61-TUNNEL2";
79        self.def_gwtype = "pc";
80
81
82        if ssh_pubkey_file:
83            try:
84                f = open(ssh_pubkey_file, 'r')
85                self.ssh_pubkey = f.read()
86                f.close()
87            except IOError:
88                raise service_error(service_error.internal,
89                        "Cannot read sshpubkey")
90
91        # Confirm federation scripts in the right place
92        for s in self.scripts:
93            if not os.path.exists(self.scripts_dir + "/" + s):
94                raise service_error(service_error.server_config,
95                        "%s/%s not in local script dir" % (self.scripts_dir, s))
96
97    def copy_file(self, src, dest, size=1024):
98        """
99        Exceedingly simple file copy.
100        """
101        s = open(src,'r')
102        d = open(dest, 'w')
103
104        buf = "x"
105        while buf != "":
106            buf = s.read(size)
107            d.write(buf)
108        s.close()
109        d.close()
110
111    def scp_file(self, file, user, host, dest=""):
112        """
113        scp a file to the remote host.
114        """
115       
116        rv = call([self.scp_exec, file, "%s@%s:%s" % (user, host, dest)])
117        if rv == 0:
118            return True
119        else:
120            # XXX
121            print >>sys.stdout, "Failed to scp %s to %s@%s" % (file, user, host)
122            return False
123
124    def ssh_cmd(self, user, host, cmd, wname=None, timeout=0):
125        # This should be done more carefully
126        sub = Popen("%s %s@%s %s" % (self.ssh_exec, user, host, cmd), 
127                shell=True)
128        return sub.wait() == 0
129
130    def ship_scripts(self, host, user, dest_dir):
131        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
132            for s in self.scripts:
133                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
134                        user, host, dest_dir):
135                    return False
136            return True
137        else:
138            return False
139
140    def ship_configs(self, host, user, src_dir, dest_dir):
141        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
142            return False
143        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
144            return False
145
146        for f in os.listdir(src_dir):
147            if os.path.isdir(f):
148                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
149                        "%s/%s" % (dest_dir, f)):
150                    return False
151            else:
152                if not self.scp_file("%s/%s" % (src_dir, f), 
153                        user, host, dest_dir):
154                    return False
155        return True
156
157    def start_segment(self, tb, eid, tbparams, timeout=0):
158        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
159        user = tbparams[tb]['user']
160        pid = tbparams[tb]['project']
161        # XXX
162        # base_confs = ( "hosts", "vtopo.xml", "viz.xml")
163        base_confs = ( "hosts", "vtopo.xml")
164        tclfile = "%s.%s.tcl" % (eid, tb)
165        expinfo_exec = "/usr/testbed/bin/expinfo"
166        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
167        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
168        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
169        state_re = re.compile("State:\s+(\w+)")
170        state = "none"
171
172        status = Popen([self.ssh_exec, "%s@%s" % (user, host), 
173                expinfo_exec, pid, eid], stdout=PIPE)
174        for line in status.stdout:
175            m = state_re.match(line)
176            if m: state = m.group(1)
177        rv = status.wait()
178        if rv != 0:
179            raise service_error(service_error.internal,
180                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
181        # XXX
182        print "%s: %s" % (tb, state)
183        print "transferring experiment to %s" % tb
184
185        if not self.scp_file("%s/%s/%s" % (self.tmpdir, tb, tclfile),
186                user, host):
187            return False
188        # Clear and create the tarfiles and rpm directories
189        for d in (tarfiles_dir, rpms_dir):
190            if not self.ssh_cmd(user, host, 
191                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
192                return False
193            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
194                    "create tarfiles"):
195                return False
196       
197        if state == 'active':
198            # Remote experiment is active.  Modify it.
199            for f in base_confs:
200                if not self.scp_file("%s/%s" % (self.tmpdir, f), user, host,
201                        "%s/%s" % (proj_dir, f)):
202                    return False
203            if not self.ship_scripts(host, user, proj_dir):
204                return False
205            if not self.ship_configs(host, user, "%s/%s" % (self.tmpdir, tb),
206                    proj_dir):
207                return False
208            if os.path.isdir("%s/tarfiles" % self.tmpdir):
209                if not self.ship_configs(host, user,
210                        "%s/tarfiles" % self.tmpdir, tarfiles_dir):
211                    return False
212            if os.path.isdir("%s/rpms" % self.tmpdir):
213                if not self.ship_configs(host, user,
214                        "%s/rpms" % self.tmpdir, tarfiles_dir):
215                    return False
216            print "Modifying %s on %s" % (eid, tb)
217            if not self.ssh_cmd(user, host,
218                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
219                            (pid, eid, tclfile), "modexp"):
220                return False
221            return True
222        elif state == "swapped":
223            # Remote experiment swapped out.  Modify it and swap it in.
224            for f in base_confs:
225                if not self.scp_file("%s/%s" % (self.tmpdir, f), user, host,
226                        "%s/%s" % (proj_dir, f)):
227                    return False
228            if not self.ship_scripts(host, user, proj_dir):
229                return False
230            if not self.ship_configs(host, user, "%s/%s" % (self.tmpdir, tb),
231                    proj_dir):
232                return False
233            if os.path.isdir("%s/tarfiles" % self.tmpdir):
234                if not self.ship_configs(host, user,
235                        "%s/tarfiles" % self.tmpdir, tarfiles_dir):
236                    return False
237            if os.path.isdir("%s/rpms" % self.tmpdir):
238                if not self.ship_configs(host, user,
239                        "%s/rpms" % self.tmpdir, tarfiles_dir):
240                    return False
241            print "Modifying %s on %s" % (eid, tb)
242            if not self.ssh_cmd(user, host,
243                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
244                    "modexp"):
245                return False
246            print "Swapping %s in on %s" % (eid, tb)
247            if not self.ssh_cmd(user, host,
248                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
249                    "swapexp", timeout):
250                return False
251            return True
252        elif state == "none":
253            # No remote experiment.  Create one.  We do this in 2 steps so we
254            # can put the configuration files and scripts into the new
255            # experiment directories.
256
257            # Tarfiles must be present for creation to work
258            if os.path.isdir("%s/tarfiles" % self.tmpdir):
259                if not self.ship_configs(host, user,
260                        "%s/tarfiles" % self.tmpdir, tarfiles_dir):
261                    return False
262            if os.path.isdir("%s/rpms" % self.tmpdir):
263                if not self.ship_configs(host, user,
264                        "%s/rpms" % self.tmpdir, tarfiles_dir):
265                    return False
266            print "Creating %s on %s" % (eid, tb)
267            if not self.ssh_cmd(user, host,
268                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
269                            (pid, eid, tclfile), "startexp", timeout):
270                return False
271            # After startexp the per-experiment directories exist
272            for f in base_confs:
273                if not self.scp_file("%s/%s" % (self.tmpdir, f), user, host,
274                        "%s/%s" % (proj_dir, f)):
275                    return False
276            if not self.ship_scripts(host, user, proj_dir):
277                return False
278            if not self.ship_configs(host, user, "%s/%s" % (self.tmpdir, tb),
279                    proj_dir):
280                return False
281            print "Swapping %s in on %s" % (eid, tb)
282            if not self.ssh_cmd(user, host,
283                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
284                    "swapexp", timeout):
285                return False
286            return True
287        else:
288            # XXX
289            print "unknown state %s" % state
290            return False
291
292    def stop_segment(self, tb, eid, tbparams):
293        user = tbparams[tb]['user']
294        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
295        pid = tbparams[tb]['project']
296
297        # XXX:
298        print "Stopping %s on %s" % (eid, tb)
299        return self.ssh_cmd(user, host,
300                "/usr/testbed/bin/swapexp -w %d %d out" % (pid, eid))
301
302       
303    def generate_ssh_keys(self, dest, type="rsa" ):
304        """
305        Generate a set of keys for the gateways to use to talk.
306
307        Keys are of type type and are stored in the required dest file.
308        """
309        valid_types = ("rsa", "dsa")
310        t = type.lower();
311        if t not in valid_types: raise ValueError
312        # May raise CalledProcessError
313        rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest])
314        if rv != 0:
315            raise service_error(service_error.internal, 
316                    "Cannot generate nonce ssh keys.  %s return code %d" \
317                            % (self.ssh_keygen, rv))
318
319    def genviz(self, topo_file, viz_file): pass
320
321    def get_access(self, tb, nodes, user, tbparam):
322        """
323        Get access to testbed through fedd and set the parameters for that tb
324        """
325
326        translate_attr = {
327            'slavenodestartcmd': 'expstart',
328            'slaveconnectorstartcmd': 'gwstart',
329            'masternodestartcmd': 'mexpstart',
330            'masterconnectorstartcmd': 'mgwstart',
331            'connectorimage': 'gwimage',
332            'connectortype': 'gwtype',
333            'tunnelcfg': 'tun',
334        }
335
336        # XXX multi-level access
337        uri = self.tbmap.get(tb, None)
338        if not uri:
339            raise service_error(serice_error.server_config, 
340                    "Unknown testbed: %s" % tb)
341
342        # The basic request
343        req = {\
344                'destinationTestbed' : { 'uri' : uri },
345                'user':  user,
346                'allocID' : { 'username': 'test' },
347                'access' : [ { 'sshPubkey' : self.ssh_pubkey } ]
348            }
349       
350        # node resources if any
351        if nodes != None and len(nodes) > 0:
352            rnodes = [ ]
353            for n in nodes:
354                rn = { }
355                image, hw, count = n.split(":")
356                if image: rn['image'] = [ image ]
357                if hw: rn['hardware'] = [ hw ]
358                if count: rn['count'] = int(count)
359                rnodes.append(rn)
360            req['resources']= { }
361            req['resources']['node'] = rnodes
362
363        # No retry loop here.  Proxy servers must correctly authenticate
364        # themselves without help
365        try:
366            ctx = fedd_ssl_context(self.cert_file, 
367                    self.trusted_certs, password=self.cert_pwd)
368        except SSL.SSLError:
369            raise service_error(service_error.server_config, 
370                    "Server certificates misconfigured")
371
372        loc = feddServiceLocator();
373        port = loc.getfeddPortType(uri,
374                transport=M2Crypto.httpslib.HTTPSConnection, 
375                transdict={ 'ssl_context' : ctx })
376
377        # Reconstruct the full request message
378        msg = RequestAccessRequestMessage()
379        msg.set_element_RequestAccessRequestBody(
380                pack_soap(msg, "RequestAccessRequestBody", req))
381        try:
382            resp = port.RequestAccess(msg)
383        except ZSI.ParseException, e:
384            raise service_error(service_error.req,
385                    "Bad format message (XMLRPC??): %s" %
386                    str(e))
387        r = unpack_soap(resp)
388
389        if r.has_key('RequestAccessResponseBody'):
390            r = r['RequestAccessResponseBody']
391        else:
392            raise service_error(service_error.proxy,
393                    "Bad proxy response")
394
395
396        e = r['emulab']
397        p = e['project']
398        tbparam[tb] = { 
399                "boss": e['boss'],
400                "host": e['ops'],
401                "domain": e['domain'],
402                "fs": e['fileServer'],
403                "eventserver": e['eventServer'],
404                "project": unpack_id(p['name'])
405                }
406
407        for u in p['user']:
408            tbparam[tb]['user'] = unpack_id(u['userID'])
409
410        for a in e['fedAttr']:
411            if a['attribute']:
412                key = translate_attr.get(a['attribute'].lower(), None)
413                if key:
414                    tbparam[tb][key]= a['value']
415       
416    class current_testbed:
417        def __init__(self, eid, tmpdir):
418            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
419            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
420            self.current_testbed = None
421            self.testbed_file = None
422
423            self.def_expstart = \
424                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
425            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
426            self.def_gwstart = \
427                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
428            self.def_mgwstart = \
429                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
430            self.def_gwimage = "FBSD61-TUNNEL2";
431            self.def_gwtype = "pc";
432
433            self.eid = eid
434            self.tmpdir = tmpdir
435
436        def __call__(self, line, master, allocated, tbparams):
437            # Capture testbed topology descriptions
438            if self.current_testbed == None:
439                m = self.begin_testbed.match(line)
440                if m != None:
441                    self.current_testbed = m.group(1)
442                    if self.current_testbed == None:
443                        raise service_error(service_error.req,
444                                "Bad request format (unnamed testbed)")
445                    allocated[self.current_testbed] = \
446                            allocated.get(self.current_testbed,0) + 1
447                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
448                    if not os.path.exists(tb_dir):
449                        try:
450                            os.mkdir(tb_dir)
451                        except IOError:
452                            raise service_error(service_error.internal,
453                                    "Cannot create %s" % tb_dir)
454                    try:
455                        self.testbed_file = open("%s/%s.%s.tcl" %
456                                (tb_dir, self.eid, self.current_testbed), 'w')
457                    except IOError:
458                        self.testbed_file = None
459                    return True
460                else: return False
461            else:
462                m = self.end_testbed.match(line)
463                if m != None:
464                    if m.group(1) != self.current_testbed:
465                        raise service_error(service_error.internal, 
466                                "Mismatched testbed markers!?")
467                    if self.testbed_file != None: 
468                        self.testbed_file.close()
469                        self.testbed_file = None
470                    self.current_testbed = None
471                elif self.testbed_file:
472                    # Substitute variables and put the line into the local
473                    # testbed file.
474                    gwtype = tbparams[self.current_testbed].get('gwtype', 
475                            self.def_gwtype)
476                    gwimage = tbparams[self.current_testbed].get('gwimage', 
477                            self.def_gwimage)
478                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
479                            self.def_mgwstart)
480                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
481                            self.def_mexpstart)
482                    gwstart = tbparams[self.current_testbed].get('gwstart', 
483                            self.def_gwstart)
484                    expstart = tbparams[self.current_testbed].get('expstart', 
485                            self.def_expstart)
486                    project = tbparams[self.current_testbed].get('project')
487                    line = re.sub("GWTYPE", gwtype, line)
488                    line = re.sub("GWIMAGE", gwimage, line)
489                    if self.current_testbed == master:
490                        line = re.sub("GWSTART", mgwstart, line)
491                        line = re.sub("EXPSTART", mexpstart, line)
492                    else:
493                        line = re.sub("GWSTART", gwstart, line)
494                        line = re.sub("EXPSTART", expstart, line)
495                    # XXX: does `` embed without doing enything else?
496                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
497                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
498                    line = re.sub("EID", self.eid, line)
499                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
500                            (project, self.eid), line)
501                    print >>self.testbed_file, line
502                return True
503
504    class allbeds:
505        def __init__(self, get_access):
506            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
507            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
508            self.in_allbeds = False
509            self.get_access = get_access
510
511        def __call__(self, line, user, tbparams):
512            # Testbed access parameters
513            if not self.in_allbeds:
514                if self.begin_allbeds.match(line):
515                    self.in_allbeds = True
516                    return True
517                else:
518                    return False
519            else:
520                if self.end_allbeds.match(line):
521                    self.in_allbeds = False
522                else:
523                    nodes = line.split('|')
524                    tb = nodes.pop(0)
525                    self.get_access(tb, nodes, user, tbparams)
526                return True
527
528    class gateways:
529        def __init__(self, eid, smbshare, master, tmpdir, gw_pubkey,
530                gw_secretkey, copy_file):
531            self.begin_gateways = \
532                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
533            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
534            self.current_gateways = None
535            self.control_gateway = None
536            self.active_end = { }
537
538            self.eid = eid
539            self.smbshare = smbshare
540            self.master = master
541            self.tmpdir = tmpdir
542            self.gw_pubkey_base = gw_pubkey
543            self.gw_secretkey_base = gw_secretkey
544
545            self.copy_file = copy_file
546
547
548        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
549                active_end, tbparams, dtb, myname, desthost, type):
550            """
551            Produce a gateway configuration file from a line of the gateways section
552            """
553
554            sproject = tbparams[gw].get('project', 'project')
555            dproject = tbparams[dtb].get('project', 'project')
556            sdomain = ".%s.%s%s" % (eid, sproject,
557                    tbparams[gw].get('domain', ".example.com"))
558            ddomain = ".%s.%s%s" % (eid, dproject,
559                    tbparams[dtb].get('domain', ".example.com"))
560            boss = tbparams[master].get('boss', "boss")
561            fs = tbparams[master].get('fs', "fs")
562            event_server = "%s%s" % \
563                    (tbparams[master].get('eventserver', "event_server"),
564                            tbparams[master].get('domain', "example.com"))
565            remote_event_server = "%s%s" % \
566                    (tbparams[dtb].get('eventserver', "event_server"),
567                            tbparams[dtb].get('domain', "example.com"))
568
569            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
570            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
571            tunnel_cfg = tbparams[gw].get("tun", "false")
572
573            conf_file = "%s%s.gw.conf" % (myname, sdomain)
574            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
575
576            # translate to lower case so the `hostname` hack for specifying
577            # configuration files works.
578            conf_file = conf_file.lower();
579            remote_conf_file = remote_conf_file.lower();
580
581            if dtb == master:
582                active = "false"
583            elif gw == master:
584                active = "true"
585            elif active_end.has_key['%s-%s' % (dtb, gw)]:
586                active = "false"
587            else:
588                active_end['%s-%s' % (gw, dtb)] = 1
589                active = "true"
590
591            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
592            print >>gwconfig, "Active: %s" % active
593            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
594            print >>gwconfig, "BossName: %s" % boss
595            print >>gwconfig, "FsName: %s" % fs
596            print >>gwconfig, "EventServerName: %s" % event_server
597            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
598            print >>gwconfig, "Type: %s" % type
599            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
600            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
601                    local_script_dir
602            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
603            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
604            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
605                    (remote_script_dir, remote_conf_file)
606            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
607            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
608            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
609            gwconfig.close()
610
611            return active == "true"
612
613        def __call__(self, line, allocated, tbparams):
614            # Process gateways
615            if not self.current_gateways:
616                m = self.begin_gateways.match(line)
617                if m:
618                    self.current_gateways = m.group(1)
619                    if allocated.has_key(self.current_gateways):
620                        # This test should always succeed
621                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
622                        if not os.path.exists(tb_dir):
623                            try:
624                                os.mkdir(tb_dir)
625                            except IOError:
626                                raise service_error(service_error.internal,
627                                        "Cannot create %s" % tb_dir)
628                    else:
629                        # XXX
630                        print >>sys.stderr, \
631                            "Ignoring gateways for unknown testbed %s" \
632                                    % self.current_gateways
633                        self.current_gateways = None
634                    return True
635                else:
636                    return False
637            else:
638                m = self.end_gateways.match(line)
639                if m :
640                    if m.group(1) != self.current_gateways:
641                        raise service_error(service_error.internal,
642                                "Mismatched gateway markers!?")
643                    if self.control_gateway:
644                        try:
645                            cc = open("%s/%s/client.conf" %
646                                    (self.tmpdir, self.current_gateways), 'w')
647                            print >>cc, "ControlGateway: %s" % \
648                                    self.control_gateway
649                            print >>cc, "SMBSHare: %s" % self.smbshare
650                            print >>cc, "ProjectUser: %s" % \
651                                    tbparams[self.current_gateways]['user']
652                            print >>cc, "ProjectName: %s" % \
653                                    tbparams[self.master]['project']
654                            cc.close()
655                        except IOError:
656                            raise service_error(service_error.internal,
657                                    "Error creating client config")
658                    else:
659                        # XXX
660                        print >>sys.stderr, "No control gateway for %s" %\
661                                self.current_gateways
662                    self.current_gateways = None
663                else:
664                    dtb, myname, desthost, type = line.split(" ")
665
666                    if type == "control" or type == "both":
667                        self.control_gateway = "%s.%s.%s%s" % (myname, 
668                                self.eid, 
669                                tbparams[self.current_gateways]['project'],
670                                tbparams[self.current_gateways]['domain'])
671                    try:
672                        active = self.gateway_conf_file(self.current_gateways,
673                                self.master, self.eid, self.gw_pubkey_base,
674                                self.gw_secretkey_base,
675                                self.active_end, tbparams, dtb, myname,
676                                desthost, type)
677                    except IOError, e:
678                        raise service_error(service_error.internal,
679                                "Failed to write config file for %s" % \
680                                        self.current_gateway)
681           
682                    gw_pubkey = "%s/keys/%s" % \
683                            (self.tmpdir, self.gw_pubkey_base)
684                    gw_secretkey = "%s/keys/%s" % \
685                            (self.tmpdir, self.gw_secretkey_base)
686
687                    pkfile = "%s/%s/%s" % \
688                            ( self.tmpdir, self.current_gateways, 
689                                    self.gw_pubkey_base)
690                    skfile = "%s/%s/%s" % \
691                            ( self.tmpdir, self.current_gateways, 
692                                    self.gw_secretkey_base)
693
694                    if not os.path.exists(pkfile):
695                        try:
696                            self.copy_file(gw_pubkey, pkfile)
697                        except IOError:
698                            service_error(service_error.internal,
699                                    "Failed to copy pubkey file")
700
701                    if active and not os.path.exists(skfile):
702                        try:
703                            self.copy_file(gw_secretkey, skfile)
704                        except IOError:
705                            service_error(service_error.internal,
706                                    "Failed to copy secretkey file")
707                return True
708
709    class shunt_to_file:
710        def __init__(self, begin, end, filename):
711            self.begin = re.compile(begin)
712            self.end = re.compile(end)
713            self.in_shunt = False
714            self.file = None
715            self.filename = filename
716
717        def __call__(self, line):
718            if not self.in_shunt:
719                if self.begin.match(line):
720                    self.in_shunt = True
721                    try:
722                        self.file = open(self.filename, "w")
723                    except:
724                        self.file = None
725                        raise
726                    return True
727                else:
728                    return False
729            else:
730                if self.end.match(line):
731                    if self.file: 
732                        self.file.close()
733                        self.file = None
734                    self.in_shunt = False
735                else:
736                    if self.file:
737                        print >>self.file, line
738                return True
739
740    class shunt_to_list:
741        def __init__(self, begin, end):
742            self.begin = re.compile(begin)
743            self.end = re.compile(end)
744            self.in_shunt = False
745            self.list = [ ]
746       
747        def __call__(self, line):
748            if not self.in_shunt:
749                if self.begin.match(line):
750                    self.in_shunt = True
751                    return True
752                else:
753                    return False
754            else:
755                if self.end.match(line):
756                    self.in_shunt = False
757                else:
758                    self.list.append(line)
759                return True
760
761    def create_experiment(self, req, fid):
762        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
763        gw_secretkey_base = "fed.%s" % self.ssh_type
764        gw_pubkey = self.tmpdir + "/keys/" + gw_pubkey_base
765        gw_secretkey = self.tmpdir + "/keys/" + gw_secretkey_base
766        tclfile = self.tmpdir + "/experiment.tcl"
767        tbparams = { }
768
769        pid = "dummy"
770        gid = "dummy"
771        # XXX
772        eid = "faber-splitter"
773        # XXX
774        master = "deter"
775        # XXX
776        smbshare="USERS"
777        # XXX
778        fail_soft = False
779        # XXX
780        startem = True
781
782        try:
783            os.mkdir(self.tmpdir)
784            os.mkdir(self.tmpdir+"/keys")
785        except OSError:
786            raise service_error(service_error.internal,
787                    "Can't make temporary dir")
788
789        # The tcl parser needs to read a file so put the content into that file
790        file_content=req.get('experimentdescription', None)
791        if file_content != None:
792            try:
793                f = open(tclfile, 'w')
794                f.write(file_content)
795                f.close()
796            except IOError:
797                raise service_error(service_error.internal,
798                        "Cannot write temp experiment description")
799        else:
800            raise service_error(service_error.req, "No experiment description")
801
802        try:
803            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
804        except ValueError:
805            raise service_error(service_error.server_config, 
806                    "Bad key type (%s)" % self.ssh_type)
807
808        user = req.get('user', None)
809        if user == None:
810            raise service_error(service_error.req, "No user")
811       
812       
813        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
814            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
815        tclparser = Popen(tclcmd, stdout=PIPE)
816
817        allocated = { }
818        started = { }
819
820        parse_current_testbed = self.current_testbed(eid, self.tmpdir)
821        parse_allbeds = self.allbeds(self.get_access)
822        parse_gateways = self.gateways(eid, smbshare, master, self.tmpdir,
823                gw_pubkey_base, gw_secretkey_base, self.copy_file)
824        parse_vtopo = self.shunt_to_file("^#\s+Begin\s+Vtopo",
825                    "^#\s+End\s+Vtopo", self.tmpdir + "/vtopo.xml")
826        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
827                    "^#\s+End\s+hostnames", self.tmpdir + "/hosts")
828        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
829                "^#\s+End\s+tarfiles")
830        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
831                "^#\s+End\s+rpms")
832
833        for line in tclparser.stdout:
834            line = line.rstrip()
835            if parse_current_testbed(line, master, allocated, tbparams):
836                continue
837            elif parse_allbeds(line, user, tbparams):
838                continue
839            elif parse_gateways(line, allocated, tbparams):
840                continue
841            elif parse_vtopo(line):
842                continue
843            elif parse_hostnames(line):
844                continue
845            elif parse_tarfiles(line):
846                continue
847            elif parse_rpms(line):
848                continue
849            else:
850                raise service_error(service_error.internal, 
851                        "Bad tcl parse? %s" % line)
852
853        if not startem: return True
854
855        # Copy tarfiles and rpms needed at remote sites into a staging area
856        try:
857            for t in parse_tarfiles.list:
858                if not os.path.exists("%s/tarfiles" % self.tmpdir):
859                    os.mkdir("%s/tarfiles" % self.tmpdir)
860                self.copy_file(t, "%s/tarfiles/%s" % \
861                        (self.tmpdir, os.path.basename(t)))
862            for r in parse_rpms.list:
863                if not os.path.exists("%s/rpms" % self.tmpdir):
864                    os.mkdir("%s/rpms" % self.tmpdir)
865                self.copy_file(r, "%s/rpms/%s" % \
866                        (self.tmpdir, os.path.basename(r)))
867        except IOError, e:
868            raise service_error(service_error.internal, 
869                    "Cannot stage tarfile/rpm: %s" % e.strerror)
870       
871        # XXX: more parallelism
872        for tb in allocated.iterkeys():
873            if tb != master:
874                if self.start_segment(tb, eid, tbparams, 0):
875                    started[tb] = True
876                else:
877                    break
878
879        if len(started) == len(allocated)-1:
880            if self.start_segment(master, eid, tbparams):
881                started[master] = True
882       
883        if not fail_soft and  len(allocated) != len(started):
884            for tb in started.iterkeys():
885                self.stop_segment(tb, eid, tbparams)
886        else:
887            print "Experiment started"
888
889        # XXX: return value
890
891if __name__ == '__main__':
892    from optparse import OptionParser
893   
894    parser = OptionParser()
895    parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse')
896    opts, args  = parser.parse_args()
897
898    if opts.tcl != None:
899        try:
900            f = open(opts.tcl, 'r')
901            content = ''.join(f)
902            f.close()
903        except IOError, e:
904            sys.exit("Can't read %s: %s" % (opts.tcl, e))
905    else:
906        sys.exit("Must specify a file name")
907
908    obj = fedd_create_experiment_local(
909            scripts_dir="/users/faber/testbed/federation",
910            cert_file="./fedd_client.pem", cert_pwd="faber", 
911            ssh_pubkey_file='/users/faber/.ssh/id_rsa.pub',
912            trusted_certs="./cacert.pem",
913            tbmap = { 
914                'deter':'https://users.isi.deterlab.net:23235',
915                'emulab':'https://users.isi.deterlab.net:23236',
916                'ucb':'https://users.isi.deterlab.net:23237',
917                },
918        ) 
919    obj.create_experiment( {\
920            'experimentdescription' : content, 
921            'user': [ {'userID' : { 'username' : 'faber' } } ],
922            },
923            None)
Note: See TracBrowser for help on using the repository browser.