source: fedd/fedd_experiment_control.py @ 34bc05c

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 34bc05c was 34bc05c, checked in by Ted Faber <faber@…>, 15 years ago

Add map database and clean up access reading method

  • Property mode set to 100644
File size: 59.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from fedd_util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26import parse_detail
27from service_error import service_error
28
29
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.experiment_control")
34fl.addHandler(nullHandler())
35
36class fedd_experiment_control_local:
37    """
38    Control of experiments that this system can directly access.
39
40    Includes experiment creation, termination and information dissemination.
41    Thred safe.
42    """
43   
44    class thread_pool:
45        """
46        A class to keep track of a set of threads all invoked for the same
47        task.  Manages the mutual exclusion of the states.
48        """
49        def __init__(self):
50            """
51            Start a pool.
52            """
53            self.changed = Condition()
54            self.started = 0
55            self.terminated = 0
56
57        def acquire(self):
58            """
59            Get the pool's lock.
60            """
61            self.changed.acquire()
62
63        def release(self):
64            """
65            Release the pool's lock.
66            """
67            self.changed.release()
68
69        def wait(self, timeout = None):
70            """
71            Wait for a pool thread to start or stop.
72            """
73            self.changed.wait(timeout)
74
75        def start(self):
76            """
77            Called by a pool thread to report starting.
78            """
79            self.changed.acquire()
80            self.started += 1
81            self.changed.notifyAll()
82            self.changed.release()
83
84        def terminate(self):
85            """
86            Called by a pool thread to report finishing.
87            """
88            self.changed.acquire()
89            self.terminated += 1
90            self.changed.notifyAll()
91            self.changed.release()
92
93        def clear(self):
94            """
95            Clear all pool data.
96            """
97            self.changed.acquire()
98            self.started = 0
99            self.terminated =0
100            self.changed.notifyAll()
101            self.changed.release()
102
103    class pooled_thread(Thread):
104        """
105        One of a set of threads dedicated to a specific task.  Uses the
106        thread_pool class above for coordination.
107        """
108        def __init__(self, group=None, target=None, name=None, args=(), 
109                kwargs={}, pdata=None, trace_file=None):
110            Thread.__init__(self, group, target, name, args, kwargs)
111            self.rv = None          # Return value of the ops in this thread
112            self.exception = None   # Exception that terminated this thread
113            self.target=target      # Target function to run on start()
114            self.args = args        # Args to pass to target
115            self.kwargs = kwargs    # Additional kw args
116            self.pdata = pdata      # thread_pool for this class
117            # Logger for this thread
118            self.log = logging.getLogger("fedd.experiment_control")
119       
120        def run(self):
121            """
122            Emulate Thread.run, except add pool data manipulation and error
123            logging.
124            """
125            if self.pdata:
126                self.pdata.start()
127
128            if self.target:
129                try:
130                    self.rv = self.target(*self.args, **self.kwargs)
131                except service_error, s:
132                    self.exception = s
133                    self.log.error("Thread exception: %s %s" % \
134                            (s.code_string(), s.desc))
135                except:
136                    self.exception = sys.exc_info()[1]
137                    self.log.error(("Unexpected thread exception: %s" +\
138                            "Trace %s") % (self.exception,\
139                                traceback.format_exc()))
140            if self.pdata:
141                self.pdata.terminate()
142
143    call_RequestAccess = service_caller('RequestAccess',
144            'getfeddPortType', feddServiceLocator, 
145            RequestAccessRequestMessage, 'RequestAccessRequestBody')
146
147    call_ReleaseAccess = service_caller('ReleaseAccess',
148            'getfeddPortType', feddServiceLocator, 
149            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
150
151    call_Ns2Split = service_caller('Ns2Split',
152            'getfeddInternalPortType', feddInternalServiceLocator, 
153            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
154
155    def __init__(self, config=None, auth=None):
156        """
157        Intialize the various attributes, most from the config object
158        """
159        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
160        self.thread_pool = fedd_experiment_control_local.thread_pool
161
162        self.cert_file = None
163        self.cert_pwd = None
164        self.trusted_certs = None
165
166        # Walk through the various relevant certificat specifying config
167        # attributes until the local certificate attributes can be resolved.
168        # The walk is from most specific to most general specification.
169        for s in ("experiment_control", "globals"):
170            if config.has_section(s): 
171                if config.has_option(s, "cert_file"):
172                    if not self.cert_file:
173                        self.cert_file = config.get(s, "cert_file")
174                        self.cert_pwd = config.get(s, "cert_pwd")
175
176                if config.has_option(s, "trusted_certs"):
177                    if not self.trusted_certs:
178                        self.trusted_certs = config.get(s, "trusted_certs")
179
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        set_log_level(config, "experiment_control", self.log)
184        self.muxmax = 2
185        self.nthreads = 2
186        self.randomize_experiments = False
187
188        self.scp_exec = "/usr/bin/scp"
189        self.splitter = None
190        self.ssh_exec="/usr/bin/ssh"
191        self.ssh_keygen = "/usr/bin/ssh-keygen"
192        self.ssh_identity_file = None
193
194
195        self.debug = config.getboolean("experiment_control", "create_debug")
196        self.state_filename = config.get("experiment_control", 
197                "experiment_state_file")
198        self.splitter_url = config.get("experiment_control", "splitter_url")
199        self.fedkit = config.get("experiment_control", "fedkit")
200        accessdb_file = config.get("experiment_control", "accessdb")
201
202        self.ssh_pubkey_file = config.get("experiment_control", 
203                "ssh_pubkey_file")
204        self.ssh_privkey_file = config.get("experiment_control",
205                "ssh_privkey_file")
206        # NB for internal master/slave ops, not experiment setup
207        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
208        self.state = { }
209        self.state_lock = Lock()
210        self.tclsh = "/usr/local/bin/otclsh"
211        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
212        mapdb_file = config.get("experiment_control", "mapdb")
213        self.trace_file = sys.stderr
214
215        self.def_expstart = \
216                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
217                "/tmp/federate";
218        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
219                "FEDDIR/hosts";
220        self.def_gwstart = \
221                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
222                "/tmp/bridge.log";
223        self.def_mgwstart = \
224                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
225                "/tmp/bridge.log";
226        self.def_gwimage = "FBSD61-TUNNEL2";
227        self.def_gwtype = "pc";
228
229        if auth:
230            self.auth = auth
231        else:
232            self.log.error(\
233                    "[access]: No authorizer initialized, creating local one.")
234            auth = authorizer()
235
236
237        if self.ssh_pubkey_file:
238            try:
239                f = open(self.ssh_pubkey_file, 'r')
240                self.ssh_pubkey = f.read()
241                f.close()
242            except IOError:
243                raise service_error(service_error.internal,
244                        "Cannot read sshpubkey")
245        else:
246            raise service_error(service_error.internal, 
247                    "No SSH public key file?")
248
249        if not self.ssh_privkey_file:
250            raise service_error(service_error.internal, 
251                    "No SSH public key file?")
252
253
254        if mapdb_file:
255            self.read_mapdb(mapdb_file)
256            print self.tbmap
257        else:
258            self.log.warn("[experiment_control] No testbed map, using defaults")
259            self.tbmap = { 
260                    'deter':'https://users.isi.deterlab.net:23235',
261                    'emulab':'https://users.isi.deterlab.net:23236',
262                    'ucb':'https://users.isi.deterlab.net:23237',
263                    }
264
265        if accessdb_file:
266                self.read_accessdb(accessdb_file)
267        else:
268            raise service_error(service_error.internal,
269                    "No accessdb specified in config")
270
271        # Grab saved state.  OK to do this w/o locking because it's read only
272        # and only one thread should be in existence that can see self.state at
273        # this point.
274        if self.state_filename:
275            self.read_state()
276
277        # Dispatch tables
278        self.soap_services = {\
279                'Create': soap_handler(\
280                        CreateRequestMessage.typecode,
281                        getattr(self, "create_experiment"), 
282                        CreateResponseMessage,
283                        "CreateResponseBody"),
284                'Vtopo': soap_handler(\
285                        VtopoRequestMessage.typecode,
286                        getattr(self, "get_vtopo"),
287                        VtopoResponseMessage,
288                        "VtopoResponseBody"),
289                'Vis': soap_handler(\
290                        VisRequestMessage.typecode,
291                        getattr(self, "get_vis"),
292                        VisResponseMessage,
293                        "VisResponseBody"),
294                'Info': soap_handler(\
295                        InfoRequestMessage.typecode,
296                        getattr(self, "get_info"),
297                        InfoResponseMessage,
298                        "InfoResponseBody"),
299                'Terminate': soap_handler(\
300                        TerminateRequestMessage.typecode,
301                        getattr(self, "terminate_experiment"),
302                        TerminateResponseMessage,
303                        "TerminateResponseBody"),
304        }
305
306        self.xmlrpc_services = {\
307                'Create': xmlrpc_handler(\
308                        getattr(self, "create_experiment"), 
309                        "CreateResponseBody"),
310                'Vtopo': xmlrpc_handler(\
311                        getattr(self, "get_vtopo"),
312                        "VtopoResponseBody"),
313                'Vis': xmlrpc_handler(\
314                        getattr(self, "get_vis"),
315                        "VisResponseBody"),
316                'Info': xmlrpc_handler(\
317                        getattr(self, "get_info"),
318                        "InfoResponseBody"),
319                'Terminate': xmlrpc_handler(\
320                        getattr(self, "terminate_experiment"),
321                        "TerminateResponseBody"),
322        }
323
324    def copy_file(self, src, dest, size=1024):
325        """
326        Exceedingly simple file copy.
327        """
328        s = open(src,'r')
329        d = open(dest, 'w')
330
331        buf = "x"
332        while buf != "":
333            buf = s.read(size)
334            d.write(buf)
335        s.close()
336        d.close()
337
338    # Call while holding self.state_lock
339    def write_state(self):
340        """
341        Write a new copy of experiment state after copying the existing state
342        to a backup.
343
344        State format is a simple pickling of the state dictionary.
345        """
346        if os.access(self.state_filename, os.W_OK):
347            self.copy_file(self.state_filename, \
348                    "%s.bak" % self.state_filename)
349        try:
350            f = open(self.state_filename, 'w')
351            pickle.dump(self.state, f)
352        except IOError, e:
353            self.log.error("Can't write file %s: %s" % \
354                    (self.state_filename, e))
355        except pickle.PicklingError, e:
356            self.log.error("Pickling problem: %s" % e)
357        except TypeError, e:
358            self.log.error("Pickling problem (TypeError): %s" % e)
359
360    # Call while holding self.state_lock
361    def read_state(self):
362        """
363        Read a new copy of experiment state.  Old state is overwritten.
364
365        State format is a simple pickling of the state dictionary.
366        """
367        try:
368            f = open(self.state_filename, "r")
369            self.state = pickle.load(f)
370            self.log.debug("[read_state]: Read state from %s" % \
371                    self.state_filename)
372        except IOError, e:
373            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
374                    % (self.state_filename, e))
375        except pickle.UnpicklingError, e:
376            self.log.warning(("[read_state]: No saved state: " + \
377                    "Unpickling failed: %s") % e)
378       
379        for k in self.state.keys():
380            try:
381                # This list should only have one element in it, but phrasing it
382                # as a for loop doesn't cost much, really.  We have to find the
383                # fedid elements anyway.
384                for eid in [ f['fedid'] \
385                        for f in self.state[k]['experimentID']\
386                            if f.has_key('fedid') ]:
387                    self.auth.set_attribute(self.state[k]['owner'], eid)
388            except KeyError, e:
389                self.log.warning("[read_state]: State ownership or identity " +\
390                        "misformatted in %s: %s" % (self.state_filename, e))
391
392
393    def read_accessdb(self, accessdb_file):
394        """
395        Read the mapping from fedids that can create experiments to their name
396        in the 3-level access namespace.  All will be asserted from this
397        testbed and can include the local username and porject that will be
398        asserted on their behalf by this fedd.  Each fedid is also added to the
399        authorization system with the "create" attribute.
400        """
401        self.accessdb = {}
402        # These are the regexps for parsing the db
403        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
404        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
405                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
406        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
407                "\s*->\s*(" + name_expr + ")\s*$")
408        lineno = 0
409
410        # Parse the mappings and store in self.authdb, a dict of
411        # fedid -> (proj, user)
412        try:
413            f = open(accessdb_file, "r")
414            for line in f:
415                lineno += 1
416                line = line.strip()
417                if len(line) == 0 or line.startswith('#'):
418                    continue
419                m = project_line.match(line)
420                if m:
421                    fid = fedid(hexstr=m.group(1))
422                    project, user = m.group(2,3)
423                    if not self.accessdb.has_key(fid):
424                        self.accessdb[fid] = []
425                    self.accessdb[fid].append((project, user))
426                    continue
427
428                m = user_line.match(line)
429                if m:
430                    fid = fedid(hexstr=m.group(1))
431                    project = None
432                    user = m.group(2)
433                    if not self.accessdb.has_key(fid):
434                        self.accessdb[fid] = []
435                    self.accessdb[fid].append((project, user))
436                    continue
437                self.log.warn("[experiment_control] Error parsing access " +\
438                        "db %s at line %d" %  (accessdb_file, lineno))
439        except IOError:
440            raise service_error(service_error.internal,
441                    "Error opening/reading %s as experiment " +\
442                            "control accessdb" %  accessdb_file)
443        f.close()
444
445        # Initialize the authorization attributes
446        for fid in self.accessdb.keys():
447            self.auth.set_attribute(fid, 'create')
448
449    def read_mapdb(self, file):
450        """
451        Read a simple colon separated list of mappings for the
452        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
453        """
454
455        self.tbmap = { }
456        lineno =0
457        try:
458            f = open(file, "r")
459            for line in f:
460                lineno += 1
461                line = line.strip()
462                if line.startswith('#') or len(line) == 0:
463                    continue
464                try:
465                    label, url = line.split(':', 1)
466                    self.tbmap[label] = url
467                except ValueError, e:
468                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
469                            "map db: %s %s" % (lineno, line, e))
470        except IOError, e:
471            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
472                    "open %s: %s" % (file, e))
473        f.close()
474
475    def scp_file(self, file, user, host, dest=""):
476        """
477        scp a file to the remote host.  If debug is set the action is only
478        logged.
479        """
480
481        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
482                "%s@%s:%s" % (user, host, dest)]
483        rv = 0
484
485        try:
486            dnull = open("/dev/null", "r")
487        except IOError:
488            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
489            dnull = Null
490
491        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
492        if not self.debug:
493            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
494            else: rv = call(scp_cmd)
495
496        return rv == 0
497
498    def ssh_cmd(self, user, host, cmd, wname=None):
499        """
500        Run a remote command on host as user.  If debug is set, the action is
501        only logged.
502        """
503        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
504                user, host, cmd)
505
506        try:
507            dnull = open("/dev/null", "r")
508        except IOError:
509            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
510            dnull = Null
511
512        self.log.debug("[ssh_cmd]: %s" % sh_str)
513        if not self.debug:
514            if dnull:
515                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
516            else:
517                sub = Popen(sh_str, shell=True)
518            return sub.wait() == 0
519        else:
520            return True
521
522    def ship_configs(self, host, user, src_dir, dest_dir):
523        """
524        Copy federant-specific configuration files to the federant.
525        """
526        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
527            return False
528        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
529            return False
530
531        for f in os.listdir(src_dir):
532            if os.path.isdir(f):
533                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
534                        "%s/%s" % (dest_dir, f)):
535                    return False
536            else:
537                if not self.scp_file("%s/%s" % (src_dir, f), 
538                        user, host, dest_dir):
539                    return False
540        return True
541
542    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
543        """
544        Start a sub-experiment on a federant.
545
546        Get the current state, modify or create as appropriate, ship data and
547        configs and start the experiment.  There are small ordering differences
548        based on the initial state of the sub-experiment.
549        """
550        # ops node in the federant
551        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
552        user = tbparams[tb]['user']     # federant user
553        pid = tbparams[tb]['project']   # federant project
554        # XXX
555        base_confs = ( "hosts",)
556        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
557        # command to test experiment state
558        expinfo_exec = "/usr/testbed/bin/expinfo" 
559        # Configuration directories on the remote machine
560        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
561        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
562        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
563        # Regular expressions to parse the expinfo response
564        state_re = re.compile("State:\s+(\w+)")
565        no_exp_re = re.compile("^No\s+such\s+experiment")
566        state = None    # Experiment state parsed from expinfo
567        # The expinfo ssh command
568        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
569
570        # Get status
571        self.log.debug("[start_segment]: %s"% " ".join(cmd))
572        dev_null = None
573        try:
574            dev_null = open("/dev/null", "a")
575        except IOError, e:
576            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
577
578        if self.debug:
579            state = 'swapped'
580            rv = 0
581        else:
582            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
583            for line in status.stdout:
584                m = state_re.match(line)
585                if m: state = m.group(1)
586                else:
587                    m = no_exp_re.match(line)
588                    if m: state = "none"
589            rv = status.wait()
590
591        # If the experiment is not present the subcommand returns a non-zero
592        # return value.  If we successfully parsed a "none" outcome, ignore the
593        # return code.
594        if rv != 0 and state != "none":
595            raise service_error(service_error.internal,
596                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
597
598        self.log.debug("[start_segment]: %s: %s" % (tb, state))
599        self.log.info("[start_segment]:transferring experiment to %s" % tb)
600
601        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
602            return False
603        # Clear the federation files
604        if not self.ssh_cmd(user, host, 
605                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
606            return False
607        if not self.ssh_cmd(user, host, 
608                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
609            return False
610        # Clear and create the tarfiles and rpm directories
611        for d in (tarfiles_dir, rpms_dir):
612            if not self.ssh_cmd(user, host, 
613                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
614                return False
615            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
616                    "create tarfiles"):
617                return False
618       
619        if state == 'active':
620            # Remote experiment is active.  Modify it.
621            for f in base_confs:
622                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
623                        "%s/%s" % (proj_dir, f)):
624                    return False
625            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
626                    proj_dir):
627                return False
628            if os.path.isdir("%s/tarfiles" % tmpdir):
629                if not self.ship_configs(host, user,
630                        "%s/tarfiles" % tmpdir, tarfiles_dir):
631                    return False
632            if os.path.isdir("%s/rpms" % tmpdir):
633                if not self.ship_configs(host, user,
634                        "%s/rpms" % tmpdir, tarfiles_dir):
635                    return False
636            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
637            if not self.ssh_cmd(user, host,
638                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
639                            (pid, eid, tclfile), "modexp"):
640                return False
641            return True
642        elif state == "swapped":
643            # Remote experiment swapped out.  Modify it and swap it in.
644            for f in base_confs:
645                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
646                        "%s/%s" % (proj_dir, f)):
647                    return False
648            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
649                    proj_dir):
650                return False
651            if os.path.isdir("%s/tarfiles" % tmpdir):
652                if not self.ship_configs(host, user,
653                        "%s/tarfiles" % tmpdir, tarfiles_dir):
654                    return False
655            if os.path.isdir("%s/rpms" % tmpdir):
656                if not self.ship_configs(host, user,
657                        "%s/rpms" % tmpdir, tarfiles_dir):
658                    return False
659            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
660            if not self.ssh_cmd(user, host,
661                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
662                    "modexp"):
663                return False
664            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
665            if not self.ssh_cmd(user, host,
666                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
667                    "swapexp"):
668                return False
669            return True
670        elif state == "none":
671            # No remote experiment.  Create one.  We do this in 2 steps so we
672            # can put the configuration files and scripts into the new
673            # experiment directories.
674
675            # Tarfiles must be present for creation to work
676            if os.path.isdir("%s/tarfiles" % tmpdir):
677                if not self.ship_configs(host, user,
678                        "%s/tarfiles" % tmpdir, tarfiles_dir):
679                    return False
680            if os.path.isdir("%s/rpms" % tmpdir):
681                if not self.ship_configs(host, user,
682                        "%s/rpms" % tmpdir, tarfiles_dir):
683                    return False
684            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
685            if not self.ssh_cmd(user, host,
686                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
687                            (pid, eid, tclfile), "startexp"):
688                return False
689            # After startexp the per-experiment directories exist
690            for f in base_confs:
691                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
692                        "%s/%s" % (proj_dir, f)):
693                    return False
694            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
695                    proj_dir):
696                return False
697            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
698            if not self.ssh_cmd(user, host,
699                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
700                    "swapexp"):
701                return False
702            return True
703        else:
704            self.log.debug("[start_segment]:unknown state %s" % state)
705            return False
706
707    def stop_segment(self, tb, eid, tbparams):
708        """
709        Stop a sub experiment by calling swapexp on the federant
710        """
711        user = tbparams[tb]['user']
712        host = tbparams[tb]['host']
713        pid = tbparams[tb]['project']
714
715        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
716        return self.ssh_cmd(user, host,
717                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
718
719       
720    def generate_ssh_keys(self, dest, type="rsa" ):
721        """
722        Generate a set of keys for the gateways to use to talk.
723
724        Keys are of type type and are stored in the required dest file.
725        """
726        valid_types = ("rsa", "dsa")
727        t = type.lower();
728        if t not in valid_types: raise ValueError
729        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
730
731        try:
732            trace = open("/dev/null", "w")
733        except IOError:
734            raise service_error(service_error.internal,
735                    "Cannot open /dev/null??");
736
737        # May raise CalledProcessError
738        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
739        rv = call(cmd, stdout=trace, stderr=trace)
740        if rv != 0:
741            raise service_error(service_error.internal, 
742                    "Cannot generate nonce ssh keys.  %s return code %d" \
743                            % (self.ssh_keygen, rv))
744
745    def gentopo(self, str):
746        """
747        Generate the topology dtat structure from the splitter's XML
748        representation of it.
749
750        The topology XML looks like:
751            <experiment>
752                <nodes>
753                    <node><vname></vname><ips>ip1:ip2</ips></node>
754                </nodes>
755                <lans>
756                    <lan>
757                        <vname></vname><vnode></vnode><ip></ip>
758                        <bandwidth></bandwidth><member>node:port</member>
759                    </lan>
760                </lans>
761        """
762        class topo_parse:
763            """
764            Parse the topology XML and create the dats structure.
765            """
766            def __init__(self):
767                # Typing of the subelements for data conversion
768                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
769                self.int_subelements = ( 'bandwidth',)
770                self.float_subelements = ( 'delay',)
771                # The final data structure
772                self.nodes = [ ]
773                self.lans =  [ ]
774                self.topo = { \
775                        'node': self.nodes,\
776                        'lan' : self.lans,\
777                    }
778                self.element = { }  # Current element being created
779                self.chars = ""     # Last text seen
780
781            def end_element(self, name):
782                # After each sub element the contents is added to the current
783                # element or to the appropriate list.
784                if name == 'node':
785                    self.nodes.append(self.element)
786                    self.element = { }
787                elif name == 'lan':
788                    self.lans.append(self.element)
789                    self.element = { }
790                elif name in self.str_subelements:
791                    self.element[name] = self.chars
792                    self.chars = ""
793                elif name in self.int_subelements:
794                    self.element[name] = int(self.chars)
795                    self.chars = ""
796                elif name in self.float_subelements:
797                    self.element[name] = float(self.chars)
798                    self.chars = ""
799
800            def found_chars(self, data):
801                self.chars += data.rstrip()
802
803
804        tp = topo_parse();
805        parser = xml.parsers.expat.ParserCreate()
806        parser.EndElementHandler = tp.end_element
807        parser.CharacterDataHandler = tp.found_chars
808
809        parser.Parse(str)
810
811        return tp.topo
812       
813
814    def genviz(self, topo):
815        """
816        Generate the visualization the virtual topology
817        """
818
819        neato = "/usr/local/bin/neato"
820        # These are used to parse neato output and to create the visualization
821        # file.
822        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
823        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
824                "%s</type></node>"
825
826        try:
827            # Node names
828            nodes = [ n['vname'] for n in topo['node'] ]
829            topo_lans = topo['lan']
830        except KeyError:
831            raise service_error(service_error.internal, "Bad topology")
832
833        lans = { }
834        links = { }
835
836        # Walk through the virtual topology, organizing the connections into
837        # 2-node connections (links) and more-than-2-node connections (lans).
838        # When a lan is created, it's added to the list of nodes (there's a
839        # node in the visualization for the lan).
840        for l in topo_lans:
841            if links.has_key(l['vname']):
842                if len(links[l['vname']]) < 2:
843                    links[l['vname']].append(l['vnode'])
844                else:
845                    nodes.append(l['vname'])
846                    lans[l['vname']] = links[l['vname']]
847                    del links[l['vname']]
848                    lans[l['vname']].append(l['vnode'])
849            elif lans.has_key(l['vname']):
850                lans[l['vname']].append(l['vnode'])
851            else:
852                links[l['vname']] = [ l['vnode'] ]
853
854
855        # Open up a temporary file for dot to turn into a visualization
856        try:
857            df, dotname = tempfile.mkstemp()
858            dotfile = os.fdopen(df, 'w')
859        except IOError:
860            raise service_error(service_error.internal,
861                    "Failed to open file in genviz")
862
863        # Generate a dot/neato input file from the links, nodes and lans
864        try:
865            print >>dotfile, "graph G {"
866            for n in nodes:
867                print >>dotfile, '\t"%s"' % n
868            for l in links.keys():
869                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
870            for l in lans.keys():
871                for n in lans[l]:
872                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
873            print >>dotfile, "}"
874            dotfile.close()
875        except TypeError:
876            raise service_error(service_error.internal,
877                    "Single endpoint link in vtopo")
878        except IOError:
879            raise service_error(service_error.internal, "Cannot write dot file")
880
881        # Use dot to create a visualization
882        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
883                '-Gpack=true', dotname], stdout=PIPE)
884
885        # Translate dot to vis format
886        vis_nodes = [ ]
887        vis = { 'node': vis_nodes }
888        for line in dot.stdout:
889            m = vis_re.match(line)
890            if m:
891                vn = m.group(1)
892                vis_node = {'name': vn, \
893                        'x': float(m.group(2)),\
894                        'y' : float(m.group(3)),\
895                    }
896                if vn in links.keys() or vn in lans.keys():
897                    vis_node['type'] = 'lan'
898                else:
899                    vis_node['type'] = 'node'
900                vis_nodes.append(vis_node)
901        rv = dot.wait()
902
903        os.remove(dotname)
904        if rv == 0 : return vis
905        else: return None
906
907    def get_access(self, tb, nodes, user, tbparam, master, export_project,
908            access_user):
909        """
910        Get access to testbed through fedd and set the parameters for that tb
911        """
912
913        translate_attr = {
914            'slavenodestartcmd': 'expstart',
915            'slaveconnectorstartcmd': 'gwstart',
916            'masternodestartcmd': 'mexpstart',
917            'masterconnectorstartcmd': 'mgwstart',
918            'connectorimage': 'gwimage',
919            'connectortype': 'gwtype',
920            'tunnelcfg': 'tun',
921            'smbshare': 'smbshare',
922        }
923
924        uri = self.tbmap.get(tb, None)
925        if not uri:
926            raise service_error(serice_error.server_config, 
927                    "Unknown testbed: %s" % tb)
928
929        # currently this lumps all users into one service access group
930        service_keys = [ a for u in user \
931                for a in u.get('access', []) \
932                    if a.has_key('sshPubkey')]
933
934        if len(service_keys) == 0:
935            raise service_error(service_error.req, 
936                    "Must have at least one SSH pubkey for services")
937
938
939        for p, u in access_user:
940
941            if p:
942                # Request with user and project specified
943                req = {\
944                        'destinationTestbed' : { 'uri' : uri },
945                        'project': { 
946                            'name': {'localname': p},
947                            'user': [ {'userID': { 'localname': u } } ],
948                            },
949                        'user':  user,
950                        'allocID' : { 'localname': 'test' },
951                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
952                        'serviceAccess' : service_keys
953                    }
954            else:
955                # Request with only user specified
956                req = {\
957                        'destinationTestbed' : { 'uri' : uri },
958                        'user':  [ {'userID': { 'localname': u } } ],
959                        'allocID' : { 'localname': 'test' },
960                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
961                        'serviceAccess' : service_keys
962                    }
963
964            if tb == master:
965                # NB, the export_project parameter is a dict that includes
966                # the type
967                req['exportProject'] = export_project
968
969            # node resources if any
970            if nodes != None and len(nodes) > 0:
971                rnodes = [ ]
972                for n in nodes:
973                    rn = { }
974                    image, hw, count = n.split(":")
975                    if image: rn['image'] = [ image ]
976                    if hw: rn['hardware'] = [ hw ]
977                    if count: rn['count'] = int(count)
978                    rnodes.append(rn)
979                req['resources']= { }
980                req['resources']['node'] = rnodes
981
982            try:
983                r = self.call_RequestAccess(uri, req, 
984                        self.cert_file, self.cert_pwd, self.trusted_certs)
985            except service_error, e:
986                if e.code == service_error.access:
987                    r = None
988                    continue
989                else:
990                    raise e
991
992            if r.has_key('RequestAccessResponseBody'):
993                r = r['RequestAccessResponseBody']
994            else:
995                raise service_error(service_error.protocol,
996                        "Bad proxy response")
997       
998        if not r:
999            raise service_error(service_error.access, 
1000                    "Access denied by %s (%s)" % (tb, uri))
1001
1002        e = r['emulab']
1003        p = e['project']
1004        tbparam[tb] = { 
1005                "boss": e['boss'],
1006                "host": e['ops'],
1007                "domain": e['domain'],
1008                "fs": e['fileServer'],
1009                "eventserver": e['eventServer'],
1010                "project": unpack_id(p['name']),
1011                "emulab" : e,
1012                "allocID" : r['allocID'],
1013                }
1014        # Make the testbed name be the label the user applied
1015        p['testbed'] = {'localname': tb }
1016
1017        for u in p['user']:
1018            tbparam[tb]['user'] = unpack_id(u['userID'])
1019
1020        for a in e['fedAttr']:
1021            if a['attribute']:
1022                key = translate_attr.get(a['attribute'].lower(), None)
1023                if key:
1024                    tbparam[tb][key]= a['value']
1025       
1026    def release_access(self, tb, aid):
1027        """
1028        Release access to testbed through fedd
1029        """
1030
1031        uri = self.tbmap.get(tb, None)
1032        if not uri:
1033            raise service_error(serice_error.server_config, 
1034                    "Unknown testbed: %s" % tb)
1035
1036        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1037                self.cert_file, self.cert_pwd, self.trusted_certs)
1038
1039        # better error coding
1040
1041    def remote_splitter(self, uri, desc, master):
1042
1043        req = {
1044                'description' : { 'ns2description': desc },
1045                'master': master,
1046                'include_fedkit': bool(self.fedkit)
1047            }
1048
1049        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1050                self.trusted_certs)
1051
1052        if r.has_key('Ns2SplitResponseBody'):
1053            r = r['Ns2SplitResponseBody']
1054            if r.has_key('output'):
1055                return r['output'].splitlines()
1056            else:
1057                raise service_error(service_error.protocol, 
1058                        "Bad splitter response (no output)")
1059        else:
1060            raise service_error(service_error.protocol, "Bad splitter response")
1061       
1062    class current_testbed:
1063        """
1064        Object for collecting the current testbed description.  The testbed
1065        description is saved to a file with the local testbed variables
1066        subsittuted line by line.
1067        """
1068        def __init__(self, eid, tmpdir, fedkit):
1069            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1070            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1071            self.current_testbed = None
1072            self.testbed_file = None
1073
1074            self.def_expstart = \
1075                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1076            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1077            self.def_gwstart = \
1078                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1079            self.def_mgwstart = \
1080                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1081            self.def_gwimage = "FBSD61-TUNNEL2";
1082            self.def_gwtype = "pc";
1083
1084            self.eid = eid
1085            self.tmpdir = tmpdir
1086            self.fedkit = fedkit
1087
1088        def __call__(self, line, master, allocated, tbparams):
1089            # Capture testbed topology descriptions
1090            if self.current_testbed == None:
1091                m = self.begin_testbed.match(line)
1092                if m != None:
1093                    self.current_testbed = m.group(1)
1094                    if self.current_testbed == None:
1095                        raise service_error(service_error.req,
1096                                "Bad request format (unnamed testbed)")
1097                    allocated[self.current_testbed] = \
1098                            allocated.get(self.current_testbed,0) + 1
1099                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1100                    if not os.path.exists(tb_dir):
1101                        try:
1102                            os.mkdir(tb_dir)
1103                        except IOError:
1104                            raise service_error(service_error.internal,
1105                                    "Cannot create %s" % tb_dir)
1106                    try:
1107                        self.testbed_file = open("%s/%s.%s.tcl" %
1108                                (tb_dir, self.eid, self.current_testbed), 'w')
1109                    except IOError:
1110                        self.testbed_file = None
1111                    return True
1112                else: return False
1113            else:
1114                m = self.end_testbed.match(line)
1115                if m != None:
1116                    if m.group(1) != self.current_testbed:
1117                        raise service_error(service_error.internal, 
1118                                "Mismatched testbed markers!?")
1119                    if self.testbed_file != None: 
1120                        self.testbed_file.close()
1121                        self.testbed_file = None
1122                    self.current_testbed = None
1123                elif self.testbed_file:
1124                    # Substitute variables and put the line into the local
1125                    # testbed file.
1126                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1127                            self.def_gwtype)
1128                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1129                            self.def_gwimage)
1130                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1131                            self.def_mgwstart)
1132                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1133                            self.def_mexpstart)
1134                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1135                            self.def_gwstart)
1136                    expstart = tbparams[self.current_testbed].get('expstart', 
1137                            self.def_expstart)
1138                    project = tbparams[self.current_testbed].get('project')
1139                    line = re.sub("GWTYPE", gwtype, line)
1140                    line = re.sub("GWIMAGE", gwimage, line)
1141                    if self.current_testbed == master:
1142                        line = re.sub("GWSTART", mgwstart, line)
1143                        line = re.sub("EXPSTART", mexpstart, line)
1144                    else:
1145                        line = re.sub("GWSTART", gwstart, line)
1146                        line = re.sub("EXPSTART", expstart, line)
1147                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1148                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1149                    line = re.sub("EID", self.eid, line)
1150                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1151                            (project, self.eid), line)
1152                    if self.fedkit:
1153                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1154                                line)
1155                    print >>self.testbed_file, line
1156                return True
1157
1158    class allbeds:
1159        """
1160        Process the Allbeds section.  Get access to each federant and save the
1161        parameters in tbparams
1162        """
1163        def __init__(self, get_access):
1164            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1165            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1166            self.in_allbeds = False
1167            self.get_access = get_access
1168
1169        def __call__(self, line, user, tbparams, master, export_project,
1170                access_user):
1171            # Testbed access parameters
1172            if not self.in_allbeds:
1173                if self.begin_allbeds.match(line):
1174                    self.in_allbeds = True
1175                    return True
1176                else:
1177                    return False
1178            else:
1179                if self.end_allbeds.match(line):
1180                    self.in_allbeds = False
1181                else:
1182                    nodes = line.split('|')
1183                    tb = nodes.pop(0)
1184                    self.get_access(tb, nodes, user, tbparams, master,
1185                            export_project, access_user)
1186                return True
1187
1188    class gateways:
1189        def __init__(self, eid, master, tmpdir, gw_pubkey,
1190                gw_secretkey, copy_file, fedkit):
1191            self.begin_gateways = \
1192                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1193            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1194            self.current_gateways = None
1195            self.control_gateway = None
1196            self.active_end = { }
1197
1198            self.eid = eid
1199            self.master = master
1200            self.tmpdir = tmpdir
1201            self.gw_pubkey_base = gw_pubkey
1202            self.gw_secretkey_base = gw_secretkey
1203
1204            self.copy_file = copy_file
1205            self.fedkit = fedkit
1206
1207
1208        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1209                active_end, tbparams, dtb, myname, desthost, type):
1210            """
1211            Produce a gateway configuration file from a gateways line.
1212            """
1213
1214            sproject = tbparams[gw].get('project', 'project')
1215            dproject = tbparams[dtb].get('project', 'project')
1216            sdomain = ".%s.%s%s" % (eid, sproject,
1217                    tbparams[gw].get('domain', ".example.com"))
1218            ddomain = ".%s.%s%s" % (eid, dproject,
1219                    tbparams[dtb].get('domain', ".example.com"))
1220            boss = tbparams[master].get('boss', "boss")
1221            fs = tbparams[master].get('fs', "fs")
1222            event_server = "%s%s" % \
1223                    (tbparams[gw].get('eventserver', "event_server"),
1224                            tbparams[gw].get('domain', "example.com"))
1225            remote_event_server = "%s%s" % \
1226                    (tbparams[dtb].get('eventserver', "event_server"),
1227                            tbparams[dtb].get('domain', "example.com"))
1228            seer_control = "%s%s" % \
1229                    (tbparams[gw].get('control', "control"), sdomain)
1230
1231            if self.fedkit:
1232                remote_script_dir = "/usr/local/federation/bin"
1233                local_script_dir = "/usr/local/federation/bin"
1234            else:
1235                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1236                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1237
1238            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1239            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1240            tunnel_cfg = tbparams[gw].get("tun", "false")
1241
1242            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1243            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1244
1245            # translate to lower case so the `hostname` hack for specifying
1246            # configuration files works.
1247            conf_file = conf_file.lower();
1248            remote_conf_file = remote_conf_file.lower();
1249
1250            if dtb == master:
1251                active = "false"
1252            elif gw == master:
1253                active = "true"
1254            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1255                active = "false"
1256            else:
1257                active_end['%s-%s' % (gw, dtb)] = 1
1258                active = "true"
1259
1260            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1261            print >>gwconfig, "Active: %s" % active
1262            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1263            print >>gwconfig, "BossName: %s" % boss
1264            print >>gwconfig, "FsName: %s" % fs
1265            print >>gwconfig, "EventServerName: %s" % event_server
1266            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1267            print >>gwconfig, "SeerControl: %s" % seer_control
1268            print >>gwconfig, "Type: %s" % type
1269            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1270            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1271                    local_script_dir
1272            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1273            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1274            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1275                    (remote_conf_dir, remote_conf_file)
1276            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1277            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1278            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1279            gwconfig.close()
1280
1281            return active == "true"
1282
1283        def __call__(self, line, allocated, tbparams):
1284            # Process gateways
1285            if not self.current_gateways:
1286                m = self.begin_gateways.match(line)
1287                if m:
1288                    self.current_gateways = m.group(1)
1289                    if allocated.has_key(self.current_gateways):
1290                        # This test should always succeed
1291                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1292                        if not os.path.exists(tb_dir):
1293                            try:
1294                                os.mkdir(tb_dir)
1295                            except IOError:
1296                                raise service_error(service_error.internal,
1297                                        "Cannot create %s" % tb_dir)
1298                    else:
1299                        # XXX
1300                        self.log.error("[gateways]: Ignoring gateways for " + \
1301                                "unknown testbed %s" % self.current_gateways)
1302                        self.current_gateways = None
1303                    return True
1304                else:
1305                    return False
1306            else:
1307                m = self.end_gateways.match(line)
1308                if m :
1309                    if m.group(1) != self.current_gateways:
1310                        raise service_error(service_error.internal,
1311                                "Mismatched gateway markers!?")
1312                    if self.control_gateway:
1313                        try:
1314                            cc = open("%s/%s/client.conf" %
1315                                    (self.tmpdir, self.current_gateways), 'w')
1316                            print >>cc, "ControlGateway: %s" % \
1317                                    self.control_gateway
1318                            if tbparams[self.master].has_key('smbshare'):
1319                                print >>cc, "SMBSHare: %s" % \
1320                                        tbparams[self.master]['smbshare']
1321                            print >>cc, "ProjectUser: %s" % \
1322                                    tbparams[self.master]['user']
1323                            print >>cc, "ProjectName: %s" % \
1324                                    tbparams[self.master]['project']
1325                            cc.close()
1326                        except IOError:
1327                            raise service_error(service_error.internal,
1328                                    "Error creating client config")
1329                        try:
1330                            cc = open("%s/%s/seer.conf" %
1331                                    (self.tmpdir, self.current_gateways),
1332                                    'w')
1333                            if self.current_gateways != self.master:
1334                                print >>cc, "ControlNode: %s" % \
1335                                        self.control_gateway
1336                            print >>cc, "ExperimentID: %s/%s" % \
1337                                    ( tbparams[self.master]['project'], \
1338                                    self.eid )
1339                            cc.close()
1340                        except IOError:
1341                            raise service_error(service_error.internal,
1342                                    "Error creating seer config")
1343                    else:
1344                        debug.error("[gateways]: No control gateway for %s" %\
1345                                    self.current_gateways)
1346                    self.current_gateways = None
1347                else:
1348                    dtb, myname, desthost, type = line.split(" ")
1349
1350                    if type == "control" or type == "both":
1351                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1352                                self.eid, 
1353                                tbparams[self.current_gateways]['project'],
1354                                tbparams[self.current_gateways]['domain'])
1355                    try:
1356                        active = self.gateway_conf_file(self.current_gateways,
1357                                self.master, self.eid, self.gw_pubkey_base,
1358                                self.gw_secretkey_base,
1359                                self.active_end, tbparams, dtb, myname,
1360                                desthost, type)
1361                    except IOError, e:
1362                        raise service_error(service_error.internal,
1363                                "Failed to write config file for %s" % \
1364                                        self.current_gateway)
1365           
1366                    gw_pubkey = "%s/keys/%s" % \
1367                            (self.tmpdir, self.gw_pubkey_base)
1368                    gw_secretkey = "%s/keys/%s" % \
1369                            (self.tmpdir, self.gw_secretkey_base)
1370
1371                    pkfile = "%s/%s/%s" % \
1372                            ( self.tmpdir, self.current_gateways, 
1373                                    self.gw_pubkey_base)
1374                    skfile = "%s/%s/%s" % \
1375                            ( self.tmpdir, self.current_gateways, 
1376                                    self.gw_secretkey_base)
1377
1378                    if not os.path.exists(pkfile):
1379                        try:
1380                            self.copy_file(gw_pubkey, pkfile)
1381                        except IOError:
1382                            service_error(service_error.internal,
1383                                    "Failed to copy pubkey file")
1384
1385                    if active and not os.path.exists(skfile):
1386                        try:
1387                            self.copy_file(gw_secretkey, skfile)
1388                        except IOError:
1389                            service_error(service_error.internal,
1390                                    "Failed to copy secretkey file")
1391                return True
1392
1393    class shunt_to_file:
1394        """
1395        Simple class to write data between two regexps to a file.
1396        """
1397        def __init__(self, begin, end, filename):
1398            """
1399            Begin shunting on a match of begin, stop on end, send data to
1400            filename.
1401            """
1402            self.begin = re.compile(begin)
1403            self.end = re.compile(end)
1404            self.in_shunt = False
1405            self.file = None
1406            self.filename = filename
1407
1408        def __call__(self, line):
1409            """
1410            Call this on each line in the input that may be shunted.
1411            """
1412            if not self.in_shunt:
1413                if self.begin.match(line):
1414                    self.in_shunt = True
1415                    try:
1416                        self.file = open(self.filename, "w")
1417                    except:
1418                        self.file = None
1419                        raise
1420                    return True
1421                else:
1422                    return False
1423            else:
1424                if self.end.match(line):
1425                    if self.file: 
1426                        self.file.close()
1427                        self.file = None
1428                    self.in_shunt = False
1429                else:
1430                    if self.file:
1431                        print >>self.file, line
1432                return True
1433
1434    class shunt_to_list:
1435        """
1436        Same interface as shunt_to_file.  Data collected in self.list, one list
1437        element per line.
1438        """
1439        def __init__(self, begin, end):
1440            self.begin = re.compile(begin)
1441            self.end = re.compile(end)
1442            self.in_shunt = False
1443            self.list = [ ]
1444       
1445        def __call__(self, line):
1446            if not self.in_shunt:
1447                if self.begin.match(line):
1448                    self.in_shunt = True
1449                    return True
1450                else:
1451                    return False
1452            else:
1453                if self.end.match(line):
1454                    self.in_shunt = False
1455                else:
1456                    self.list.append(line)
1457                return True
1458
1459    class shunt_to_string:
1460        """
1461        Same interface as shunt_to_file.  Data collected in self.str, all in
1462        one string.
1463        """
1464        def __init__(self, begin, end):
1465            self.begin = re.compile(begin)
1466            self.end = re.compile(end)
1467            self.in_shunt = False
1468            self.str = ""
1469       
1470        def __call__(self, line):
1471            if not self.in_shunt:
1472                if self.begin.match(line):
1473                    self.in_shunt = True
1474                    return True
1475                else:
1476                    return False
1477            else:
1478                if self.end.match(line):
1479                    self.in_shunt = False
1480                else:
1481                    self.str += line
1482                return True
1483
1484    def create_experiment(self, req, fid):
1485        """
1486        The external interface to experiment creation called from the
1487        dispatcher.
1488
1489        Creates a working directory, splits the incoming description using the
1490        splitter script and parses out the avrious subsections using the
1491        lcasses above.  Once each sub-experiment is created, use pooled threads
1492        to instantiate them and start it all up.
1493        """
1494
1495        if not self.auth.check_attribute(fid, 'create'):
1496            raise service_error(service_error.access, "Create access denied")
1497
1498        try:
1499            tmpdir = tempfile.mkdtemp(prefix="split-")
1500        except IOError:
1501            raise service_error(service_error.internal, "Cannot create tmp dir")
1502
1503        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1504        gw_secretkey_base = "fed.%s" % self.ssh_type
1505        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1506        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1507        tclfile = tmpdir + "/experiment.tcl"
1508        tbparams = { }
1509        try:
1510            access_user = self.accessdb[fid]
1511        except KeyError:
1512            raise service_error(service_error.internal,
1513                    "Access map and authorizer out of sync in " + \
1514                            "create_experiment for fedid %s"  % fid)
1515
1516        pid = "dummy"
1517        gid = "dummy"
1518        # XXX
1519        fail_soft = False
1520
1521        try:
1522            os.mkdir(tmpdir+"/keys")
1523        except OSError:
1524            raise service_error(service_error.internal,
1525                    "Can't make temporary dir")
1526
1527        req = req.get('CreateRequestBody', None)
1528        if not req:
1529            raise service_error(service_error.req,
1530                    "Bad request format (no CreateRequestBody)")
1531        # The tcl parser needs to read a file so put the content into that file
1532        descr=req.get('experimentdescription', None)
1533        if descr:
1534            file_content=descr.get('ns2description', None)
1535            if file_content:
1536                try:
1537                    f = open(tclfile, 'w')
1538                    f.write(file_content)
1539                    f.close()
1540                except IOError:
1541                    raise service_error(service_error.internal,
1542                            "Cannot write temp experiment description")
1543            else:
1544                raise service_error(service_error.req, 
1545                        "Only ns2descriptions supported")
1546        else:
1547            raise service_error(service_error.req, "No experiment description")
1548
1549        if req.has_key('experimentID') and \
1550                req['experimentID'].has_key('localname'):
1551            eid = req['experimentID']['localname']
1552            self.state_lock.acquire()
1553            while (self.state.has_key(eid)):
1554                eid += random.choice(string.ascii_letters)
1555            # To avoid another thread picking this localname
1556            self.state[eid] = "placeholder"
1557            self.state_lock.release()
1558        else:
1559            eid = self.exp_stem
1560            for i in range(0,5):
1561                eid += random.choice(string.ascii_letters)
1562            self.state_lock.acquire()
1563            while (self.state.has_key(eid)):
1564                eid = self.exp_stem
1565                for i in range(0,5):
1566                    eid += random.choice(string.ascii_letters)
1567            # To avoid another thread picking this localname
1568            self.state[eid] = "placeholder"
1569            self.state_lock.release()
1570
1571        try:
1572            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1573        except ValueError:
1574            raise service_error(service_error.server_config, 
1575                    "Bad key type (%s)" % self.ssh_type)
1576
1577        user = req.get('user', None)
1578        if user == None:
1579            raise service_error(service_error.req, "No user")
1580
1581        master = req.get('master', None)
1582        if not master:
1583            raise service_error(service_error.req, "No master testbed label")
1584        export_project = req.get('exportProject', None)
1585        if not export_project:
1586            raise service_error(service_error.req, "No export project")
1587       
1588        if self.splitter_url:
1589            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1590            split_data = self.remote_splitter(self.splitter_url, file_content,
1591                    master)
1592        else:
1593            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1594                str(self.muxmax), '-m', master]
1595
1596            if self.fedkit:
1597                tclcmd.append('-k')
1598
1599            tclcmd.extend([pid, gid, eid, tclfile])
1600
1601            self.log.debug("running local splitter %s", " ".join(tclcmd))
1602            tclparser = Popen(tclcmd, stdout=PIPE)
1603            split_data = tclparser.stdout
1604
1605        allocated = { }     # Testbeds we can access
1606        started = { }       # Testbeds where a sub-experiment started
1607                            # successfully
1608
1609        # Objects to parse the splitter output (defined above)
1610        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1611        parse_allbeds = self.allbeds(self.get_access)
1612        parse_gateways = self.gateways(eid, master, tmpdir,
1613                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1614        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1615                    "^#\s+End\s+Vtopo")
1616        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1617                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1618        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1619                "^#\s+End\s+tarfiles")
1620        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1621                "^#\s+End\s+rpms")
1622
1623        # Worling on the split data
1624        for line in split_data:
1625            line = line.rstrip()
1626            if parse_current_testbed(line, master, allocated, tbparams):
1627                continue
1628            elif parse_allbeds(line, user, tbparams, master, export_project,
1629                    access_user):
1630                continue
1631            elif parse_gateways(line, allocated, tbparams):
1632                continue
1633            elif parse_vtopo(line):
1634                continue
1635            elif parse_hostnames(line):
1636                continue
1637            elif parse_tarfiles(line):
1638                continue
1639            elif parse_rpms(line):
1640                continue
1641            else:
1642                raise service_error(service_error.internal, 
1643                        "Bad tcl parse? %s" % line)
1644
1645        # Virtual topology and visualization
1646        vtopo = self.gentopo(parse_vtopo.str)
1647        if not vtopo:
1648            raise service_error(service_error.internal, 
1649                    "Failed to generate virtual topology")
1650
1651        vis = self.genviz(vtopo)
1652        if not vis:
1653            raise service_error(service_error.internal, 
1654                    "Failed to generate visualization")
1655
1656        # save federant information
1657        for k in allocated.keys():
1658            tbparams[k]['federant'] = {\
1659                    'name': [ { 'localname' : eid} ],\
1660                    'emulab': tbparams[k]['emulab'],\
1661                    'allocID' : tbparams[k]['allocID'],\
1662                    'master' : k == master,\
1663                }
1664
1665
1666        # Copy tarfiles and rpms needed at remote sites into a staging area
1667        try:
1668            if self.fedkit:
1669                parse_tarfiles.list.append(self.fedkit)
1670            for t in parse_tarfiles.list:
1671                if not os.path.exists("%s/tarfiles" % tmpdir):
1672                    os.mkdir("%s/tarfiles" % tmpdir)
1673                self.copy_file(t, "%s/tarfiles/%s" % \
1674                        (tmpdir, os.path.basename(t)))
1675            for r in parse_rpms.list:
1676                if not os.path.exists("%s/rpms" % tmpdir):
1677                    os.mkdir("%s/rpms" % tmpdir)
1678                self.copy_file(r, "%s/rpms/%s" % \
1679                        (tmpdir, os.path.basename(r)))
1680        except IOError, e:
1681            raise service_error(service_error.internal, 
1682                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1683
1684        thread_pool_info = self.thread_pool()
1685        threads = [ ]
1686
1687        for tb in [ k for k in allocated.keys() if k != master]:
1688            # Wait until we have a free slot to start the next testbed load
1689            thread_pool_info.acquire()
1690            while thread_pool_info.started - \
1691                    thread_pool_info.terminated >= self.nthreads:
1692                thread_pool_info.wait()
1693            thread_pool_info.release()
1694
1695            # Create and start a thread to start the segment, and save it to
1696            # get the return value later
1697            t  = self.pooled_thread(target=self.start_segment, 
1698                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1699                    pdata=thread_pool_info, trace_file=self.trace_file)
1700            threads.append(t)
1701            t.start()
1702
1703        # Wait until all finish (the first clause of the while is to make sure
1704        # one starts)
1705        thread_pool_info.acquire()
1706        while thread_pool_info.started == 0 or \
1707                thread_pool_info.started > thread_pool_info.terminated:
1708            thread_pool_info.wait()
1709        thread_pool_info.release()
1710
1711        # If none failed, start the master
1712        failed = [ t.getName() for t in threads if not t.rv ]
1713
1714        if len(failed) == 0:
1715            if not self.start_segment(master, eid, tbparams, tmpdir):
1716                failed.append(master)
1717
1718        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1719        # If one failed clean up, unless fail_soft is set
1720        if failed:
1721            if not fail_soft:
1722                for tb in succeeded:
1723                    self.stop_segment(tb, eid, tbparams)
1724                # Remove the placeholder
1725                self.state_lock.acquire()
1726                del self.state[eid]
1727                self.state_lock.release()
1728
1729                raise service_error(service_error.federant,
1730                    "Swap in failed on %s" % ",".join(failed))
1731        else:
1732            self.log.info("[start_segment]: Experiment %s started" % eid)
1733
1734        # Generate an ID for the experiment (slice) and a certificate that the
1735        # allocator can use to prove they own it.  We'll ship it back through
1736        # the encrypted connection.
1737        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1738
1739        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1740
1741        # Walk up tmpdir, deleting as we go
1742        for path, dirs, files in os.walk(tmpdir, topdown=False):
1743            for f in files:
1744                os.remove(os.path.join(path, f))
1745            for d in dirs:
1746                os.rmdir(os.path.join(path, d))
1747        os.rmdir(tmpdir)
1748
1749        # The deepcopy prevents the allocation ID and other binaries from being
1750        # translated into other formats
1751        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1752                for tb in tbparams.keys() \
1753                    if tbparams[tb].has_key('federant') ],\
1754                    'vtopo': vtopo,\
1755                    'vis' : vis,
1756                    'experimentID' : [\
1757                            { 'fedid': copy.copy(expid) }, \
1758                            { 'localname': eid },\
1759                        ],\
1760                    'experimentAccess': { 'X509' : expcert },\
1761                }
1762
1763        # Insert the experiment into our state and update the disk copy
1764        self.state_lock.acquire()
1765        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1766                for tb in tbparams.keys() \
1767                    if tbparams[tb].has_key('federant') ],\
1768                    'vtopo': vtopo,\
1769                    'vis' : vis,
1770                    'owner': fid,
1771                    'experimentID' : [\
1772                            { 'fedid': expid }, { 'localname': eid },\
1773                        ],\
1774                }
1775        self.state[eid] = self.state[expid]
1776        if self.state_filename: self.write_state()
1777        self.state_lock.release()
1778
1779        self.auth.set_attribute(fid, expid)
1780        self.auth.set_attribute(expid, expid)
1781
1782        if not failed:
1783            return resp
1784        else:
1785            raise service_error(service_error.partial, \
1786                    "Partial swap in on %s" % ",".join(succeeded))
1787
1788    def check_experiment_access(self, fid, key):
1789        """
1790        Confirm that the fid has access to the experiment.  Though a request
1791        may be made in terms of a local name, the access attribute is always
1792        the experiment's fedid.
1793        """
1794        if not isinstance(key, fedid):
1795            self.state_lock.acquire()
1796            if self.state.has_key(key):
1797                try:
1798                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1799                            if f.has_key('fedid') ]
1800                except KeyError:
1801                    self.state_lock.release()
1802                    raise service_error(service_error.internal, 
1803                            "No fedid for experiment %s when checking " +\
1804                                    "access(!?)" % key)
1805                if len(kl) == 1:
1806                    key = kl[0]
1807                else:
1808                    self.state_lock.release()
1809                    raise service_error(service_error.internal, 
1810                            "multiple fedids for experiment %s when " +\
1811                                    "checking access(!?)" % key)
1812            else:
1813                self.state_lock.release()
1814                raise service_error(service_error.access, "Access Denied")
1815            self.state_lock.release()
1816
1817        if self.auth.check_attribute(fid, key):
1818            return True
1819        else:
1820            raise service_error(service_error.access, "Access Denied")
1821
1822
1823
1824    def get_vtopo(self, req, fid):
1825        """
1826        Return the stored virtual topology for this experiment
1827        """
1828        rv = None
1829
1830        req = req.get('VtopoRequestBody', None)
1831        if not req:
1832            raise service_error(service_error.req,
1833                    "Bad request format (no VtopoRequestBody)")
1834        exp = req.get('experiment', None)
1835        if exp:
1836            if exp.has_key('fedid'):
1837                key = exp['fedid']
1838                keytype = "fedid"
1839            elif exp.has_key('localname'):
1840                key = exp['localname']
1841                keytype = "localname"
1842            else:
1843                raise service_error(service_error.req, "Unknown lookup type")
1844        else:
1845            raise service_error(service_error.req, "No request?")
1846
1847        self.check_experiment_access(fid, key)
1848
1849        self.state_lock.acquire()
1850        if self.state.has_key(key):
1851            rv = { 'experiment' : {keytype: key },\
1852                    'vtopo': self.state[key]['vtopo'],\
1853                }
1854        self.state_lock.release()
1855
1856        if rv: return rv
1857        else: raise service_error(service_error.req, "No such experiment")
1858
1859    def get_vis(self, req, fid):
1860        """
1861        Return the stored visualization for this experiment
1862        """
1863        rv = None
1864
1865        req = req.get('VisRequestBody', None)
1866        if not req:
1867            raise service_error(service_error.req,
1868                    "Bad request format (no VisRequestBody)")
1869        exp = req.get('experiment', None)
1870        if exp:
1871            if exp.has_key('fedid'):
1872                key = exp['fedid']
1873                keytype = "fedid"
1874            elif exp.has_key('localname'):
1875                key = exp['localname']
1876                keytype = "localname"
1877            else:
1878                raise service_error(service_error.req, "Unknown lookup type")
1879        else:
1880            raise service_error(service_error.req, "No request?")
1881
1882        self.check_experiment_access(fid, key)
1883
1884        self.state_lock.acquire()
1885        if self.state.has_key(key):
1886            rv =  { 'experiment' : {keytype: key },\
1887                    'vis': self.state[key]['vis'],\
1888                    }
1889        self.state_lock.release()
1890
1891        if rv: return rv
1892        else: raise service_error(service_error.req, "No such experiment")
1893
1894    def get_info(self, req, fid):
1895        """
1896        Return all the stored info about this experiment
1897        """
1898        rv = None
1899
1900        req = req.get('InfoRequestBody', None)
1901        if not req:
1902            raise service_error(service_error.req,
1903                    "Bad request format (no VisRequestBody)")
1904        exp = req.get('experiment', None)
1905        if exp:
1906            if exp.has_key('fedid'):
1907                key = exp['fedid']
1908                keytype = "fedid"
1909            elif exp.has_key('localname'):
1910                key = exp['localname']
1911                keytype = "localname"
1912            else:
1913                raise service_error(service_error.req, "Unknown lookup type")
1914        else:
1915            raise service_error(service_error.req, "No request?")
1916
1917        self.check_experiment_access(fid, key)
1918
1919        # The state may be massaged by the service function that called
1920        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1921        # state.
1922        self.state_lock.acquire()
1923        if self.state.has_key(key):
1924            rv = copy.deepcopy(self.state[key])
1925        self.state_lock.release()
1926
1927        if rv: return rv
1928        else: raise service_error(service_error.req, "No such experiment")
1929
1930
1931    def terminate_experiment(self, req, fid):
1932        """
1933        Swap this experiment out on the federants and delete the shared
1934        information
1935        """
1936        tbparams = { }
1937        req = req.get('TerminateRequestBody', None)
1938        if not req:
1939            raise service_error(service_error.req,
1940                    "Bad request format (no TerminateRequestBody)")
1941        exp = req.get('experiment', None)
1942        if exp:
1943            if exp.has_key('fedid'):
1944                key = exp['fedid']
1945                keytype = "fedid"
1946            elif exp.has_key('localname'):
1947                key = exp['localname']
1948                keytype = "localname"
1949            else:
1950                raise service_error(service_error.req, "Unknown lookup type")
1951        else:
1952            raise service_error(service_error.req, "No request?")
1953
1954        self.check_experiment_access(fid, key)
1955
1956        self.state_lock.acquire()
1957        fed_exp = self.state.get(key, None)
1958
1959        if fed_exp:
1960            # This branch of the conditional holds the lock to generate a
1961            # consistent temporary tbparams variable to deallocate experiments.
1962            # It releases the lock to do the deallocations and reacquires it to
1963            # remove the experiment state when the termination is complete.
1964            ids = []
1965            #  experimentID is a list of dicts that are self-describing
1966            #  identifiers.  This finds all the fedids and localnames - the
1967            #  keys of self.state - and puts them into ids.
1968            for id in fed_exp.get('experimentID', []):
1969                if id.has_key('fedid'): ids.append(id['fedid'])
1970                if id.has_key('localname'): ids.append(id['localname'])
1971
1972            # Construct enough of the tbparams to make the stop_segment calls
1973            # work
1974            for fed in fed_exp['federant']:
1975                try:
1976                    for e in fed['name']:
1977                        eid = e.get('localname', None)
1978                        if eid: break
1979                    else:
1980                        continue
1981
1982                    p = fed['emulab']['project']
1983
1984                    project = p['name']['localname']
1985                    tb = p['testbed']['localname']
1986                    user = p['user'][0]['userID']['localname']
1987
1988                    domain = fed['emulab']['domain']
1989                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1990                    aid = fed['allocID']
1991                except KeyError, e:
1992                    continue
1993                tbparams[tb] = {\
1994                        'user': user,\
1995                        'domain': domain,\
1996                        'project': project,\
1997                        'host': host,\
1998                        'eid': eid,\
1999                        'aid': aid,\
2000                    }
2001            self.state_lock.release()
2002
2003            # Stop everyone.
2004            for tb in tbparams.keys():
2005                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2006
2007            # release the allocations
2008            for tb in tbparams.keys():
2009                self.release_access(tb, tbparams[tb]['aid'])
2010
2011            # Remove the terminated experiment
2012            self.state_lock.acquire()
2013            for id in ids:
2014                if self.state.has_key(id): del self.state[id]
2015
2016            if self.state_filename: self.write_state()
2017            self.state_lock.release()
2018
2019            return { 'experiment': exp }
2020        else:
2021            # Don't forget to release the lock
2022            self.state_lock.release()
2023            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.