source: fedd/federation/experiment_control.py @ 02786fc

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 02786fc was 02786fc, checked in by Ted Faber <faber@…>, 16 years ago

typo: try to get the cert from a different place if the first one fils!

  • Property mode set to 100644
File size: 60.6 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[6679c122]13
[3441fe3]14import traceback
[c971895]15# For parsing visualization output and splitter output
16import xml.parsers.expat
[3441fe3]17
[1af38d6]18from threading import *
[6679c122]19from subprocess import *
20
[ec4fb42]21from util import *
[51cc9df]22from fedid import fedid, generate_fedid
[9460b1e]23from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]24from service_error import service_error
[6679c122]25
[11a08b0]26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
[ec4fb42]33class experiment_control_local:
[0ea11af]34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
[6679c122]40   
[1af38d6]41    class thread_pool:
[0ea11af]42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
[1af38d6]46        def __init__(self):
[0ea11af]47            """
48            Start a pool.
49            """
[1af38d6]50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
[0ea11af]55            """
56            Get the pool's lock.
57            """
[1af38d6]58            self.changed.acquire()
59
60        def release(self):
[0ea11af]61            """
62            Release the pool's lock.
63            """
[1af38d6]64            self.changed.release()
65
66        def wait(self, timeout = None):
[0ea11af]67            """
68            Wait for a pool thread to start or stop.
69            """
[1af38d6]70            self.changed.wait(timeout)
71
72        def start(self):
[0ea11af]73            """
74            Called by a pool thread to report starting.
75            """
[1af38d6]76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
[0ea11af]82            """
83            Called by a pool thread to report finishing.
84            """
[1af38d6]85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
[0ea11af]91            """
92            Clear all pool data.
93            """
[1af38d6]94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
[0ea11af]101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
[1af38d6]105        def __init__(self, group=None, target=None, name=None, args=(), 
[3441fe3]106                kwargs={}, pdata=None, trace_file=None):
[1af38d6]107            Thread.__init__(self, group, target, name, args, kwargs)
[0ea11af]108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
[1af38d6]116       
117        def run(self):
[0ea11af]118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
[1af38d6]122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
[3441fe3]128                except service_error, s:
129                    self.exception = s
[0ea11af]130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
[3441fe3]132                except:
133                    self.exception = sys.exc_info()[1]
[0ea11af]134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
[1af38d6]137            if self.pdata:
138                self.pdata.terminate()
[6679c122]139
[f069052]140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
[058f58e]143
[3f6bc5f]144    def __init__(self, config=None, auth=None):
[0ea11af]145        """
146        Intialize the various attributes, most from the config object
147        """
[ec4fb42]148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
[19cc408]150
[f005e44]151        self.cert_file = config.get("experiment_control", "cert_file")
152        if self.cert_file:
153            self.cert_pwd = config.get("experiment_control", "cert_pwd")
154        else:
[02786fc]155            self.cert_file = config.get("globals", "cert_file")
156            self.cert_pwd = config.get("globals", "cert_pwd")
[f005e44]157
158        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
159                or config.get("globals", "trusted_certs")
[4064742]160
[19cc408]161        self.exp_stem = "fed-stem"
[11a08b0]162        self.log = logging.getLogger("fedd.experiment_control")
[34bc05c]163        set_log_level(config, "experiment_control", self.log)
[19cc408]164        self.muxmax = 2
165        self.nthreads = 2
166        self.randomize_experiments = False
[72ed6e4]167
[19cc408]168        self.scp_exec = "/usr/bin/scp"
169        self.splitter = None
170        self.ssh_exec="/usr/bin/ssh"
171        self.ssh_keygen = "/usr/bin/ssh-keygen"
172        self.ssh_identity_file = None
[72ed6e4]173
[34bc05c]174
[bf0a80e]175        self.debug = config.getboolean("experiment_control", "create_debug")
176        self.state_filename = config.get("experiment_control", 
177                "experiment_state_file")
178        self.splitter_url = config.get("experiment_control", "splitter_url")
179        self.fedkit = config.get("experiment_control", "fedkit")
180        accessdb_file = config.get("experiment_control", "accessdb")
[72ed6e4]181
[721705e9]182        self.ssh_pubkey_file = config.get("experiment_control", 
183                "ssh_pubkey_file")
184        self.ssh_privkey_file = config.get("experiment_control",
185                "ssh_privkey_file")
186        # NB for internal master/slave ops, not experiment setup
187        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[19cc408]188        self.state = { }
[a97394b]189        self.state_lock = Lock()
[19cc408]190        self.tclsh = "/usr/local/bin/otclsh"
[77a7634]191        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
192                config.get("experiment_control", "tcl_splitter",
193                        "/usr/testbed/lib/ns2ir/parse.tcl")
[34bc05c]194        mapdb_file = config.get("experiment_control", "mapdb")
[19cc408]195        self.trace_file = sys.stderr
196
197        self.def_expstart = \
[2c6128f]198                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
199                "/tmp/federate";
200        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
201                "FEDDIR/hosts";
[19cc408]202        self.def_gwstart = \
[2c6128f]203                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
204                "/tmp/bridge.log";
[19cc408]205        self.def_mgwstart = \
[2c6128f]206                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
207                "/tmp/bridge.log";
[19cc408]208        self.def_gwimage = "FBSD61-TUNNEL2";
209        self.def_gwtype = "pc";
[5fffd82]210        self.local_access = { }
[19cc408]211
[34bc05c]212        if auth:
213            self.auth = auth
214        else:
215            self.log.error(\
216                    "[access]: No authorizer initialized, creating local one.")
217            auth = authorizer()
218
[19cc408]219
220        if self.ssh_pubkey_file:
221            try:
222                f = open(self.ssh_pubkey_file, 'r')
223                self.ssh_pubkey = f.read()
224                f.close()
225            except IOError:
226                raise service_error(service_error.internal,
227                        "Cannot read sshpubkey")
[721705e9]228        else:
229            raise service_error(service_error.internal, 
230                    "No SSH public key file?")
231
232        if not self.ssh_privkey_file:
233            raise service_error(service_error.internal, 
234                    "No SSH public key file?")
[19cc408]235
[11a08b0]236
[34bc05c]237        if mapdb_file:
238            self.read_mapdb(mapdb_file)
[4064742]239        else:
[34bc05c]240            self.log.warn("[experiment_control] No testbed map, using defaults")
241            self.tbmap = { 
242                    'deter':'https://users.isi.deterlab.net:23235',
243                    'emulab':'https://users.isi.deterlab.net:23236',
244                    'ucb':'https://users.isi.deterlab.net:23237',
245                    }
[4064742]246
247        if accessdb_file:
[34bc05c]248                self.read_accessdb(accessdb_file)
[4064742]249        else:
250            raise service_error(service_error.internal,
251                    "No accessdb specified in config")
252
[0ea11af]253        # Grab saved state.  OK to do this w/o locking because it's read only
254        # and only one thread should be in existence that can see self.state at
255        # this point.
[eee2b2e]256        if self.state_filename:
257            self.read_state()
258
[0ea11af]259        # Dispatch tables
260        self.soap_services = {\
[f069052]261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'Terminate': soap_handler('Terminate', 
266                    self.terminate_experiment),
[19cc408]267        }
268
[0ea11af]269        self.xmlrpc_services = {\
[f069052]270                'Create': xmlrpc_handler('Create', self.create_experiment),
271                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
272                'Vis': xmlrpc_handler('Vis', self.get_vis),
273                'Info': xmlrpc_handler('Info', self.get_info),
274                'Terminate': xmlrpc_handler('Terminate',
275                    self.terminate_experiment),
[19cc408]276        }
277
[6679c122]278    def copy_file(self, src, dest, size=1024):
279        """
280        Exceedingly simple file copy.
281        """
282        s = open(src,'r')
283        d = open(dest, 'w')
284
285        buf = "x"
286        while buf != "":
287            buf = s.read(size)
288            d.write(buf)
289        s.close()
290        d.close()
291
[a97394b]292    # Call while holding self.state_lock
[eee2b2e]293    def write_state(self):
[0ea11af]294        """
295        Write a new copy of experiment state after copying the existing state
296        to a backup.
297
298        State format is a simple pickling of the state dictionary.
299        """
[eee2b2e]300        if os.access(self.state_filename, os.W_OK):
301            self.copy_file(self.state_filename, \
302                    "%s.bak" % self.state_filename)
303        try:
304            f = open(self.state_filename, 'w')
305            pickle.dump(self.state, f)
306        except IOError, e:
[11a08b0]307            self.log.error("Can't write file %s: %s" % \
308                    (self.state_filename, e))
[eee2b2e]309        except pickle.PicklingError, e:
[11a08b0]310            self.log.error("Pickling problem: %s" % e)
[4ed10ae]311        except TypeError, e:
312            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]313
[a97394b]314    # Call while holding self.state_lock
[eee2b2e]315    def read_state(self):
[0ea11af]316        """
317        Read a new copy of experiment state.  Old state is overwritten.
318
319        State format is a simple pickling of the state dictionary.
320        """
[eee2b2e]321        try:
322            f = open(self.state_filename, "r")
323            self.state = pickle.load(f)
[d199ced]324            self.log.debug("[read_state]: Read state from %s" % \
325                    self.state_filename)
[eee2b2e]326        except IOError, e:
[d199ced]327            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
328                    % (self.state_filename, e))
[eee2b2e]329        except pickle.UnpicklingError, e:
[d199ced]330            self.log.warning(("[read_state]: No saved state: " + \
331                    "Unpickling failed: %s") % e)
[4064742]332       
333        for k in self.state.keys():
334            try:
335                # This list should only have one element in it, but phrasing it
336                # as a for loop doesn't cost much, really.  We have to find the
337                # fedid elements anyway.
338                for eid in [ f['fedid'] \
339                        for f in self.state[k]['experimentID']\
340                            if f.has_key('fedid') ]:
341                    self.auth.set_attribute(self.state[k]['owner'], eid)
342            except KeyError, e:
343                self.log.warning("[read_state]: State ownership or identity " +\
344                        "misformatted in %s: %s" % (self.state_filename, e))
345
346
347    def read_accessdb(self, accessdb_file):
[34bc05c]348        """
349        Read the mapping from fedids that can create experiments to their name
350        in the 3-level access namespace.  All will be asserted from this
351        testbed and can include the local username and porject that will be
352        asserted on their behalf by this fedd.  Each fedid is also added to the
353        authorization system with the "create" attribute.
354        """
355        self.accessdb = {}
356        # These are the regexps for parsing the db
[4064742]357        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
358        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
359                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
360        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
361                "\s*->\s*(" + name_expr + ")\s*$")
362        lineno = 0
363
[34bc05c]364        # Parse the mappings and store in self.authdb, a dict of
365        # fedid -> (proj, user)
366        try:
367            f = open(accessdb_file, "r")
368            for line in f:
369                lineno += 1
370                line = line.strip()
371                if len(line) == 0 or line.startswith('#'):
372                    continue
373                m = project_line.match(line)
374                if m:
375                    fid = fedid(hexstr=m.group(1))
376                    project, user = m.group(2,3)
377                    if not self.accessdb.has_key(fid):
378                        self.accessdb[fid] = []
379                    self.accessdb[fid].append((project, user))
380                    continue
381
382                m = user_line.match(line)
383                if m:
384                    fid = fedid(hexstr=m.group(1))
385                    project = None
386                    user = m.group(2)
387                    if not self.accessdb.has_key(fid):
388                        self.accessdb[fid] = []
389                    self.accessdb[fid].append((project, user))
390                    continue
391                self.log.warn("[experiment_control] Error parsing access " +\
392                        "db %s at line %d" %  (accessdb_file, lineno))
393        except IOError:
[4064742]394            raise service_error(service_error.internal,
[34bc05c]395                    "Error opening/reading %s as experiment " +\
396                            "control accessdb" %  accessdb_file)
[4064742]397        f.close()
398
[34bc05c]399        # Initialize the authorization attributes
400        for fid in self.accessdb.keys():
401            self.auth.set_attribute(fid, 'create')
402
403    def read_mapdb(self, file):
404        """
405        Read a simple colon separated list of mappings for the
406        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
407        """
408
409        self.tbmap = { }
410        lineno =0
411        try:
412            f = open(file, "r")
413            for line in f:
414                lineno += 1
415                line = line.strip()
416                if line.startswith('#') or len(line) == 0:
417                    continue
418                try:
419                    label, url = line.split(':', 1)
420                    self.tbmap[label] = url
421                except ValueError, e:
422                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
423                            "map db: %s %s" % (lineno, line, e))
424        except IOError, e:
425            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
426                    "open %s: %s" % (file, e))
427        f.close()
[eee2b2e]428
[6679c122]429    def scp_file(self, file, user, host, dest=""):
430        """
[0ea11af]431        scp a file to the remote host.  If debug is set the action is only
432        logged.
[6679c122]433        """
[d0ae12d]434
[dceeef9]435        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', '-i', 
436                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
[11a08b0]437        rv = 0
[3441fe3]438
[d199ced]439        try:
440            dnull = open("/dev/null", "r")
441        except IOError:
442            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
443            dnull = Null
444
[11a08b0]445        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
[d0ae12d]446        if not self.debug:
[d199ced]447            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
448            else: rv = call(scp_cmd)
[d0ae12d]449
450        return rv == 0
[6679c122]451
[3441fe3]452    def ssh_cmd(self, user, host, cmd, wname=None):
[0ea11af]453        """
454        Run a remote command on host as user.  If debug is set, the action is
455        only logged.
456        """
[dceeef9]457        sh_str = "%s -o 'IdentitiesOnly yes' -i %s %s@%s %s" % \
458                (self.ssh_exec, self.ssh_privkey_file, 
459                        user, host, cmd)
[d0ae12d]460
[d199ced]461        try:
462            dnull = open("/dev/null", "r")
463        except IOError:
464            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
465            dnull = Null
466
[11a08b0]467        self.log.debug("[ssh_cmd]: %s" % sh_str)
[d0ae12d]468        if not self.debug:
[d199ced]469            if dnull:
470                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
471            else:
472                sub = Popen(sh_str, shell=True)
[d0ae12d]473            return sub.wait() == 0
474        else:
475            return True
[6679c122]476
477    def ship_configs(self, host, user, src_dir, dest_dir):
[0ea11af]478        """
479        Copy federant-specific configuration files to the federant.
480        """
[6679c122]481        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
482            return False
483        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
484            return False
485
486        for f in os.listdir(src_dir):
487            if os.path.isdir(f):
488                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
489                        "%s/%s" % (dest_dir, f)):
490                    return False
491            else:
492                if not self.scp_file("%s/%s" % (src_dir, f), 
493                        user, host, dest_dir):
494                    return False
495        return True
496
[6546868]497    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
[0ea11af]498        """
499        Start a sub-experiment on a federant.
500
501        Get the current state, modify or create as appropriate, ship data and
502        configs and start the experiment.  There are small ordering differences
503        based on the initial state of the sub-experiment.
504        """
505        # ops node in the federant
[6679c122]506        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
[0ea11af]507        user = tbparams[tb]['user']     # federant user
508        pid = tbparams[tb]['project']   # federant project
[6679c122]509        # XXX
[0d830de]510        base_confs = ( "hosts",)
[0ea11af]511        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
512        # command to test experiment state
513        expinfo_exec = "/usr/testbed/bin/expinfo" 
514        # Configuration directories on the remote machine
[6679c122]515        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
516        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
517        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
[0ea11af]518        # Regular expressions to parse the expinfo response
[6679c122]519        state_re = re.compile("State:\s+(\w+)")
[3441fe3]520        no_exp_re = re.compile("^No\s+such\s+experiment")
[0ea11af]521        state = None    # Experiment state parsed from expinfo
[dceeef9]522        # The expinfo ssh command.  Note the identity restriction to use only
523        # the identity provided in the pubkey given.
524        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-i', 
525                self.ssh_privkey_file, "%s@%s" % (user, host), 
526                expinfo_exec, pid, eid]
[3441fe3]527
[0ea11af]528        # Get status
[11a08b0]529        self.log.debug("[start_segment]: %s"% " ".join(cmd))
530        dev_null = None
531        try:
532            dev_null = open("/dev/null", "a")
533        except IOError, e:
534            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
[3441fe3]535
[4ed10ae]536        if self.debug:
537            state = 'swapped'
538            rv = 0
539        else:
540            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
541            for line in status.stdout:
542                m = state_re.match(line)
543                if m: state = m.group(1)
544                else:
545                    m = no_exp_re.match(line)
546                    if m: state = "none"
547            rv = status.wait()
[0ea11af]548
[0d830de]549        # If the experiment is not present the subcommand returns a non-zero
550        # return value.  If we successfully parsed a "none" outcome, ignore the
551        # return code.
[3441fe3]552        if rv != 0 and state != "none":
[6679c122]553            raise service_error(service_error.internal,
554                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
[11a08b0]555
556        self.log.debug("[start_segment]: %s: %s" % (tb, state))
557        self.log.info("[start_segment]:transferring experiment to %s" % tb)
[6679c122]558
[6546868]559        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
[6679c122]560            return False
[c8853fd]561        # Clear the federation config dirs
[a8b42b5]562        if not self.ssh_cmd(user, host, 
563                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
564            return False
[6679c122]565        # Clear and create the tarfiles and rpm directories
566        for d in (tarfiles_dir, rpms_dir):
567            if not self.ssh_cmd(user, host, 
568                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
569                return False
570            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
571                    "create tarfiles"):
572                return False
573       
574        if state == 'active':
[c8853fd]575            # Create the federation config dirs (do not move outside the
576            # conditional.  Happens later in new expriment creation)
577            if not self.ssh_cmd(user, host, 
578                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
579                return False
[6679c122]580            # Remote experiment is active.  Modify it.
581            for f in base_confs:
[6546868]582                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
[6679c122]583                        "%s/%s" % (proj_dir, f)):
584                    return False
[6546868]585            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
[6679c122]586                    proj_dir):
587                return False
[6546868]588            if os.path.isdir("%s/tarfiles" % tmpdir):
[6679c122]589                if not self.ship_configs(host, user,
[6546868]590                        "%s/tarfiles" % tmpdir, tarfiles_dir):
[6679c122]591                    return False
[6546868]592            if os.path.isdir("%s/rpms" % tmpdir):
[6679c122]593                if not self.ship_configs(host, user,
[6546868]594                        "%s/rpms" % tmpdir, tarfiles_dir):
[6679c122]595                    return False
[11a08b0]596            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
[6679c122]597            if not self.ssh_cmd(user, host,
598                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
599                            (pid, eid, tclfile), "modexp"):
600                return False
601            return True
602        elif state == "swapped":
[c8853fd]603            # Create the federation config dirs (do not move outside the
604            # conditional.  Happens later in new expriment creation)
605            if not self.ssh_cmd(user, host, 
606                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
607                return False
[6679c122]608            # Remote experiment swapped out.  Modify it and swap it in.
609            for f in base_confs:
[6546868]610                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
[6679c122]611                        "%s/%s" % (proj_dir, f)):
612                    return False
[6546868]613            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
[6679c122]614                    proj_dir):
615                return False
[6546868]616            if os.path.isdir("%s/tarfiles" % tmpdir):
[6679c122]617                if not self.ship_configs(host, user,
[6546868]618                        "%s/tarfiles" % tmpdir, tarfiles_dir):
[6679c122]619                    return False
[6546868]620            if os.path.isdir("%s/rpms" % tmpdir):
[6679c122]621                if not self.ship_configs(host, user,
[6546868]622                        "%s/rpms" % tmpdir, tarfiles_dir):
[6679c122]623                    return False
[11a08b0]624            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
[6679c122]625            if not self.ssh_cmd(user, host,
626                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
627                    "modexp"):
628                return False
[11a08b0]629            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
[6679c122]630            if not self.ssh_cmd(user, host,
631                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
[3441fe3]632                    "swapexp"):
[6679c122]633                return False
634            return True
635        elif state == "none":
636            # No remote experiment.  Create one.  We do this in 2 steps so we
637            # can put the configuration files and scripts into the new
638            # experiment directories.
639
640            # Tarfiles must be present for creation to work
[6546868]641            if os.path.isdir("%s/tarfiles" % tmpdir):
[6679c122]642                if not self.ship_configs(host, user,
[6546868]643                        "%s/tarfiles" % tmpdir, tarfiles_dir):
[6679c122]644                    return False
[6546868]645            if os.path.isdir("%s/rpms" % tmpdir):
[6679c122]646                if not self.ship_configs(host, user,
[6546868]647                        "%s/rpms" % tmpdir, tarfiles_dir):
[6679c122]648                    return False
[11a08b0]649            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
[6679c122]650            if not self.ssh_cmd(user, host,
651                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
[3441fe3]652                            (pid, eid, tclfile), "startexp"):
[6679c122]653                return False
[c8853fd]654            # Create the federation config dirs (do not move outside the
655            # conditional.)
656            if not self.ssh_cmd(user, host, 
657                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
658                return False
[6679c122]659            # After startexp the per-experiment directories exist
660            for f in base_confs:
[6546868]661                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
[6679c122]662                        "%s/%s" % (proj_dir, f)):
663                    return False
[6546868]664            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
[6679c122]665                    proj_dir):
666                return False
[11a08b0]667            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
[6679c122]668            if not self.ssh_cmd(user, host,
669                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
[3441fe3]670                    "swapexp"):
[6679c122]671                return False
672            return True
673        else:
[11a08b0]674            self.log.debug("[start_segment]:unknown state %s" % state)
[6679c122]675            return False
676
677    def stop_segment(self, tb, eid, tbparams):
[0ea11af]678        """
679        Stop a sub experiment by calling swapexp on the federant
680        """
[6679c122]681        user = tbparams[tb]['user']
[ea0a821]682        host = tbparams[tb]['host']
[6679c122]683        pid = tbparams[tb]['project']
684
[11a08b0]685        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
[6679c122]686        return self.ssh_cmd(user, host,
[3441fe3]687                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
[6679c122]688
689       
690    def generate_ssh_keys(self, dest, type="rsa" ):
691        """
692        Generate a set of keys for the gateways to use to talk.
693
694        Keys are of type type and are stored in the required dest file.
695        """
696        valid_types = ("rsa", "dsa")
697        t = type.lower();
698        if t not in valid_types: raise ValueError
[11a08b0]699        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
[3441fe3]700
[11a08b0]701        try:
702            trace = open("/dev/null", "w")
703        except IOError:
704            raise service_error(service_error.internal,
705                    "Cannot open /dev/null??");
[3441fe3]706
[6679c122]707        # May raise CalledProcessError
[11a08b0]708        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
709        rv = call(cmd, stdout=trace, stderr=trace)
[6679c122]710        if rv != 0:
711            raise service_error(service_error.internal, 
712                    "Cannot generate nonce ssh keys.  %s return code %d" \
713                            % (self.ssh_keygen, rv))
714
[0d830de]715    def gentopo(self, str):
[0ea11af]716        """
717        Generate the topology dtat structure from the splitter's XML
718        representation of it.
719
720        The topology XML looks like:
721            <experiment>
722                <nodes>
723                    <node><vname></vname><ips>ip1:ip2</ips></node>
724                </nodes>
725                <lans>
726                    <lan>
727                        <vname></vname><vnode></vnode><ip></ip>
728                        <bandwidth></bandwidth><member>node:port</member>
729                    </lan>
730                </lans>
731        """
[d0ae12d]732        class topo_parse:
[0ea11af]733            """
734            Parse the topology XML and create the dats structure.
735            """
[d0ae12d]736            def __init__(self):
[0ea11af]737                # Typing of the subelements for data conversion
[0d830de]738                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
739                self.int_subelements = ( 'bandwidth',)
740                self.float_subelements = ( 'delay',)
[0ea11af]741                # The final data structure
[0d830de]742                self.nodes = [ ]
743                self.lans =  [ ]
744                self.topo = { \
[bcbf543]745                        'node': self.nodes,\
746                        'lan' : self.lans,\
[0d830de]747                    }
[0ea11af]748                self.element = { }  # Current element being created
749                self.chars = ""     # Last text seen
[0d830de]750
[d0ae12d]751            def end_element(self, name):
[0ea11af]752                # After each sub element the contents is added to the current
753                # element or to the appropriate list.
[0d830de]754                if name == 'node':
755                    self.nodes.append(self.element)
756                    self.element = { }
757                elif name == 'lan':
758                    self.lans.append(self.element)
759                    self.element = { }
760                elif name in self.str_subelements:
761                    self.element[name] = self.chars
762                    self.chars = ""
763                elif name in self.int_subelements:
764                    self.element[name] = int(self.chars)
765                    self.chars = ""
766                elif name in self.float_subelements:
767                    self.element[name] = float(self.chars)
768                    self.chars = ""
[d0ae12d]769
770            def found_chars(self, data):
[0d830de]771                self.chars += data.rstrip()
772
773
774        tp = topo_parse();
775        parser = xml.parsers.expat.ParserCreate()
776        parser.EndElementHandler = tp.end_element
777        parser.CharacterDataHandler = tp.found_chars
778
779        parser.Parse(str)
780
781        return tp.topo
782       
783
784    def genviz(self, topo):
785        """
786        Generate the visualization the virtual topology
787        """
[d0ae12d]788
789        neato = "/usr/local/bin/neato"
790        # These are used to parse neato output and to create the visualization
791        # file.
792        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
793        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
[0d830de]794                "%s</type></node>"
795
796        try:
797            # Node names
[bcbf543]798            nodes = [ n['vname'] for n in topo['node'] ]
799            topo_lans = topo['lan']
[0d830de]800        except KeyError:
801            raise service_error(service_error.internal, "Bad topology")
802
803        lans = { }
804        links = { }
805
806        # Walk through the virtual topology, organizing the connections into
807        # 2-node connections (links) and more-than-2-node connections (lans).
808        # When a lan is created, it's added to the list of nodes (there's a
809        # node in the visualization for the lan).
810        for l in topo_lans:
811            if links.has_key(l['vname']):
812                if len(links[l['vname']]) < 2:
813                    links[l['vname']].append(l['vnode'])
814                else:
815                    nodes.append(l['vname'])
816                    lans[l['vname']] = links[l['vname']]
817                    del links[l['vname']]
818                    lans[l['vname']].append(l['vnode'])
819            elif lans.has_key(l['vname']):
820                lans[l['vname']].append(l['vnode'])
821            else:
822                links[l['vname']] = [ l['vnode'] ]
823
824
825        # Open up a temporary file for dot to turn into a visualization
[d0ae12d]826        try:
827            df, dotname = tempfile.mkstemp()
828            dotfile = os.fdopen(df, 'w')
829        except IOError:
830            raise service_error(service_error.internal,
831                    "Failed to open file in genviz")
832
833        # Generate a dot/neato input file from the links, nodes and lans
834        try:
835            print >>dotfile, "graph G {"
[0d830de]836            for n in nodes:
[d0ae12d]837                print >>dotfile, '\t"%s"' % n
[0d830de]838            for l in links.keys():
839                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
840            for l in lans.keys():
841                for n in lans[l]:
[d0ae12d]842                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
843            print >>dotfile, "}"
844            dotfile.close()
[0d830de]845        except TypeError:
846            raise service_error(service_error.internal,
847                    "Single endpoint link in vtopo")
[d0ae12d]848        except IOError:
849            raise service_error(service_error.internal, "Cannot write dot file")
850
851        # Use dot to create a visualization
852        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
853                '-Gpack=true', dotname], stdout=PIPE)
854
[0d830de]855        # Translate dot to vis format
856        vis_nodes = [ ]
857        vis = { 'node': vis_nodes }
858        for line in dot.stdout:
859            m = vis_re.match(line)
860            if m:
861                vn = m.group(1)
862                vis_node = {'name': vn, \
[bcbf543]863                        'x': float(m.group(2)),\
864                        'y' : float(m.group(3)),\
[0d830de]865                    }
866                if vn in links.keys() or vn in lans.keys():
867                    vis_node['type'] = 'lan'
868                else:
869                    vis_node['type'] = 'node'
870                vis_nodes.append(vis_node)
[d0ae12d]871        rv = dot.wait()
[0d830de]872
[d0ae12d]873        os.remove(dotname)
[0d830de]874        if rv == 0 : return vis
875        else: return None
[d0ae12d]876
[4064742]877    def get_access(self, tb, nodes, user, tbparam, master, export_project,
878            access_user):
[6679c122]879        """
880        Get access to testbed through fedd and set the parameters for that tb
881        """
882
883        translate_attr = {
884            'slavenodestartcmd': 'expstart',
885            'slaveconnectorstartcmd': 'gwstart',
886            'masternodestartcmd': 'mexpstart',
887            'masterconnectorstartcmd': 'mgwstart',
888            'connectorimage': 'gwimage',
889            'connectortype': 'gwtype',
890            'tunnelcfg': 'tun',
[3441fe3]891            'smbshare': 'smbshare',
[6679c122]892        }
893
894        uri = self.tbmap.get(tb, None)
895        if not uri:
896            raise service_error(serice_error.server_config, 
897                    "Unknown testbed: %s" % tb)
898
[721705e9]899        # currently this lumps all users into one service access group
900        service_keys = [ a for u in user \
901                for a in u.get('access', []) \
902                    if a.has_key('sshPubkey')]
903
904        if len(service_keys) == 0:
905            raise service_error(service_error.req, 
906                    "Must have at least one SSH pubkey for services")
907
908
[4064742]909        for p, u in access_user:
[d90f0fa]910            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
911                    "to %s") %  ((p or "None"), u, uri))
[4064742]912
913            if p:
914                # Request with user and project specified
915                req = {\
916                        'destinationTestbed' : { 'uri' : uri },
917                        'project': { 
918                            'name': {'localname': p},
919                            'user': [ {'userID': { 'localname': u } } ],
920                            },
921                        'user':  user,
922                        'allocID' : { 'localname': 'test' },
923                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
924                        'serviceAccess' : service_keys
925                    }
926            else:
927                # Request with only user specified
928                req = {\
929                        'destinationTestbed' : { 'uri' : uri },
930                        'user':  [ {'userID': { 'localname': u } } ],
931                        'allocID' : { 'localname': 'test' },
932                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
933                        'serviceAccess' : service_keys
934                    }
[721705e9]935
[4064742]936            if tb == master:
937                # NB, the export_project parameter is a dict that includes
938                # the type
939                req['exportProject'] = export_project
940
941            # node resources if any
942            if nodes != None and len(nodes) > 0:
943                rnodes = [ ]
944                for n in nodes:
945                    rn = { }
946                    image, hw, count = n.split(":")
947                    if image: rn['image'] = [ image ]
948                    if hw: rn['hardware'] = [ hw ]
[dceeef9]949                    if count and int(count) >0 : rn['count'] = int(count)
[4064742]950                    rnodes.append(rn)
951                req['resources']= { }
952                req['resources']['node'] = rnodes
[5576a47]953
[4064742]954            try:
[5fffd82]955                if self.local_access.has_key(uri):
956                    # Local access call
957                    req = { 'RequestAccessRequestBody' : req }
958                    r = self.local_access[uri].RequestAccess(req, 
959                            fedid(file=self.cert_file))
960                    r = { 'RequestAccessResponseBody' : r }
961                else:
962                    r = self.call_RequestAccess(uri, req, 
963                            self.cert_file, self.cert_pwd, self.trusted_certs)
[4064742]964            except service_error, e:
965                if e.code == service_error.access:
[d90f0fa]966                    self.log.debug("[get_access] Access denied")
[4064742]967                    r = None
968                    continue
969                else:
970                    raise e
[6679c122]971
[4064742]972            if r.has_key('RequestAccessResponseBody'):
[d90f0fa]973                # Through to here we have a valid response, not a fault.
974                # Access denied is a fault, so something better or worse than
975                # access denied has happened.
[4064742]976                r = r['RequestAccessResponseBody']
[d90f0fa]977                self.log.debug("[get_access] Access granted")
978                break
[4064742]979            else:
980                raise service_error(service_error.protocol,
981                        "Bad proxy response")
982       
983        if not r:
984            raise service_error(service_error.access, 
985                    "Access denied by %s (%s)" % (tb, uri))
[6679c122]986
987        e = r['emulab']
988        p = e['project']
989        tbparam[tb] = { 
990                "boss": e['boss'],
991                "host": e['ops'],
992                "domain": e['domain'],
993                "fs": e['fileServer'],
994                "eventserver": e['eventServer'],
[3441fe3]995                "project": unpack_id(p['name']),
[d81971a]996                "emulab" : e,
997                "allocID" : r['allocID'],
[6679c122]998                }
[7a8d667]999        # Make the testbed name be the label the user applied
1000        p['testbed'] = {'localname': tb }
[6679c122]1001
1002        for u in p['user']:
1003            tbparam[tb]['user'] = unpack_id(u['userID'])
1004
1005        for a in e['fedAttr']:
1006            if a['attribute']:
1007                key = translate_attr.get(a['attribute'].lower(), None)
1008                if key:
1009                    tbparam[tb][key]= a['value']
[4ed10ae]1010       
[d81971a]1011    def release_access(self, tb, aid):
1012        """
1013        Release access to testbed through fedd
1014        """
1015
1016        uri = self.tbmap.get(tb, None)
1017        if not uri:
1018            raise service_error(serice_error.server_config, 
1019                    "Unknown testbed: %s" % tb)
1020
[5fffd82]1021        if self.local_access.has_key(uri):
1022            resp = self.local_access[uri].ReleaseAccess(\
1023                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1024                    fedid(file=self.cert_file))
1025            resp = { 'ReleaseAccessResponseBody': resp } 
1026        else:
1027            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1028                    self.cert_file, self.cert_pwd, self.trusted_certs)
[d81971a]1029
1030        # better error coding
1031
[f4f4117]1032    def remote_splitter(self, uri, desc, master):
1033
1034        req = {
1035                'description' : { 'ns2description': desc },
1036                'master': master,
[2c6128f]1037                'include_fedkit': bool(self.fedkit)
[f4f4117]1038            }
1039
[058f58e]1040        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1041                self.trusted_certs)
[f4f4117]1042
1043        if r.has_key('Ns2SplitResponseBody'):
1044            r = r['Ns2SplitResponseBody']
1045            if r.has_key('output'):
1046                return r['output'].splitlines()
1047            else:
[058f58e]1048                raise service_error(service_error.protocol, 
[f4f4117]1049                        "Bad splitter response (no output)")
1050        else:
[058f58e]1051            raise service_error(service_error.protocol, "Bad splitter response")
[6679c122]1052       
1053    class current_testbed:
[0ea11af]1054        """
1055        Object for collecting the current testbed description.  The testbed
1056        description is saved to a file with the local testbed variables
1057        subsittuted line by line.
1058        """
[2c6128f]1059        def __init__(self, eid, tmpdir, fedkit):
[6679c122]1060            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1061            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1062            self.current_testbed = None
1063            self.testbed_file = None
1064
1065            self.def_expstart = \
1066                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1067            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1068            self.def_gwstart = \
1069                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1070            self.def_mgwstart = \
1071                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1072            self.def_gwimage = "FBSD61-TUNNEL2";
1073            self.def_gwtype = "pc";
1074
1075            self.eid = eid
1076            self.tmpdir = tmpdir
[2c6128f]1077            self.fedkit = fedkit
[6679c122]1078
1079        def __call__(self, line, master, allocated, tbparams):
1080            # Capture testbed topology descriptions
1081            if self.current_testbed == None:
1082                m = self.begin_testbed.match(line)
1083                if m != None:
1084                    self.current_testbed = m.group(1)
1085                    if self.current_testbed == None:
1086                        raise service_error(service_error.req,
1087                                "Bad request format (unnamed testbed)")
1088                    allocated[self.current_testbed] = \
1089                            allocated.get(self.current_testbed,0) + 1
1090                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1091                    if not os.path.exists(tb_dir):
1092                        try:
1093                            os.mkdir(tb_dir)
1094                        except IOError:
1095                            raise service_error(service_error.internal,
1096                                    "Cannot create %s" % tb_dir)
1097                    try:
1098                        self.testbed_file = open("%s/%s.%s.tcl" %
1099                                (tb_dir, self.eid, self.current_testbed), 'w')
1100                    except IOError:
1101                        self.testbed_file = None
1102                    return True
1103                else: return False
1104            else:
1105                m = self.end_testbed.match(line)
1106                if m != None:
1107                    if m.group(1) != self.current_testbed:
1108                        raise service_error(service_error.internal, 
1109                                "Mismatched testbed markers!?")
1110                    if self.testbed_file != None: 
1111                        self.testbed_file.close()
1112                        self.testbed_file = None
1113                    self.current_testbed = None
1114                elif self.testbed_file:
1115                    # Substitute variables and put the line into the local
1116                    # testbed file.
1117                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1118                            self.def_gwtype)
1119                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1120                            self.def_gwimage)
1121                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1122                            self.def_mgwstart)
1123                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1124                            self.def_mexpstart)
1125                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1126                            self.def_gwstart)
1127                    expstart = tbparams[self.current_testbed].get('expstart', 
1128                            self.def_expstart)
1129                    project = tbparams[self.current_testbed].get('project')
1130                    line = re.sub("GWTYPE", gwtype, line)
1131                    line = re.sub("GWIMAGE", gwimage, line)
1132                    if self.current_testbed == master:
1133                        line = re.sub("GWSTART", mgwstart, line)
1134                        line = re.sub("EXPSTART", mexpstart, line)
1135                    else:
1136                        line = re.sub("GWSTART", gwstart, line)
1137                        line = re.sub("EXPSTART", expstart, line)
1138                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1139                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1140                    line = re.sub("EID", self.eid, line)
1141                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1142                            (project, self.eid), line)
[2c6128f]1143                    if self.fedkit:
1144                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1145                                line)
[6679c122]1146                    print >>self.testbed_file, line
1147                return True
1148
1149    class allbeds:
[0ea11af]1150        """
1151        Process the Allbeds section.  Get access to each federant and save the
1152        parameters in tbparams
1153        """
[6679c122]1154        def __init__(self, get_access):
1155            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1156            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1157            self.in_allbeds = False
1158            self.get_access = get_access
1159
[4064742]1160        def __call__(self, line, user, tbparams, master, export_project,
1161                access_user):
[6679c122]1162            # Testbed access parameters
1163            if not self.in_allbeds:
1164                if self.begin_allbeds.match(line):
1165                    self.in_allbeds = True
1166                    return True
1167                else:
1168                    return False
1169            else:
1170                if self.end_allbeds.match(line):
1171                    self.in_allbeds = False
1172                else:
1173                    nodes = line.split('|')
1174                    tb = nodes.pop(0)
[5576a47]1175                    self.get_access(tb, nodes, user, tbparams, master,
[4064742]1176                            export_project, access_user)
[6679c122]1177                return True
1178
1179    class gateways:
[3441fe3]1180        def __init__(self, eid, master, tmpdir, gw_pubkey,
[2c6128f]1181                gw_secretkey, copy_file, fedkit):
[6679c122]1182            self.begin_gateways = \
1183                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1184            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1185            self.current_gateways = None
1186            self.control_gateway = None
1187            self.active_end = { }
1188
1189            self.eid = eid
1190            self.master = master
1191            self.tmpdir = tmpdir
1192            self.gw_pubkey_base = gw_pubkey
1193            self.gw_secretkey_base = gw_secretkey
1194
1195            self.copy_file = copy_file
[2c6128f]1196            self.fedkit = fedkit
[6679c122]1197
1198
1199        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1200                active_end, tbparams, dtb, myname, desthost, type):
1201            """
[1af38d6]1202            Produce a gateway configuration file from a gateways line.
[6679c122]1203            """
1204
1205            sproject = tbparams[gw].get('project', 'project')
1206            dproject = tbparams[dtb].get('project', 'project')
1207            sdomain = ".%s.%s%s" % (eid, sproject,
1208                    tbparams[gw].get('domain', ".example.com"))
1209            ddomain = ".%s.%s%s" % (eid, dproject,
1210                    tbparams[dtb].get('domain', ".example.com"))
1211            boss = tbparams[master].get('boss', "boss")
1212            fs = tbparams[master].get('fs', "fs")
1213            event_server = "%s%s" % \
[27b6aea]1214                    (tbparams[gw].get('eventserver', "event_server"),
1215                            tbparams[gw].get('domain', "example.com"))
[6679c122]1216            remote_event_server = "%s%s" % \
1217                    (tbparams[dtb].get('eventserver', "event_server"),
1218                            tbparams[dtb].get('domain', "example.com"))
[27b6aea]1219            seer_control = "%s%s" % \
1220                    (tbparams[gw].get('control', "control"), sdomain)
[2c6128f]1221
1222            if self.fedkit:
1223                remote_script_dir = "/usr/local/federation/bin"
1224                local_script_dir = "/usr/local/federation/bin"
1225            else:
1226                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1227                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1228
1229            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1230            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
[6679c122]1231            tunnel_cfg = tbparams[gw].get("tun", "false")
1232
1233            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1234            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1235
1236            # translate to lower case so the `hostname` hack for specifying
1237            # configuration files works.
1238            conf_file = conf_file.lower();
1239            remote_conf_file = remote_conf_file.lower();
1240
1241            if dtb == master:
1242                active = "false"
1243            elif gw == master:
1244                active = "true"
1245            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1246                active = "false"
1247            else:
1248                active_end['%s-%s' % (gw, dtb)] = 1
1249                active = "true"
1250
1251            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1252            print >>gwconfig, "Active: %s" % active
1253            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1254            print >>gwconfig, "BossName: %s" % boss
1255            print >>gwconfig, "FsName: %s" % fs
1256            print >>gwconfig, "EventServerName: %s" % event_server
1257            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
[27b6aea]1258            print >>gwconfig, "SeerControl: %s" % seer_control
[6679c122]1259            print >>gwconfig, "Type: %s" % type
1260            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1261            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1262                    local_script_dir
1263            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1264            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1265            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
[2c6128f]1266                    (remote_conf_dir, remote_conf_file)
[6679c122]1267            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
[2c6128f]1268            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1269            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
[6679c122]1270            gwconfig.close()
1271
1272            return active == "true"
1273
1274        def __call__(self, line, allocated, tbparams):
1275            # Process gateways
1276            if not self.current_gateways:
1277                m = self.begin_gateways.match(line)
1278                if m:
1279                    self.current_gateways = m.group(1)
1280                    if allocated.has_key(self.current_gateways):
1281                        # This test should always succeed
1282                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1283                        if not os.path.exists(tb_dir):
1284                            try:
1285                                os.mkdir(tb_dir)
1286                            except IOError:
1287                                raise service_error(service_error.internal,
1288                                        "Cannot create %s" % tb_dir)
1289                    else:
1290                        # XXX
[11a08b0]1291                        self.log.error("[gateways]: Ignoring gateways for " + \
1292                                "unknown testbed %s" % self.current_gateways)
[6679c122]1293                        self.current_gateways = None
1294                    return True
1295                else:
1296                    return False
1297            else:
1298                m = self.end_gateways.match(line)
1299                if m :
1300                    if m.group(1) != self.current_gateways:
1301                        raise service_error(service_error.internal,
1302                                "Mismatched gateway markers!?")
1303                    if self.control_gateway:
1304                        try:
1305                            cc = open("%s/%s/client.conf" %
1306                                    (self.tmpdir, self.current_gateways), 'w')
1307                            print >>cc, "ControlGateway: %s" % \
1308                                    self.control_gateway
[3441fe3]1309                            if tbparams[self.master].has_key('smbshare'):
1310                                print >>cc, "SMBSHare: %s" % \
1311                                        tbparams[self.master]['smbshare']
[6679c122]1312                            print >>cc, "ProjectUser: %s" % \
[3441fe3]1313                                    tbparams[self.master]['user']
[6679c122]1314                            print >>cc, "ProjectName: %s" % \
1315                                    tbparams[self.master]['project']
1316                            cc.close()
1317                        except IOError:
1318                            raise service_error(service_error.internal,
1319                                    "Error creating client config")
[291423b]1320                        try:
1321                            cc = open("%s/%s/seer.conf" %
[a8b42b5]1322                                    (self.tmpdir, self.current_gateways),
1323                                    'w')
1324                            if self.current_gateways != self.master:
1325                                print >>cc, "ControlNode: %s" % \
1326                                        self.control_gateway
[291423b]1327                            print >>cc, "ExperimentID: %s/%s" % \
1328                                    ( tbparams[self.master]['project'], \
1329                                    self.eid )
1330                            cc.close()
1331                        except IOError:
1332                            raise service_error(service_error.internal,
1333                                    "Error creating seer config")
[6679c122]1334                    else:
[11a08b0]1335                        debug.error("[gateways]: No control gateway for %s" %\
1336                                    self.current_gateways)
[6679c122]1337                    self.current_gateways = None
1338                else:
1339                    dtb, myname, desthost, type = line.split(" ")
1340
1341                    if type == "control" or type == "both":
1342                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1343                                self.eid, 
1344                                tbparams[self.current_gateways]['project'],
1345                                tbparams[self.current_gateways]['domain'])
1346                    try:
1347                        active = self.gateway_conf_file(self.current_gateways,
1348                                self.master, self.eid, self.gw_pubkey_base,
1349                                self.gw_secretkey_base,
1350                                self.active_end, tbparams, dtb, myname,
1351                                desthost, type)
1352                    except IOError, e:
1353                        raise service_error(service_error.internal,
1354                                "Failed to write config file for %s" % \
1355                                        self.current_gateway)
1356           
1357                    gw_pubkey = "%s/keys/%s" % \
1358                            (self.tmpdir, self.gw_pubkey_base)
1359                    gw_secretkey = "%s/keys/%s" % \
1360                            (self.tmpdir, self.gw_secretkey_base)
1361
1362                    pkfile = "%s/%s/%s" % \
1363                            ( self.tmpdir, self.current_gateways, 
1364                                    self.gw_pubkey_base)
1365                    skfile = "%s/%s/%s" % \
1366                            ( self.tmpdir, self.current_gateways, 
1367                                    self.gw_secretkey_base)
1368
1369                    if not os.path.exists(pkfile):
1370                        try:
1371                            self.copy_file(gw_pubkey, pkfile)
1372                        except IOError:
1373                            service_error(service_error.internal,
1374                                    "Failed to copy pubkey file")
1375
1376                    if active and not os.path.exists(skfile):
1377                        try:
1378                            self.copy_file(gw_secretkey, skfile)
1379                        except IOError:
1380                            service_error(service_error.internal,
1381                                    "Failed to copy secretkey file")
1382                return True
1383
1384    class shunt_to_file:
[0ea11af]1385        """
1386        Simple class to write data between two regexps to a file.
1387        """
[6679c122]1388        def __init__(self, begin, end, filename):
[0ea11af]1389            """
1390            Begin shunting on a match of begin, stop on end, send data to
1391            filename.
1392            """
[6679c122]1393            self.begin = re.compile(begin)
1394            self.end = re.compile(end)
1395            self.in_shunt = False
1396            self.file = None
1397            self.filename = filename
1398
1399        def __call__(self, line):
[0ea11af]1400            """
1401            Call this on each line in the input that may be shunted.
1402            """
[6679c122]1403            if not self.in_shunt:
1404                if self.begin.match(line):
1405                    self.in_shunt = True
1406                    try:
1407                        self.file = open(self.filename, "w")
1408                    except:
1409                        self.file = None
1410                        raise
1411                    return True
1412                else:
1413                    return False
1414            else:
1415                if self.end.match(line):
1416                    if self.file: 
1417                        self.file.close()
1418                        self.file = None
1419                    self.in_shunt = False
1420                else:
1421                    if self.file:
1422                        print >>self.file, line
1423                return True
1424
1425    class shunt_to_list:
[0ea11af]1426        """
1427        Same interface as shunt_to_file.  Data collected in self.list, one list
1428        element per line.
1429        """
[6679c122]1430        def __init__(self, begin, end):
1431            self.begin = re.compile(begin)
1432            self.end = re.compile(end)
1433            self.in_shunt = False
1434            self.list = [ ]
1435       
1436        def __call__(self, line):
1437            if not self.in_shunt:
1438                if self.begin.match(line):
1439                    self.in_shunt = True
1440                    return True
1441                else:
1442                    return False
1443            else:
1444                if self.end.match(line):
1445                    self.in_shunt = False
1446                else:
1447                    self.list.append(line)
1448                return True
1449
[0d830de]1450    class shunt_to_string:
[0ea11af]1451        """
1452        Same interface as shunt_to_file.  Data collected in self.str, all in
1453        one string.
1454        """
[0d830de]1455        def __init__(self, begin, end):
1456            self.begin = re.compile(begin)
1457            self.end = re.compile(end)
1458            self.in_shunt = False
1459            self.str = ""
1460       
1461        def __call__(self, line):
1462            if not self.in_shunt:
1463                if self.begin.match(line):
1464                    self.in_shunt = True
1465                    return True
1466                else:
1467                    return False
1468            else:
1469                if self.end.match(line):
1470                    self.in_shunt = False
1471                else:
1472                    self.str += line
1473                return True
1474
[6679c122]1475    def create_experiment(self, req, fid):
[0ea11af]1476        """
1477        The external interface to experiment creation called from the
1478        dispatcher.
1479
1480        Creates a working directory, splits the incoming description using the
1481        splitter script and parses out the avrious subsections using the
1482        lcasses above.  Once each sub-experiment is created, use pooled threads
1483        to instantiate them and start it all up.
1484        """
[4064742]1485
1486        if not self.auth.check_attribute(fid, 'create'):
1487            raise service_error(service_error.access, "Create access denied")
1488
[6546868]1489        try:
1490            tmpdir = tempfile.mkdtemp(prefix="split-")
1491        except IOError:
1492            raise service_error(service_error.internal, "Cannot create tmp dir")
1493
[6679c122]1494        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1495        gw_secretkey_base = "fed.%s" % self.ssh_type
[6546868]1496        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1497        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1498        tclfile = tmpdir + "/experiment.tcl"
[6679c122]1499        tbparams = { }
[4064742]1500        try:
1501            access_user = self.accessdb[fid]
1502        except KeyError:
1503            raise service_error(service_error.internal,
1504                    "Access map and authorizer out of sync in " + \
1505                            "create_experiment for fedid %s"  % fid)
[6679c122]1506
1507        pid = "dummy"
1508        gid = "dummy"
1509        # XXX
1510        fail_soft = False
1511
1512        try:
[6546868]1513            os.mkdir(tmpdir+"/keys")
[6679c122]1514        except OSError:
1515            raise service_error(service_error.internal,
1516                    "Can't make temporary dir")
1517
[b234bb9]1518        req = req.get('CreateRequestBody', None)
1519        if not req:
[2d5c8b6]1520            raise service_error(service_error.req,
[b234bb9]1521                    "Bad request format (no CreateRequestBody)")
[6679c122]1522        # The tcl parser needs to read a file so put the content into that file
[3925b50]1523        descr=req.get('experimentdescription', None)
1524        if descr:
1525            file_content=descr.get('ns2description', None)
1526            if file_content:
1527                try:
1528                    f = open(tclfile, 'w')
1529                    f.write(file_content)
1530                    f.close()
1531                except IOError:
1532                    raise service_error(service_error.internal,
1533                            "Cannot write temp experiment description")
1534            else:
1535                raise service_error(service_error.req, 
1536                        "Only ns2descriptions supported")
[6679c122]1537        else:
1538            raise service_error(service_error.req, "No experiment description")
1539
[e40c7ee]1540        if req.has_key('experimentID') and \
1541                req['experimentID'].has_key('localname'):
1542            eid = req['experimentID']['localname']
[a97394b]1543            self.state_lock.acquire()
[e40c7ee]1544            while (self.state.has_key(eid)):
1545                eid += random.choice(string.ascii_letters)
[0ea11af]1546            # To avoid another thread picking this localname
[a97394b]1547            self.state[eid] = "placeholder"
1548            self.state_lock.release()
[e40c7ee]1549        else:
1550            eid = self.exp_stem
1551            for i in range(0,5):
1552                eid += random.choice(string.ascii_letters)
[a97394b]1553            self.state_lock.acquire()
[e40c7ee]1554            while (self.state.has_key(eid)):
1555                eid = self.exp_stem
1556                for i in range(0,5):
1557                    eid += random.choice(string.ascii_letters)
[0ea11af]1558            # To avoid another thread picking this localname
[a97394b]1559            self.state[eid] = "placeholder"
1560            self.state_lock.release()
[e40c7ee]1561
[c7e40f5]1562        try: 
[05191a6]1563            # This catches exceptions to clear the placeholder if necessary
1564            try:
1565                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1566            except ValueError:
1567                raise service_error(service_error.server_config, 
1568                        "Bad key type (%s)" % self.ssh_type)
1569
1570            user = req.get('user', None)
1571            if user == None:
1572                raise service_error(service_error.req, "No user")
1573
1574            master = req.get('master', None)
1575            if not master:
1576                raise service_error(service_error.req,
1577                        "No master testbed label")
1578            export_project = req.get('exportProject', None)
1579            if not export_project:
1580                raise service_error(service_error.req, "No export project")
1581           
1582            if self.splitter_url:
1583                self.log.debug("Calling remote splitter at %s" % \
1584                        self.splitter_url)
1585                split_data = self.remote_splitter(self.splitter_url,
1586                        file_content, master)
1587            else:
1588                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1589                    str(self.muxmax), '-m', master]
1590
1591                if self.fedkit:
1592                    tclcmd.append('-k')
1593
1594                tclcmd.extend([pid, gid, eid, tclfile])
1595
1596                self.log.debug("running local splitter %s", " ".join(tclcmd))
1597                tclparser = Popen(tclcmd, stdout=PIPE)
1598                split_data = tclparser.stdout
1599
1600            allocated = { }         # Testbeds we can access
1601            started = { }           # Testbeds where a sub-experiment started
1602                                # successfully
1603
1604            # Objects to parse the splitter output (defined above)
1605            parse_current_testbed = self.current_testbed(eid, tmpdir,
1606                    self.fedkit)
1607            parse_allbeds = self.allbeds(self.get_access)
1608            parse_gateways = self.gateways(eid, master, tmpdir,
1609                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1610                    self.fedkit)
1611            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1612                        "^#\s+End\s+Vtopo")
1613            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1614                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1615            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1616                    "^#\s+End\s+tarfiles")
1617            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1618                    "^#\s+End\s+rpms")
1619
[c7e40f5]1620            # Working on the split data
1621            for line in split_data:
1622                line = line.rstrip()
1623                if parse_current_testbed(line, master, allocated, tbparams):
1624                    continue
1625                elif parse_allbeds(line, user, tbparams, master, export_project,
1626                        access_user):
1627                    continue
1628                elif parse_gateways(line, allocated, tbparams):
1629                    continue
1630                elif parse_vtopo(line):
1631                    continue
1632                elif parse_hostnames(line):
1633                    continue
1634                elif parse_tarfiles(line):
1635                    continue
1636                elif parse_rpms(line):
1637                    continue
1638                else:
1639                    raise service_error(service_error.internal, 
1640                            "Bad tcl parse? %s" % line)
[05191a6]1641            # Virtual topology and visualization
1642            vtopo = self.gentopo(parse_vtopo.str)
1643            if not vtopo:
1644                raise service_error(service_error.internal, 
1645                        "Failed to generate virtual topology")
1646
1647            vis = self.genviz(vtopo)
1648            if not vis:
1649                raise service_error(service_error.internal, 
1650                        "Failed to generate visualization")
1651           
1652            # save federant information
1653            for k in allocated.keys():
1654                tbparams[k]['federant'] = {\
1655                        'name': [ { 'localname' : eid} ],\
1656                        'emulab': tbparams[k]['emulab'],\
1657                        'allocID' : tbparams[k]['allocID'],\
1658                        'master' : k == master,\
1659                    }
1660
1661
1662            # Copy tarfiles and rpms needed at remote sites into a staging area
1663            try:
1664                if self.fedkit:
1665                    parse_tarfiles.list.append(self.fedkit)
1666                for t in parse_tarfiles.list:
1667                    if not os.path.exists("%s/tarfiles" % tmpdir):
1668                        os.mkdir("%s/tarfiles" % tmpdir)
1669                    self.copy_file(t, "%s/tarfiles/%s" % \
1670                            (tmpdir, os.path.basename(t)))
1671                for r in parse_rpms.list:
1672                    if not os.path.exists("%s/rpms" % tmpdir):
1673                        os.mkdir("%s/rpms" % tmpdir)
1674                    self.copy_file(r, "%s/rpms/%s" % \
1675                            (tmpdir, os.path.basename(r)))
1676            except IOError, e:
1677                raise service_error(service_error.internal, 
1678                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1679
[c7e40f5]1680        except service_error, e:
1681            # If something goes wrong in the parse (usually an access error)
[05191a6]1682            # clear the placeholder state.  From here on out the code delays
1683            # exceptions.
[c7e40f5]1684            self.state_lock.acquire()
1685            del self.state[eid]
1686            self.state_lock.release()
1687            raise e
[6679c122]1688
[1af38d6]1689        thread_pool_info = self.thread_pool()
1690        threads = [ ]
1691
1692        for tb in [ k for k in allocated.keys() if k != master]:
1693            # Wait until we have a free slot to start the next testbed load
1694            thread_pool_info.acquire()
1695            while thread_pool_info.started - \
1696                    thread_pool_info.terminated >= self.nthreads:
1697                thread_pool_info.wait()
1698            thread_pool_info.release()
1699
1700            # Create and start a thread to start the segment, and save it to
1701            # get the return value later
1702            t  = self.pooled_thread(target=self.start_segment, 
1703                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
[3441fe3]1704                    pdata=thread_pool_info, trace_file=self.trace_file)
[1af38d6]1705            threads.append(t)
1706            t.start()
1707
1708        # Wait until all finish (the first clause of the while is to make sure
1709        # one starts)
1710        thread_pool_info.acquire()
1711        while thread_pool_info.started == 0 or \
1712                thread_pool_info.started > thread_pool_info.terminated:
1713            thread_pool_info.wait()
1714        thread_pool_info.release()
1715
1716        # If none failed, start the master
1717        failed = [ t.getName() for t in threads if not t.rv ]
1718
1719        if len(failed) == 0:
1720            if not self.start_segment(master, eid, tbparams, tmpdir):
1721                failed.append(master)
1722
[a97394b]1723        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1724        # If one failed clean up, unless fail_soft is set
1725        if failed:
1726            if not fail_soft:
[3441fe3]1727                for tb in succeeded:
1728                    self.stop_segment(tb, eid, tbparams)
[a97394b]1729                # Remove the placeholder
1730                self.state_lock.acquire()
1731                del self.state[eid]
1732                self.state_lock.release()
1733
[3441fe3]1734                raise service_error(service_error.federant,
1735                    "Swap in failed on %s" % ",".join(failed))
[6679c122]1736        else:
[11a08b0]1737            self.log.info("[start_segment]: Experiment %s started" % eid)
[6679c122]1738
[4fc2250]1739        # Generate an ID for the experiment (slice) and a certificate that the
1740        # allocator can use to prove they own it.  We'll ship it back through
1741        # the encrypted connection.
[11a08b0]1742        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
[4fc2250]1743
[11a08b0]1744        self.log.debug("[start_experiment]: removing %s" % tmpdir)
[4fc2250]1745
1746        # Walk up tmpdir, deleting as we go
1747        for path, dirs, files in os.walk(tmpdir, topdown=False):
1748            for f in files:
1749                os.remove(os.path.join(path, f))
1750            for d in dirs:
1751                os.rmdir(os.path.join(path, d))
1752        os.rmdir(tmpdir)
[987aaa1]1753
[f8582c9]1754        # The deepcopy prevents the allocation ID and other binaries from being
1755        # translated into other formats
1756        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
[3441fe3]1757                for tb in tbparams.keys() \
[e40c7ee]1758                    if tbparams[tb].has_key('federant') ],\
[c52c48d]1759                    'vtopo': vtopo,\
[4fc2250]1760                    'vis' : vis,
[e40c7ee]1761                    'experimentID' : [\
[c52c48d]1762                            { 'fedid': copy.copy(expid) }, \
1763                            { 'localname': eid },\
[e40c7ee]1764                        ],\
[4fc2250]1765                    'experimentAccess': { 'X509' : expcert },\
1766                }
[a97394b]1767
[0ea11af]1768        # Insert the experiment into our state and update the disk copy
[a97394b]1769        self.state_lock.acquire()
[c52c48d]1770        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1771                for tb in tbparams.keys() \
1772                    if tbparams[tb].has_key('federant') ],\
1773                    'vtopo': vtopo,\
1774                    'vis' : vis,
[4064742]1775                    'owner': fid,
[c52c48d]1776                    'experimentID' : [\
1777                            { 'fedid': expid }, { 'localname': eid },\
1778                        ],\
1779                }
[e40c7ee]1780        self.state[eid] = self.state[expid]
[eee2b2e]1781        if self.state_filename: self.write_state()
[a97394b]1782        self.state_lock.release()
1783
[4064742]1784        self.auth.set_attribute(fid, expid)
1785        self.auth.set_attribute(expid, expid)
1786
[a97394b]1787        if not failed:
1788            return resp
1789        else:
1790            raise service_error(service_error.partial, \
1791                    "Partial swap in on %s" % ",".join(succeeded))
1792
[4064742]1793    def check_experiment_access(self, fid, key):
1794        """
1795        Confirm that the fid has access to the experiment.  Though a request
1796        may be made in terms of a local name, the access attribute is always
1797        the experiment's fedid.
1798        """
1799        if not isinstance(key, fedid):
1800            self.state_lock.acquire()
1801            if self.state.has_key(key):
1802                try:
1803                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1804                            if f.has_key('fedid') ]
1805                except KeyError:
1806                    self.state_lock.release()
1807                    raise service_error(service_error.internal, 
1808                            "No fedid for experiment %s when checking " +\
1809                                    "access(!?)" % key)
1810                if len(kl) == 1:
1811                    key = kl[0]
1812                else:
1813                    self.state_lock.release()
1814                    raise service_error(service_error.internal, 
1815                            "multiple fedids for experiment %s when " +\
1816                                    "checking access(!?)" % key)
1817            else:
1818                self.state_lock.release()
1819                raise service_error(service_error.access, "Access Denied")
1820            self.state_lock.release()
1821
1822        if self.auth.check_attribute(fid, key):
1823            return True
1824        else:
1825            raise service_error(service_error.access, "Access Denied")
1826
1827
[987aaa1]1828
1829    def get_vtopo(self, req, fid):
[0ea11af]1830        """
1831        Return the stored virtual topology for this experiment
1832        """
[a97394b]1833        rv = None
[987aaa1]1834
1835        req = req.get('VtopoRequestBody', None)
1836        if not req:
1837            raise service_error(service_error.req,
1838                    "Bad request format (no VtopoRequestBody)")
[e40c7ee]1839        exp = req.get('experiment', None)
1840        if exp:
1841            if exp.has_key('fedid'):
[f8582c9]1842                key = exp['fedid']
[e40c7ee]1843                keytype = "fedid"
1844            elif exp.has_key('localname'):
1845                key = exp['localname']
1846                keytype = "localname"
1847            else:
1848                raise service_error(service_error.req, "Unknown lookup type")
[987aaa1]1849        else:
[e40c7ee]1850            raise service_error(service_error.req, "No request?")
[987aaa1]1851
[4064742]1852        self.check_experiment_access(fid, key)
1853
[a97394b]1854        self.state_lock.acquire()
[e40c7ee]1855        if self.state.has_key(key):
[a97394b]1856            rv = { 'experiment' : {keytype: key },\
[e40c7ee]1857                    'vtopo': self.state[key]['vtopo'],\
[a97394b]1858                }
1859        self.state_lock.release()
1860
1861        if rv: return rv
1862        else: raise service_error(service_error.req, "No such experiment")
[987aaa1]1863
1864    def get_vis(self, req, fid):
[0ea11af]1865        """
1866        Return the stored visualization for this experiment
1867        """
[a97394b]1868        rv = None
[987aaa1]1869
1870        req = req.get('VisRequestBody', None)
1871        if not req:
1872            raise service_error(service_error.req,
1873                    "Bad request format (no VisRequestBody)")
[e40c7ee]1874        exp = req.get('experiment', None)
1875        if exp:
1876            if exp.has_key('fedid'):
[f8582c9]1877                key = exp['fedid']
[e40c7ee]1878                keytype = "fedid"
1879            elif exp.has_key('localname'):
1880                key = exp['localname']
1881                keytype = "localname"
1882            else:
1883                raise service_error(service_error.req, "Unknown lookup type")
[987aaa1]1884        else:
[e40c7ee]1885            raise service_error(service_error.req, "No request?")
[987aaa1]1886
[4064742]1887        self.check_experiment_access(fid, key)
1888
[a97394b]1889        self.state_lock.acquire()
[e40c7ee]1890        if self.state.has_key(key):
[a97394b]1891            rv =  { 'experiment' : {keytype: key },\
[e40c7ee]1892                    'vis': self.state[key]['vis'],\
[987aaa1]1893                    }
[a97394b]1894        self.state_lock.release()
1895
1896        if rv: return rv
1897        else: raise service_error(service_error.req, "No such experiment")
[987aaa1]1898
[c52c48d]1899    def get_info(self, req, fid):
[0ea11af]1900        """
1901        Return all the stored info about this experiment
1902        """
[a97394b]1903        rv = None
[c52c48d]1904
1905        req = req.get('InfoRequestBody', None)
1906        if not req:
1907            raise service_error(service_error.req,
1908                    "Bad request format (no VisRequestBody)")
1909        exp = req.get('experiment', None)
1910        if exp:
1911            if exp.has_key('fedid'):
[f8582c9]1912                key = exp['fedid']
[c52c48d]1913                keytype = "fedid"
1914            elif exp.has_key('localname'):
1915                key = exp['localname']
1916                keytype = "localname"
1917            else:
1918                raise service_error(service_error.req, "Unknown lookup type")
1919        else:
1920            raise service_error(service_error.req, "No request?")
1921
[4064742]1922        self.check_experiment_access(fid, key)
1923
[c52c48d]1924        # The state may be massaged by the service function that called
1925        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1926        # state.
[a97394b]1927        self.state_lock.acquire()
[c52c48d]1928        if self.state.has_key(key):
[a97394b]1929            rv = copy.deepcopy(self.state[key])
1930        self.state_lock.release()
[6679c122]1931
[a97394b]1932        if rv: return rv
1933        else: raise service_error(service_error.req, "No such experiment")
[7a8d667]1934
1935
1936    def terminate_experiment(self, req, fid):
[0ea11af]1937        """
1938        Swap this experiment out on the federants and delete the shared
1939        information
1940        """
[7a8d667]1941        tbparams = { }
1942        req = req.get('TerminateRequestBody', None)
1943        if not req:
1944            raise service_error(service_error.req,
1945                    "Bad request format (no TerminateRequestBody)")
1946        exp = req.get('experiment', None)
1947        if exp:
1948            if exp.has_key('fedid'):
[f8582c9]1949                key = exp['fedid']
[7a8d667]1950                keytype = "fedid"
1951            elif exp.has_key('localname'):
1952                key = exp['localname']
1953                keytype = "localname"
1954            else:
1955                raise service_error(service_error.req, "Unknown lookup type")
1956        else:
1957            raise service_error(service_error.req, "No request?")
1958
[4064742]1959        self.check_experiment_access(fid, key)
1960
[a97394b]1961        self.state_lock.acquire()
[7a8d667]1962        fed_exp = self.state.get(key, None)
1963
1964        if fed_exp:
[a97394b]1965            # This branch of the conditional holds the lock to generate a
1966            # consistent temporary tbparams variable to deallocate experiments.
1967            # It releases the lock to do the deallocations and reacquires it to
1968            # remove the experiment state when the termination is complete.
[eee2b2e]1969            ids = []
1970            #  experimentID is a list of dicts that are self-describing
1971            #  identifiers.  This finds all the fedids and localnames - the
1972            #  keys of self.state - and puts them into ids.
1973            for id in fed_exp.get('experimentID', []):
1974                if id.has_key('fedid'): ids.append(id['fedid'])
1975                if id.has_key('localname'): ids.append(id['localname'])
1976
[7a8d667]1977            # Construct enough of the tbparams to make the stop_segment calls
1978            # work
1979            for fed in fed_exp['federant']:
1980                try:
1981                    for e in fed['name']:
1982                        eid = e.get('localname', None)
1983                        if eid: break
1984                    else:
1985                        continue
1986
1987                    p = fed['emulab']['project']
1988
1989                    project = p['name']['localname']
1990                    tb = p['testbed']['localname']
1991                    user = p['user'][0]['userID']['localname']
1992
1993                    domain = fed['emulab']['domain']
1994                    host  = "%s%s" % (fed['emulab']['ops'], domain)
[d81971a]1995                    aid = fed['allocID']
[7a8d667]1996                except KeyError, e:
1997                    continue
1998                tbparams[tb] = {\
1999                        'user': user,\
2000                        'domain': domain,\
2001                        'project': project,\
2002                        'host': host,\
2003                        'eid': eid,\
[d81971a]2004                        'aid': aid,\
[7a8d667]2005                    }
[a97394b]2006            self.state_lock.release()
2007
[7a8d667]2008            # Stop everyone.
2009            for tb in tbparams.keys():
2010                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
[eee2b2e]2011
[d81971a]2012            # release the allocations
2013            for tb in tbparams.keys():
2014                self.release_access(tb, tbparams[tb]['aid'])
2015
2016            # Remove the terminated experiment
[a97394b]2017            self.state_lock.acquire()
[eee2b2e]2018            for id in ids:
2019                if self.state.has_key(id): del self.state[id]
2020
2021            if self.state_filename: self.write_state()
[a97394b]2022            self.state_lock.release()
2023
[7a8d667]2024            return { 'experiment': exp }
2025        else:
[a97394b]2026            # Don't forget to release the lock
2027            self.state_lock.release()
[7a8d667]2028            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.