source: fedd/fedd_experiment_control.py @ 4064742

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 4064742 was 4064742, checked in by Ted Faber <faber@…>, 15 years ago

Add access control to the experiment control module.

  • Property mode set to 100644
File size: 58.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from fedd_util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26import parse_detail
27from service_error import service_error
28
29
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.experiment_control")
34fl.addHandler(nullHandler())
35
36class fedd_experiment_control_local:
37    """
38    Control of experiments that this system can directly access.
39
40    Includes experiment creation, termination and information dissemination.
41    Thred safe.
42    """
43   
44    class thread_pool:
45        """
46        A class to keep track of a set of threads all invoked for the same
47        task.  Manages the mutual exclusion of the states.
48        """
49        def __init__(self):
50            """
51            Start a pool.
52            """
53            self.changed = Condition()
54            self.started = 0
55            self.terminated = 0
56
57        def acquire(self):
58            """
59            Get the pool's lock.
60            """
61            self.changed.acquire()
62
63        def release(self):
64            """
65            Release the pool's lock.
66            """
67            self.changed.release()
68
69        def wait(self, timeout = None):
70            """
71            Wait for a pool thread to start or stop.
72            """
73            self.changed.wait(timeout)
74
75        def start(self):
76            """
77            Called by a pool thread to report starting.
78            """
79            self.changed.acquire()
80            self.started += 1
81            self.changed.notifyAll()
82            self.changed.release()
83
84        def terminate(self):
85            """
86            Called by a pool thread to report finishing.
87            """
88            self.changed.acquire()
89            self.terminated += 1
90            self.changed.notifyAll()
91            self.changed.release()
92
93        def clear(self):
94            """
95            Clear all pool data.
96            """
97            self.changed.acquire()
98            self.started = 0
99            self.terminated =0
100            self.changed.notifyAll()
101            self.changed.release()
102
103    class pooled_thread(Thread):
104        """
105        One of a set of threads dedicated to a specific task.  Uses the
106        thread_pool class above for coordination.
107        """
108        def __init__(self, group=None, target=None, name=None, args=(), 
109                kwargs={}, pdata=None, trace_file=None):
110            Thread.__init__(self, group, target, name, args, kwargs)
111            self.rv = None          # Return value of the ops in this thread
112            self.exception = None   # Exception that terminated this thread
113            self.target=target      # Target function to run on start()
114            self.args = args        # Args to pass to target
115            self.kwargs = kwargs    # Additional kw args
116            self.pdata = pdata      # thread_pool for this class
117            # Logger for this thread
118            self.log = logging.getLogger("fedd.experiment_control")
119       
120        def run(self):
121            """
122            Emulate Thread.run, except add pool data manipulation and error
123            logging.
124            """
125            if self.pdata:
126                self.pdata.start()
127
128            if self.target:
129                try:
130                    self.rv = self.target(*self.args, **self.kwargs)
131                except service_error, s:
132                    self.exception = s
133                    self.log.error("Thread exception: %s %s" % \
134                            (s.code_string(), s.desc))
135                except:
136                    self.exception = sys.exc_info()[1]
137                    self.log.error(("Unexpected thread exception: %s" +\
138                            "Trace %s") % (self.exception,\
139                                traceback.format_exc()))
140            if self.pdata:
141                self.pdata.terminate()
142
143    call_RequestAccess = service_caller('RequestAccess',
144            'getfeddPortType', feddServiceLocator, 
145            RequestAccessRequestMessage, 'RequestAccessRequestBody')
146
147    call_ReleaseAccess = service_caller('ReleaseAccess',
148            'getfeddPortType', feddServiceLocator, 
149            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
150
151    call_Ns2Split = service_caller('Ns2Split',
152            'getfeddInternalPortType', feddInternalServiceLocator, 
153            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
154
155    def __init__(self, config=None, auth=None):
156        """
157        Intialize the various attributes, most from the config object
158        """
159        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
160        self.thread_pool = fedd_experiment_control_local.thread_pool
161
162        self.cert_file = None
163        self.cert_pwd = None
164        self.trusted_certs = None
165
166        # Walk through the various relevant certificat specifying config
167        # attributes until the local certificate attributes can be resolved.
168        # The walk is from most specific to most general specification.
169        for s in ("experiment_control", "globals"):
170            if config.has_section(s): 
171                if config.has_option(s, "cert_file"):
172                    if not self.cert_file:
173                        self.cert_file = config.get(s, "cert_file")
174                        self.cert_pwd = config.get(s, "cert_pwd")
175
176                if config.has_option(s, "trusted_certs"):
177                    if not self.trusted_certs:
178                        self.trusted_certs = config.get(s, "trusted_certs")
179
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193        # XXX remove if/else?
194        if config.has_section("experiment_control"):
195            self.debug = config.get("experiment_control", "create_debug")
196            self.state_filename = config.get("experiment_control", 
197                    "experiment_state_file")
198            self.splitter_url = config.get("experiment_control", "splitter_url")
199            self.fedkit = config.get("experiment_control", "fedkit")
200            accessdb_file = config.get("experiment_control", "accessdb")
201        else:
202            self.debug = False
203            self.state_filename = None
204            self.splitter_url = None
205            self.fedkit = None
206
207        self.ssh_pubkey_file = config.get("experiment_control", 
208                "ssh_pubkey_file")
209        self.ssh_privkey_file = config.get("experiment_control",
210                "ssh_privkey_file")
211        # NB for internal master/slave ops, not experiment setup
212        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
213        self.state = { }
214        self.state_lock = Lock()
215        self.tclsh = "/usr/local/bin/otclsh"
216        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
217        self.tbmap = { 
218                'deter':'https://users.isi.deterlab.net:23235',
219                'emulab':'https://users.isi.deterlab.net:23236',
220                'ucb':'https://users.isi.deterlab.net:23237',
221                }
222        self.trace_file = sys.stderr
223
224        self.def_expstart = \
225                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
226                "/tmp/federate";
227        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
228                "FEDDIR/hosts";
229        self.def_gwstart = \
230                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
231                "/tmp/bridge.log";
232        self.def_mgwstart = \
233                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
234                "/tmp/bridge.log";
235        self.def_gwimage = "FBSD61-TUNNEL2";
236        self.def_gwtype = "pc";
237
238
239        if self.ssh_pubkey_file:
240            try:
241                f = open(self.ssh_pubkey_file, 'r')
242                self.ssh_pubkey = f.read()
243                f.close()
244            except IOError:
245                raise service_error(service_error.internal,
246                        "Cannot read sshpubkey")
247        else:
248            raise service_error(service_error.internal, 
249                    "No SSH public key file?")
250
251        if not self.ssh_privkey_file:
252            raise service_error(service_error.internal, 
253                    "No SSH public key file?")
254
255        set_log_level(config, "experiment_control", self.log)
256
257        if auth:
258            self.auth = auth
259        else:
260            self.log.error(\
261                    "[access]: No authorizer initialized, creating local one.")
262            auth = authorizer()
263
264        if accessdb_file:
265            try:
266                self.accessdb = self.read_accessdb(accessdb_file)
267            except IOError:
268                raise service_error(service_error.internal,
269                        "Error opening %s as experiment control accessdb" % \
270                                accessdb_file)
271            for fid in self.accessdb.keys():
272                self.auth.set_attribute(fid, 'create')
273        else:
274            raise service_error(service_error.internal,
275                    "No accessdb specified in config")
276
277        # Grab saved state.  OK to do this w/o locking because it's read only
278        # and only one thread should be in existence that can see self.state at
279        # this point.
280        if self.state_filename:
281            self.read_state()
282
283        # Dispatch tables
284        self.soap_services = {\
285                'Create': soap_handler(\
286                        CreateRequestMessage.typecode,
287                        getattr(self, "create_experiment"), 
288                        CreateResponseMessage,
289                        "CreateResponseBody"),
290                'Vtopo': soap_handler(\
291                        VtopoRequestMessage.typecode,
292                        getattr(self, "get_vtopo"),
293                        VtopoResponseMessage,
294                        "VtopoResponseBody"),
295                'Vis': soap_handler(\
296                        VisRequestMessage.typecode,
297                        getattr(self, "get_vis"),
298                        VisResponseMessage,
299                        "VisResponseBody"),
300                'Info': soap_handler(\
301                        InfoRequestMessage.typecode,
302                        getattr(self, "get_info"),
303                        InfoResponseMessage,
304                        "InfoResponseBody"),
305                'Terminate': soap_handler(\
306                        TerminateRequestMessage.typecode,
307                        getattr(self, "terminate_experiment"),
308                        TerminateResponseMessage,
309                        "TerminateResponseBody"),
310        }
311
312        self.xmlrpc_services = {\
313                'Create': xmlrpc_handler(\
314                        getattr(self, "create_experiment"), 
315                        "CreateResponseBody"),
316                'Vtopo': xmlrpc_handler(\
317                        getattr(self, "get_vtopo"),
318                        "VtopoResponseBody"),
319                'Vis': xmlrpc_handler(\
320                        getattr(self, "get_vis"),
321                        "VisResponseBody"),
322                'Info': xmlrpc_handler(\
323                        getattr(self, "get_info"),
324                        "InfoResponseBody"),
325                'Terminate': xmlrpc_handler(\
326                        getattr(self, "terminate_experiment"),
327                        "TerminateResponseBody"),
328        }
329
330    def copy_file(self, src, dest, size=1024):
331        """
332        Exceedingly simple file copy.
333        """
334        s = open(src,'r')
335        d = open(dest, 'w')
336
337        buf = "x"
338        while buf != "":
339            buf = s.read(size)
340            d.write(buf)
341        s.close()
342        d.close()
343
344    # Call while holding self.state_lock
345    def write_state(self):
346        """
347        Write a new copy of experiment state after copying the existing state
348        to a backup.
349
350        State format is a simple pickling of the state dictionary.
351        """
352        if os.access(self.state_filename, os.W_OK):
353            self.copy_file(self.state_filename, \
354                    "%s.bak" % self.state_filename)
355        try:
356            f = open(self.state_filename, 'w')
357            pickle.dump(self.state, f)
358        except IOError, e:
359            self.log.error("Can't write file %s: %s" % \
360                    (self.state_filename, e))
361        except pickle.PicklingError, e:
362            self.log.error("Pickling problem: %s" % e)
363        except TypeError, e:
364            self.log.error("Pickling problem (TypeError): %s" % e)
365
366    # Call while holding self.state_lock
367    def read_state(self):
368        """
369        Read a new copy of experiment state.  Old state is overwritten.
370
371        State format is a simple pickling of the state dictionary.
372        """
373        try:
374            f = open(self.state_filename, "r")
375            self.state = pickle.load(f)
376            self.log.debug("[read_state]: Read state from %s" % \
377                    self.state_filename)
378        except IOError, e:
379            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
380                    % (self.state_filename, e))
381        except pickle.UnpicklingError, e:
382            self.log.warning(("[read_state]: No saved state: " + \
383                    "Unpickling failed: %s") % e)
384       
385        for k in self.state.keys():
386            try:
387                # This list should only have one element in it, but phrasing it
388                # as a for loop doesn't cost much, really.  We have to find the
389                # fedid elements anyway.
390                for eid in [ f['fedid'] \
391                        for f in self.state[k]['experimentID']\
392                            if f.has_key('fedid') ]:
393                    self.auth.set_attribute(self.state[k]['owner'], eid)
394            except KeyError, e:
395                self.log.warning("[read_state]: State ownership or identity " +\
396                        "misformatted in %s: %s" % (self.state_filename, e))
397
398
399    def read_accessdb(self, accessdb_file):
400        access = {}
401        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
402        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
403                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
404        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
405                "\s*->\s*(" + name_expr + ")\s*$")
406        lineno = 0
407
408        f = open(accessdb_file, "r")
409        for line in f:
410            lineno += 1
411            line.strip()
412            if len(line) == 0 or line.startswith('#'):
413                continue
414            m = project_line.match(line)
415            if m:
416                fid = fedid(hexstr=m.group(1))
417                project, user = m.group(2,3)
418                if not access.has_key(fid):
419                    access[fid] = []
420                access[fid].append((project, user))
421                continue
422
423            m = user_line.match(line)
424            if m:
425                fid = fedid(hexstr=m.group(1))
426                project = None
427                user = m.group(2)
428                if not access.has_key(fid):
429                    access[fid] = []
430                access[fid].append((project, user))
431                continue
432            raise service_error(service_error.internal,
433                    "Error parsing access db %s at line %d" % \
434                        (accessdb_file, lineno))
435        f.close()
436        return access
437
438
439    def scp_file(self, file, user, host, dest=""):
440        """
441        scp a file to the remote host.  If debug is set the action is only
442        logged.
443        """
444
445        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
446                "%s@%s:%s" % (user, host, dest)]
447        rv = 0
448
449        try:
450            dnull = open("/dev/null", "r")
451        except IOError:
452            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
453            dnull = Null
454
455        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
456        if not self.debug:
457            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
458            else: rv = call(scp_cmd)
459
460        return rv == 0
461
462    def ssh_cmd(self, user, host, cmd, wname=None):
463        """
464        Run a remote command on host as user.  If debug is set, the action is
465        only logged.
466        """
467        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
468                user, host, cmd)
469
470        try:
471            dnull = open("/dev/null", "r")
472        except IOError:
473            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
474            dnull = Null
475
476        self.log.debug("[ssh_cmd]: %s" % sh_str)
477        if not self.debug:
478            if dnull:
479                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
480            else:
481                sub = Popen(sh_str, shell=True)
482            return sub.wait() == 0
483        else:
484            return True
485
486    def ship_configs(self, host, user, src_dir, dest_dir):
487        """
488        Copy federant-specific configuration files to the federant.
489        """
490        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
491            return False
492        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
493            return False
494
495        for f in os.listdir(src_dir):
496            if os.path.isdir(f):
497                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
498                        "%s/%s" % (dest_dir, f)):
499                    return False
500            else:
501                if not self.scp_file("%s/%s" % (src_dir, f), 
502                        user, host, dest_dir):
503                    return False
504        return True
505
506    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
507        """
508        Start a sub-experiment on a federant.
509
510        Get the current state, modify or create as appropriate, ship data and
511        configs and start the experiment.  There are small ordering differences
512        based on the initial state of the sub-experiment.
513        """
514        # ops node in the federant
515        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
516        user = tbparams[tb]['user']     # federant user
517        pid = tbparams[tb]['project']   # federant project
518        # XXX
519        base_confs = ( "hosts",)
520        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
521        # command to test experiment state
522        expinfo_exec = "/usr/testbed/bin/expinfo" 
523        # Configuration directories on the remote machine
524        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
525        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
526        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
527        # Regular expressions to parse the expinfo response
528        state_re = re.compile("State:\s+(\w+)")
529        no_exp_re = re.compile("^No\s+such\s+experiment")
530        state = None    # Experiment state parsed from expinfo
531        # The expinfo ssh command
532        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
533
534        # Get status
535        self.log.debug("[start_segment]: %s"% " ".join(cmd))
536        dev_null = None
537        try:
538            dev_null = open("/dev/null", "a")
539        except IOError, e:
540            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
541
542        if self.debug:
543            state = 'swapped'
544            rv = 0
545        else:
546            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
547            for line in status.stdout:
548                m = state_re.match(line)
549                if m: state = m.group(1)
550                else:
551                    m = no_exp_re.match(line)
552                    if m: state = "none"
553            rv = status.wait()
554
555        # If the experiment is not present the subcommand returns a non-zero
556        # return value.  If we successfully parsed a "none" outcome, ignore the
557        # return code.
558        if rv != 0 and state != "none":
559            raise service_error(service_error.internal,
560                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
561
562        self.log.debug("[start_segment]: %s: %s" % (tb, state))
563        self.log.info("[start_segment]:transferring experiment to %s" % tb)
564
565        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
566            return False
567        # Clear the federation files
568        if not self.ssh_cmd(user, host, 
569                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
570            return False
571        if not self.ssh_cmd(user, host, 
572                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
573            return False
574        # Clear and create the tarfiles and rpm directories
575        for d in (tarfiles_dir, rpms_dir):
576            if not self.ssh_cmd(user, host, 
577                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
578                return False
579            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
580                    "create tarfiles"):
581                return False
582       
583        if state == 'active':
584            # Remote experiment is active.  Modify it.
585            for f in base_confs:
586                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
587                        "%s/%s" % (proj_dir, f)):
588                    return False
589            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
590                    proj_dir):
591                return False
592            if os.path.isdir("%s/tarfiles" % tmpdir):
593                if not self.ship_configs(host, user,
594                        "%s/tarfiles" % tmpdir, tarfiles_dir):
595                    return False
596            if os.path.isdir("%s/rpms" % tmpdir):
597                if not self.ship_configs(host, user,
598                        "%s/rpms" % tmpdir, tarfiles_dir):
599                    return False
600            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
601            if not self.ssh_cmd(user, host,
602                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
603                            (pid, eid, tclfile), "modexp"):
604                return False
605            return True
606        elif state == "swapped":
607            # Remote experiment swapped out.  Modify it and swap it in.
608            for f in base_confs:
609                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
610                        "%s/%s" % (proj_dir, f)):
611                    return False
612            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
613                    proj_dir):
614                return False
615            if os.path.isdir("%s/tarfiles" % tmpdir):
616                if not self.ship_configs(host, user,
617                        "%s/tarfiles" % tmpdir, tarfiles_dir):
618                    return False
619            if os.path.isdir("%s/rpms" % tmpdir):
620                if not self.ship_configs(host, user,
621                        "%s/rpms" % tmpdir, tarfiles_dir):
622                    return False
623            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
624            if not self.ssh_cmd(user, host,
625                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
626                    "modexp"):
627                return False
628            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
629            if not self.ssh_cmd(user, host,
630                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
631                    "swapexp"):
632                return False
633            return True
634        elif state == "none":
635            # No remote experiment.  Create one.  We do this in 2 steps so we
636            # can put the configuration files and scripts into the new
637            # experiment directories.
638
639            # Tarfiles must be present for creation to work
640            if os.path.isdir("%s/tarfiles" % tmpdir):
641                if not self.ship_configs(host, user,
642                        "%s/tarfiles" % tmpdir, tarfiles_dir):
643                    return False
644            if os.path.isdir("%s/rpms" % tmpdir):
645                if not self.ship_configs(host, user,
646                        "%s/rpms" % tmpdir, tarfiles_dir):
647                    return False
648            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
649            if not self.ssh_cmd(user, host,
650                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
651                            (pid, eid, tclfile), "startexp"):
652                return False
653            # After startexp the per-experiment directories exist
654            for f in base_confs:
655                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
656                        "%s/%s" % (proj_dir, f)):
657                    return False
658            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
659                    proj_dir):
660                return False
661            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
662            if not self.ssh_cmd(user, host,
663                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
664                    "swapexp"):
665                return False
666            return True
667        else:
668            self.log.debug("[start_segment]:unknown state %s" % state)
669            return False
670
671    def stop_segment(self, tb, eid, tbparams):
672        """
673        Stop a sub experiment by calling swapexp on the federant
674        """
675        user = tbparams[tb]['user']
676        host = tbparams[tb]['host']
677        pid = tbparams[tb]['project']
678
679        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
680        return self.ssh_cmd(user, host,
681                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
682
683       
684    def generate_ssh_keys(self, dest, type="rsa" ):
685        """
686        Generate a set of keys for the gateways to use to talk.
687
688        Keys are of type type and are stored in the required dest file.
689        """
690        valid_types = ("rsa", "dsa")
691        t = type.lower();
692        if t not in valid_types: raise ValueError
693        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
694
695        try:
696            trace = open("/dev/null", "w")
697        except IOError:
698            raise service_error(service_error.internal,
699                    "Cannot open /dev/null??");
700
701        # May raise CalledProcessError
702        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
703        rv = call(cmd, stdout=trace, stderr=trace)
704        if rv != 0:
705            raise service_error(service_error.internal, 
706                    "Cannot generate nonce ssh keys.  %s return code %d" \
707                            % (self.ssh_keygen, rv))
708
709    def gentopo(self, str):
710        """
711        Generate the topology dtat structure from the splitter's XML
712        representation of it.
713
714        The topology XML looks like:
715            <experiment>
716                <nodes>
717                    <node><vname></vname><ips>ip1:ip2</ips></node>
718                </nodes>
719                <lans>
720                    <lan>
721                        <vname></vname><vnode></vnode><ip></ip>
722                        <bandwidth></bandwidth><member>node:port</member>
723                    </lan>
724                </lans>
725        """
726        class topo_parse:
727            """
728            Parse the topology XML and create the dats structure.
729            """
730            def __init__(self):
731                # Typing of the subelements for data conversion
732                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
733                self.int_subelements = ( 'bandwidth',)
734                self.float_subelements = ( 'delay',)
735                # The final data structure
736                self.nodes = [ ]
737                self.lans =  [ ]
738                self.topo = { \
739                        'node': self.nodes,\
740                        'lan' : self.lans,\
741                    }
742                self.element = { }  # Current element being created
743                self.chars = ""     # Last text seen
744
745            def end_element(self, name):
746                # After each sub element the contents is added to the current
747                # element or to the appropriate list.
748                if name == 'node':
749                    self.nodes.append(self.element)
750                    self.element = { }
751                elif name == 'lan':
752                    self.lans.append(self.element)
753                    self.element = { }
754                elif name in self.str_subelements:
755                    self.element[name] = self.chars
756                    self.chars = ""
757                elif name in self.int_subelements:
758                    self.element[name] = int(self.chars)
759                    self.chars = ""
760                elif name in self.float_subelements:
761                    self.element[name] = float(self.chars)
762                    self.chars = ""
763
764            def found_chars(self, data):
765                self.chars += data.rstrip()
766
767
768        tp = topo_parse();
769        parser = xml.parsers.expat.ParserCreate()
770        parser.EndElementHandler = tp.end_element
771        parser.CharacterDataHandler = tp.found_chars
772
773        parser.Parse(str)
774
775        return tp.topo
776       
777
778    def genviz(self, topo):
779        """
780        Generate the visualization the virtual topology
781        """
782
783        neato = "/usr/local/bin/neato"
784        # These are used to parse neato output and to create the visualization
785        # file.
786        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
787        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
788                "%s</type></node>"
789
790        try:
791            # Node names
792            nodes = [ n['vname'] for n in topo['node'] ]
793            topo_lans = topo['lan']
794        except KeyError:
795            raise service_error(service_error.internal, "Bad topology")
796
797        lans = { }
798        links = { }
799
800        # Walk through the virtual topology, organizing the connections into
801        # 2-node connections (links) and more-than-2-node connections (lans).
802        # When a lan is created, it's added to the list of nodes (there's a
803        # node in the visualization for the lan).
804        for l in topo_lans:
805            if links.has_key(l['vname']):
806                if len(links[l['vname']]) < 2:
807                    links[l['vname']].append(l['vnode'])
808                else:
809                    nodes.append(l['vname'])
810                    lans[l['vname']] = links[l['vname']]
811                    del links[l['vname']]
812                    lans[l['vname']].append(l['vnode'])
813            elif lans.has_key(l['vname']):
814                lans[l['vname']].append(l['vnode'])
815            else:
816                links[l['vname']] = [ l['vnode'] ]
817
818
819        # Open up a temporary file for dot to turn into a visualization
820        try:
821            df, dotname = tempfile.mkstemp()
822            dotfile = os.fdopen(df, 'w')
823        except IOError:
824            raise service_error(service_error.internal,
825                    "Failed to open file in genviz")
826
827        # Generate a dot/neato input file from the links, nodes and lans
828        try:
829            print >>dotfile, "graph G {"
830            for n in nodes:
831                print >>dotfile, '\t"%s"' % n
832            for l in links.keys():
833                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
834            for l in lans.keys():
835                for n in lans[l]:
836                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
837            print >>dotfile, "}"
838            dotfile.close()
839        except TypeError:
840            raise service_error(service_error.internal,
841                    "Single endpoint link in vtopo")
842        except IOError:
843            raise service_error(service_error.internal, "Cannot write dot file")
844
845        # Use dot to create a visualization
846        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
847                '-Gpack=true', dotname], stdout=PIPE)
848
849        # Translate dot to vis format
850        vis_nodes = [ ]
851        vis = { 'node': vis_nodes }
852        for line in dot.stdout:
853            m = vis_re.match(line)
854            if m:
855                vn = m.group(1)
856                vis_node = {'name': vn, \
857                        'x': float(m.group(2)),\
858                        'y' : float(m.group(3)),\
859                    }
860                if vn in links.keys() or vn in lans.keys():
861                    vis_node['type'] = 'lan'
862                else:
863                    vis_node['type'] = 'node'
864                vis_nodes.append(vis_node)
865        rv = dot.wait()
866
867        os.remove(dotname)
868        if rv == 0 : return vis
869        else: return None
870
871    def get_access(self, tb, nodes, user, tbparam, master, export_project,
872            access_user):
873        """
874        Get access to testbed through fedd and set the parameters for that tb
875        """
876
877        translate_attr = {
878            'slavenodestartcmd': 'expstart',
879            'slaveconnectorstartcmd': 'gwstart',
880            'masternodestartcmd': 'mexpstart',
881            'masterconnectorstartcmd': 'mgwstart',
882            'connectorimage': 'gwimage',
883            'connectortype': 'gwtype',
884            'tunnelcfg': 'tun',
885            'smbshare': 'smbshare',
886        }
887
888        uri = self.tbmap.get(tb, None)
889        if not uri:
890            raise service_error(serice_error.server_config, 
891                    "Unknown testbed: %s" % tb)
892
893        # currently this lumps all users into one service access group
894        service_keys = [ a for u in user \
895                for a in u.get('access', []) \
896                    if a.has_key('sshPubkey')]
897
898        if len(service_keys) == 0:
899            raise service_error(service_error.req, 
900                    "Must have at least one SSH pubkey for services")
901
902
903        for p, u in access_user:
904
905            if p:
906                # Request with user and project specified
907                req = {\
908                        'destinationTestbed' : { 'uri' : uri },
909                        'project': { 
910                            'name': {'localname': p},
911                            'user': [ {'userID': { 'localname': u } } ],
912                            },
913                        'user':  user,
914                        'allocID' : { 'localname': 'test' },
915                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
916                        'serviceAccess' : service_keys
917                    }
918            else:
919                # Request with only user specified
920                req = {\
921                        'destinationTestbed' : { 'uri' : uri },
922                        'user':  [ {'userID': { 'localname': u } } ],
923                        'allocID' : { 'localname': 'test' },
924                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
925                        'serviceAccess' : service_keys
926                    }
927
928            if tb == master:
929                # NB, the export_project parameter is a dict that includes
930                # the type
931                req['exportProject'] = export_project
932
933            # node resources if any
934            if nodes != None and len(nodes) > 0:
935                rnodes = [ ]
936                for n in nodes:
937                    rn = { }
938                    image, hw, count = n.split(":")
939                    if image: rn['image'] = [ image ]
940                    if hw: rn['hardware'] = [ hw ]
941                    if count: rn['count'] = int(count)
942                    rnodes.append(rn)
943                req['resources']= { }
944                req['resources']['node'] = rnodes
945
946            try:
947                r = self.call_RequestAccess(uri, req, 
948                        self.cert_file, self.cert_pwd, self.trusted_certs)
949            except service_error, e:
950                if e.code == service_error.access:
951                    r = None
952                    continue
953                else:
954                    raise e
955
956            if r.has_key('RequestAccessResponseBody'):
957                r = r['RequestAccessResponseBody']
958            else:
959                raise service_error(service_error.protocol,
960                        "Bad proxy response")
961       
962        if not r:
963            raise service_error(service_error.access, 
964                    "Access denied by %s (%s)" % (tb, uri))
965
966        e = r['emulab']
967        p = e['project']
968        tbparam[tb] = { 
969                "boss": e['boss'],
970                "host": e['ops'],
971                "domain": e['domain'],
972                "fs": e['fileServer'],
973                "eventserver": e['eventServer'],
974                "project": unpack_id(p['name']),
975                "emulab" : e,
976                "allocID" : r['allocID'],
977                }
978        # Make the testbed name be the label the user applied
979        p['testbed'] = {'localname': tb }
980
981        for u in p['user']:
982            tbparam[tb]['user'] = unpack_id(u['userID'])
983
984        for a in e['fedAttr']:
985            if a['attribute']:
986                key = translate_attr.get(a['attribute'].lower(), None)
987                if key:
988                    tbparam[tb][key]= a['value']
989       
990    def release_access(self, tb, aid):
991        """
992        Release access to testbed through fedd
993        """
994
995        uri = self.tbmap.get(tb, None)
996        if not uri:
997            raise service_error(serice_error.server_config, 
998                    "Unknown testbed: %s" % tb)
999
1000        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1001                self.cert_file, self.cert_pwd, self.trusted_certs)
1002
1003        # better error coding
1004
1005    def remote_splitter(self, uri, desc, master):
1006
1007        req = {
1008                'description' : { 'ns2description': desc },
1009                'master': master,
1010                'include_fedkit': bool(self.fedkit)
1011            }
1012
1013        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1014                self.trusted_certs)
1015
1016        if r.has_key('Ns2SplitResponseBody'):
1017            r = r['Ns2SplitResponseBody']
1018            if r.has_key('output'):
1019                return r['output'].splitlines()
1020            else:
1021                raise service_error(service_error.protocol, 
1022                        "Bad splitter response (no output)")
1023        else:
1024            raise service_error(service_error.protocol, "Bad splitter response")
1025       
1026    class current_testbed:
1027        """
1028        Object for collecting the current testbed description.  The testbed
1029        description is saved to a file with the local testbed variables
1030        subsittuted line by line.
1031        """
1032        def __init__(self, eid, tmpdir, fedkit):
1033            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1034            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1035            self.current_testbed = None
1036            self.testbed_file = None
1037
1038            self.def_expstart = \
1039                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1040            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1041            self.def_gwstart = \
1042                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1043            self.def_mgwstart = \
1044                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1045            self.def_gwimage = "FBSD61-TUNNEL2";
1046            self.def_gwtype = "pc";
1047
1048            self.eid = eid
1049            self.tmpdir = tmpdir
1050            self.fedkit = fedkit
1051
1052        def __call__(self, line, master, allocated, tbparams):
1053            # Capture testbed topology descriptions
1054            if self.current_testbed == None:
1055                m = self.begin_testbed.match(line)
1056                if m != None:
1057                    self.current_testbed = m.group(1)
1058                    if self.current_testbed == None:
1059                        raise service_error(service_error.req,
1060                                "Bad request format (unnamed testbed)")
1061                    allocated[self.current_testbed] = \
1062                            allocated.get(self.current_testbed,0) + 1
1063                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1064                    if not os.path.exists(tb_dir):
1065                        try:
1066                            os.mkdir(tb_dir)
1067                        except IOError:
1068                            raise service_error(service_error.internal,
1069                                    "Cannot create %s" % tb_dir)
1070                    try:
1071                        self.testbed_file = open("%s/%s.%s.tcl" %
1072                                (tb_dir, self.eid, self.current_testbed), 'w')
1073                    except IOError:
1074                        self.testbed_file = None
1075                    return True
1076                else: return False
1077            else:
1078                m = self.end_testbed.match(line)
1079                if m != None:
1080                    if m.group(1) != self.current_testbed:
1081                        raise service_error(service_error.internal, 
1082                                "Mismatched testbed markers!?")
1083                    if self.testbed_file != None: 
1084                        self.testbed_file.close()
1085                        self.testbed_file = None
1086                    self.current_testbed = None
1087                elif self.testbed_file:
1088                    # Substitute variables and put the line into the local
1089                    # testbed file.
1090                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1091                            self.def_gwtype)
1092                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1093                            self.def_gwimage)
1094                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1095                            self.def_mgwstart)
1096                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1097                            self.def_mexpstart)
1098                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1099                            self.def_gwstart)
1100                    expstart = tbparams[self.current_testbed].get('expstart', 
1101                            self.def_expstart)
1102                    project = tbparams[self.current_testbed].get('project')
1103                    line = re.sub("GWTYPE", gwtype, line)
1104                    line = re.sub("GWIMAGE", gwimage, line)
1105                    if self.current_testbed == master:
1106                        line = re.sub("GWSTART", mgwstart, line)
1107                        line = re.sub("EXPSTART", mexpstart, line)
1108                    else:
1109                        line = re.sub("GWSTART", gwstart, line)
1110                        line = re.sub("EXPSTART", expstart, line)
1111                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1112                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1113                    line = re.sub("EID", self.eid, line)
1114                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1115                            (project, self.eid), line)
1116                    if self.fedkit:
1117                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1118                                line)
1119                    print >>self.testbed_file, line
1120                return True
1121
1122    class allbeds:
1123        """
1124        Process the Allbeds section.  Get access to each federant and save the
1125        parameters in tbparams
1126        """
1127        def __init__(self, get_access):
1128            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1129            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1130            self.in_allbeds = False
1131            self.get_access = get_access
1132
1133        def __call__(self, line, user, tbparams, master, export_project,
1134                access_user):
1135            # Testbed access parameters
1136            if not self.in_allbeds:
1137                if self.begin_allbeds.match(line):
1138                    self.in_allbeds = True
1139                    return True
1140                else:
1141                    return False
1142            else:
1143                if self.end_allbeds.match(line):
1144                    self.in_allbeds = False
1145                else:
1146                    nodes = line.split('|')
1147                    tb = nodes.pop(0)
1148                    self.get_access(tb, nodes, user, tbparams, master,
1149                            export_project, access_user)
1150                return True
1151
1152    class gateways:
1153        def __init__(self, eid, master, tmpdir, gw_pubkey,
1154                gw_secretkey, copy_file, fedkit):
1155            self.begin_gateways = \
1156                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1157            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1158            self.current_gateways = None
1159            self.control_gateway = None
1160            self.active_end = { }
1161
1162            self.eid = eid
1163            self.master = master
1164            self.tmpdir = tmpdir
1165            self.gw_pubkey_base = gw_pubkey
1166            self.gw_secretkey_base = gw_secretkey
1167
1168            self.copy_file = copy_file
1169            self.fedkit = fedkit
1170
1171
1172        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1173                active_end, tbparams, dtb, myname, desthost, type):
1174            """
1175            Produce a gateway configuration file from a gateways line.
1176            """
1177
1178            sproject = tbparams[gw].get('project', 'project')
1179            dproject = tbparams[dtb].get('project', 'project')
1180            sdomain = ".%s.%s%s" % (eid, sproject,
1181                    tbparams[gw].get('domain', ".example.com"))
1182            ddomain = ".%s.%s%s" % (eid, dproject,
1183                    tbparams[dtb].get('domain', ".example.com"))
1184            boss = tbparams[master].get('boss', "boss")
1185            fs = tbparams[master].get('fs', "fs")
1186            event_server = "%s%s" % \
1187                    (tbparams[gw].get('eventserver', "event_server"),
1188                            tbparams[gw].get('domain', "example.com"))
1189            remote_event_server = "%s%s" % \
1190                    (tbparams[dtb].get('eventserver', "event_server"),
1191                            tbparams[dtb].get('domain', "example.com"))
1192            seer_control = "%s%s" % \
1193                    (tbparams[gw].get('control', "control"), sdomain)
1194
1195            if self.fedkit:
1196                remote_script_dir = "/usr/local/federation/bin"
1197                local_script_dir = "/usr/local/federation/bin"
1198            else:
1199                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1200                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1201
1202            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1203            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1204            tunnel_cfg = tbparams[gw].get("tun", "false")
1205
1206            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1207            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1208
1209            # translate to lower case so the `hostname` hack for specifying
1210            # configuration files works.
1211            conf_file = conf_file.lower();
1212            remote_conf_file = remote_conf_file.lower();
1213
1214            if dtb == master:
1215                active = "false"
1216            elif gw == master:
1217                active = "true"
1218            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1219                active = "false"
1220            else:
1221                active_end['%s-%s' % (gw, dtb)] = 1
1222                active = "true"
1223
1224            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1225            print >>gwconfig, "Active: %s" % active
1226            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1227            print >>gwconfig, "BossName: %s" % boss
1228            print >>gwconfig, "FsName: %s" % fs
1229            print >>gwconfig, "EventServerName: %s" % event_server
1230            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1231            print >>gwconfig, "SeerControl: %s" % seer_control
1232            print >>gwconfig, "Type: %s" % type
1233            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1234            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1235                    local_script_dir
1236            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1237            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1238            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1239                    (remote_conf_dir, remote_conf_file)
1240            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1241            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1242            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1243            gwconfig.close()
1244
1245            return active == "true"
1246
1247        def __call__(self, line, allocated, tbparams):
1248            # Process gateways
1249            if not self.current_gateways:
1250                m = self.begin_gateways.match(line)
1251                if m:
1252                    self.current_gateways = m.group(1)
1253                    if allocated.has_key(self.current_gateways):
1254                        # This test should always succeed
1255                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1256                        if not os.path.exists(tb_dir):
1257                            try:
1258                                os.mkdir(tb_dir)
1259                            except IOError:
1260                                raise service_error(service_error.internal,
1261                                        "Cannot create %s" % tb_dir)
1262                    else:
1263                        # XXX
1264                        self.log.error("[gateways]: Ignoring gateways for " + \
1265                                "unknown testbed %s" % self.current_gateways)
1266                        self.current_gateways = None
1267                    return True
1268                else:
1269                    return False
1270            else:
1271                m = self.end_gateways.match(line)
1272                if m :
1273                    if m.group(1) != self.current_gateways:
1274                        raise service_error(service_error.internal,
1275                                "Mismatched gateway markers!?")
1276                    if self.control_gateway:
1277                        try:
1278                            cc = open("%s/%s/client.conf" %
1279                                    (self.tmpdir, self.current_gateways), 'w')
1280                            print >>cc, "ControlGateway: %s" % \
1281                                    self.control_gateway
1282                            if tbparams[self.master].has_key('smbshare'):
1283                                print >>cc, "SMBSHare: %s" % \
1284                                        tbparams[self.master]['smbshare']
1285                            print >>cc, "ProjectUser: %s" % \
1286                                    tbparams[self.master]['user']
1287                            print >>cc, "ProjectName: %s" % \
1288                                    tbparams[self.master]['project']
1289                            cc.close()
1290                        except IOError:
1291                            raise service_error(service_error.internal,
1292                                    "Error creating client config")
1293                        try:
1294                            cc = open("%s/%s/seer.conf" %
1295                                    (self.tmpdir, self.current_gateways),
1296                                    'w')
1297                            if self.current_gateways != self.master:
1298                                print >>cc, "ControlNode: %s" % \
1299                                        self.control_gateway
1300                            print >>cc, "ExperimentID: %s/%s" % \
1301                                    ( tbparams[self.master]['project'], \
1302                                    self.eid )
1303                            cc.close()
1304                        except IOError:
1305                            raise service_error(service_error.internal,
1306                                    "Error creating seer config")
1307                    else:
1308                        debug.error("[gateways]: No control gateway for %s" %\
1309                                    self.current_gateways)
1310                    self.current_gateways = None
1311                else:
1312                    dtb, myname, desthost, type = line.split(" ")
1313
1314                    if type == "control" or type == "both":
1315                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1316                                self.eid, 
1317                                tbparams[self.current_gateways]['project'],
1318                                tbparams[self.current_gateways]['domain'])
1319                    try:
1320                        active = self.gateway_conf_file(self.current_gateways,
1321                                self.master, self.eid, self.gw_pubkey_base,
1322                                self.gw_secretkey_base,
1323                                self.active_end, tbparams, dtb, myname,
1324                                desthost, type)
1325                    except IOError, e:
1326                        raise service_error(service_error.internal,
1327                                "Failed to write config file for %s" % \
1328                                        self.current_gateway)
1329           
1330                    gw_pubkey = "%s/keys/%s" % \
1331                            (self.tmpdir, self.gw_pubkey_base)
1332                    gw_secretkey = "%s/keys/%s" % \
1333                            (self.tmpdir, self.gw_secretkey_base)
1334
1335                    pkfile = "%s/%s/%s" % \
1336                            ( self.tmpdir, self.current_gateways, 
1337                                    self.gw_pubkey_base)
1338                    skfile = "%s/%s/%s" % \
1339                            ( self.tmpdir, self.current_gateways, 
1340                                    self.gw_secretkey_base)
1341
1342                    if not os.path.exists(pkfile):
1343                        try:
1344                            self.copy_file(gw_pubkey, pkfile)
1345                        except IOError:
1346                            service_error(service_error.internal,
1347                                    "Failed to copy pubkey file")
1348
1349                    if active and not os.path.exists(skfile):
1350                        try:
1351                            self.copy_file(gw_secretkey, skfile)
1352                        except IOError:
1353                            service_error(service_error.internal,
1354                                    "Failed to copy secretkey file")
1355                return True
1356
1357    class shunt_to_file:
1358        """
1359        Simple class to write data between two regexps to a file.
1360        """
1361        def __init__(self, begin, end, filename):
1362            """
1363            Begin shunting on a match of begin, stop on end, send data to
1364            filename.
1365            """
1366            self.begin = re.compile(begin)
1367            self.end = re.compile(end)
1368            self.in_shunt = False
1369            self.file = None
1370            self.filename = filename
1371
1372        def __call__(self, line):
1373            """
1374            Call this on each line in the input that may be shunted.
1375            """
1376            if not self.in_shunt:
1377                if self.begin.match(line):
1378                    self.in_shunt = True
1379                    try:
1380                        self.file = open(self.filename, "w")
1381                    except:
1382                        self.file = None
1383                        raise
1384                    return True
1385                else:
1386                    return False
1387            else:
1388                if self.end.match(line):
1389                    if self.file: 
1390                        self.file.close()
1391                        self.file = None
1392                    self.in_shunt = False
1393                else:
1394                    if self.file:
1395                        print >>self.file, line
1396                return True
1397
1398    class shunt_to_list:
1399        """
1400        Same interface as shunt_to_file.  Data collected in self.list, one list
1401        element per line.
1402        """
1403        def __init__(self, begin, end):
1404            self.begin = re.compile(begin)
1405            self.end = re.compile(end)
1406            self.in_shunt = False
1407            self.list = [ ]
1408       
1409        def __call__(self, line):
1410            if not self.in_shunt:
1411                if self.begin.match(line):
1412                    self.in_shunt = True
1413                    return True
1414                else:
1415                    return False
1416            else:
1417                if self.end.match(line):
1418                    self.in_shunt = False
1419                else:
1420                    self.list.append(line)
1421                return True
1422
1423    class shunt_to_string:
1424        """
1425        Same interface as shunt_to_file.  Data collected in self.str, all in
1426        one string.
1427        """
1428        def __init__(self, begin, end):
1429            self.begin = re.compile(begin)
1430            self.end = re.compile(end)
1431            self.in_shunt = False
1432            self.str = ""
1433       
1434        def __call__(self, line):
1435            if not self.in_shunt:
1436                if self.begin.match(line):
1437                    self.in_shunt = True
1438                    return True
1439                else:
1440                    return False
1441            else:
1442                if self.end.match(line):
1443                    self.in_shunt = False
1444                else:
1445                    self.str += line
1446                return True
1447
1448    def create_experiment(self, req, fid):
1449        """
1450        The external interface to experiment creation called from the
1451        dispatcher.
1452
1453        Creates a working directory, splits the incoming description using the
1454        splitter script and parses out the avrious subsections using the
1455        lcasses above.  Once each sub-experiment is created, use pooled threads
1456        to instantiate them and start it all up.
1457        """
1458
1459        if not self.auth.check_attribute(fid, 'create'):
1460            raise service_error(service_error.access, "Create access denied")
1461
1462        try:
1463            tmpdir = tempfile.mkdtemp(prefix="split-")
1464        except IOError:
1465            raise service_error(service_error.internal, "Cannot create tmp dir")
1466
1467        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1468        gw_secretkey_base = "fed.%s" % self.ssh_type
1469        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1470        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1471        tclfile = tmpdir + "/experiment.tcl"
1472        tbparams = { }
1473        try:
1474            access_user = self.accessdb[fid]
1475        except KeyError:
1476            raise service_error(service_error.internal,
1477                    "Access map and authorizer out of sync in " + \
1478                            "create_experiment for fedid %s"  % fid)
1479
1480        pid = "dummy"
1481        gid = "dummy"
1482        # XXX
1483        fail_soft = False
1484
1485        try:
1486            os.mkdir(tmpdir+"/keys")
1487        except OSError:
1488            raise service_error(service_error.internal,
1489                    "Can't make temporary dir")
1490
1491        req = req.get('CreateRequestBody', None)
1492        if not req:
1493            raise service_error(service_error.req,
1494                    "Bad request format (no CreateRequestBody)")
1495        # The tcl parser needs to read a file so put the content into that file
1496        descr=req.get('experimentdescription', None)
1497        if descr:
1498            file_content=descr.get('ns2description', None)
1499            if file_content:
1500                try:
1501                    f = open(tclfile, 'w')
1502                    f.write(file_content)
1503                    f.close()
1504                except IOError:
1505                    raise service_error(service_error.internal,
1506                            "Cannot write temp experiment description")
1507            else:
1508                raise service_error(service_error.req, 
1509                        "Only ns2descriptions supported")
1510        else:
1511            raise service_error(service_error.req, "No experiment description")
1512
1513        if req.has_key('experimentID') and \
1514                req['experimentID'].has_key('localname'):
1515            eid = req['experimentID']['localname']
1516            self.state_lock.acquire()
1517            while (self.state.has_key(eid)):
1518                eid += random.choice(string.ascii_letters)
1519            # To avoid another thread picking this localname
1520            self.state[eid] = "placeholder"
1521            self.state_lock.release()
1522        else:
1523            eid = self.exp_stem
1524            for i in range(0,5):
1525                eid += random.choice(string.ascii_letters)
1526            self.state_lock.acquire()
1527            while (self.state.has_key(eid)):
1528                eid = self.exp_stem
1529                for i in range(0,5):
1530                    eid += random.choice(string.ascii_letters)
1531            # To avoid another thread picking this localname
1532            self.state[eid] = "placeholder"
1533            self.state_lock.release()
1534
1535        try:
1536            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1537        except ValueError:
1538            raise service_error(service_error.server_config, 
1539                    "Bad key type (%s)" % self.ssh_type)
1540
1541        user = req.get('user', None)
1542        if user == None:
1543            raise service_error(service_error.req, "No user")
1544
1545        master = req.get('master', None)
1546        if not master:
1547            raise service_error(service_error.req, "No master testbed label")
1548        export_project = req.get('exportProject', None)
1549        if not export_project:
1550            raise service_error(service_error.req, "No export project")
1551       
1552        if self.splitter_url:
1553            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1554            split_data = self.remote_splitter(self.splitter_url, file_content,
1555                    master)
1556        else:
1557            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1558                str(self.muxmax), '-m', master]
1559
1560            if self.fedkit:
1561                tclcmd.append('-k')
1562
1563            tclcmd.extend([pid, gid, eid, tclfile])
1564
1565            self.log.debug("running local splitter %s", " ".join(tclcmd))
1566            tclparser = Popen(tclcmd, stdout=PIPE)
1567            split_data = tclparser.stdout
1568
1569        allocated = { }     # Testbeds we can access
1570        started = { }       # Testbeds where a sub-experiment started
1571                            # successfully
1572
1573        # Objects to parse the splitter output (defined above)
1574        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1575        parse_allbeds = self.allbeds(self.get_access)
1576        parse_gateways = self.gateways(eid, master, tmpdir,
1577                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1578        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1579                    "^#\s+End\s+Vtopo")
1580        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1581                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1582        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1583                "^#\s+End\s+tarfiles")
1584        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1585                "^#\s+End\s+rpms")
1586
1587        # Worling on the split data
1588        for line in split_data:
1589            line = line.rstrip()
1590            if parse_current_testbed(line, master, allocated, tbparams):
1591                continue
1592            elif parse_allbeds(line, user, tbparams, master, export_project,
1593                    access_user):
1594                continue
1595            elif parse_gateways(line, allocated, tbparams):
1596                continue
1597            elif parse_vtopo(line):
1598                continue
1599            elif parse_hostnames(line):
1600                continue
1601            elif parse_tarfiles(line):
1602                continue
1603            elif parse_rpms(line):
1604                continue
1605            else:
1606                raise service_error(service_error.internal, 
1607                        "Bad tcl parse? %s" % line)
1608
1609        # Virtual topology and visualization
1610        vtopo = self.gentopo(parse_vtopo.str)
1611        if not vtopo:
1612            raise service_error(service_error.internal, 
1613                    "Failed to generate virtual topology")
1614
1615        vis = self.genviz(vtopo)
1616        if not vis:
1617            raise service_error(service_error.internal, 
1618                    "Failed to generate visualization")
1619
1620        # save federant information
1621        for k in allocated.keys():
1622            tbparams[k]['federant'] = {\
1623                    'name': [ { 'localname' : eid} ],\
1624                    'emulab': tbparams[k]['emulab'],\
1625                    'allocID' : tbparams[k]['allocID'],\
1626                    'master' : k == master,\
1627                }
1628
1629
1630        # Copy tarfiles and rpms needed at remote sites into a staging area
1631        try:
1632            if self.fedkit:
1633                parse_tarfiles.list.append(self.fedkit)
1634            for t in parse_tarfiles.list:
1635                if not os.path.exists("%s/tarfiles" % tmpdir):
1636                    os.mkdir("%s/tarfiles" % tmpdir)
1637                self.copy_file(t, "%s/tarfiles/%s" % \
1638                        (tmpdir, os.path.basename(t)))
1639            for r in parse_rpms.list:
1640                if not os.path.exists("%s/rpms" % tmpdir):
1641                    os.mkdir("%s/rpms" % tmpdir)
1642                self.copy_file(r, "%s/rpms/%s" % \
1643                        (tmpdir, os.path.basename(r)))
1644        except IOError, e:
1645            raise service_error(service_error.internal, 
1646                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1647
1648        thread_pool_info = self.thread_pool()
1649        threads = [ ]
1650
1651        for tb in [ k for k in allocated.keys() if k != master]:
1652            # Wait until we have a free slot to start the next testbed load
1653            thread_pool_info.acquire()
1654            while thread_pool_info.started - \
1655                    thread_pool_info.terminated >= self.nthreads:
1656                thread_pool_info.wait()
1657            thread_pool_info.release()
1658
1659            # Create and start a thread to start the segment, and save it to
1660            # get the return value later
1661            t  = self.pooled_thread(target=self.start_segment, 
1662                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1663                    pdata=thread_pool_info, trace_file=self.trace_file)
1664            threads.append(t)
1665            t.start()
1666
1667        # Wait until all finish (the first clause of the while is to make sure
1668        # one starts)
1669        thread_pool_info.acquire()
1670        while thread_pool_info.started == 0 or \
1671                thread_pool_info.started > thread_pool_info.terminated:
1672            thread_pool_info.wait()
1673        thread_pool_info.release()
1674
1675        # If none failed, start the master
1676        failed = [ t.getName() for t in threads if not t.rv ]
1677
1678        if len(failed) == 0:
1679            if not self.start_segment(master, eid, tbparams, tmpdir):
1680                failed.append(master)
1681
1682        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1683        # If one failed clean up, unless fail_soft is set
1684        if failed:
1685            if not fail_soft:
1686                for tb in succeeded:
1687                    self.stop_segment(tb, eid, tbparams)
1688                # Remove the placeholder
1689                self.state_lock.acquire()
1690                del self.state[eid]
1691                self.state_lock.release()
1692
1693                raise service_error(service_error.federant,
1694                    "Swap in failed on %s" % ",".join(failed))
1695        else:
1696            self.log.info("[start_segment]: Experiment %s started" % eid)
1697
1698        # Generate an ID for the experiment (slice) and a certificate that the
1699        # allocator can use to prove they own it.  We'll ship it back through
1700        # the encrypted connection.
1701        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1702
1703        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1704
1705        # Walk up tmpdir, deleting as we go
1706        for path, dirs, files in os.walk(tmpdir, topdown=False):
1707            for f in files:
1708                os.remove(os.path.join(path, f))
1709            for d in dirs:
1710                os.rmdir(os.path.join(path, d))
1711        os.rmdir(tmpdir)
1712
1713        # The deepcopy prevents the allocation ID and other binaries from being
1714        # translated into other formats
1715        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1716                for tb in tbparams.keys() \
1717                    if tbparams[tb].has_key('federant') ],\
1718                    'vtopo': vtopo,\
1719                    'vis' : vis,
1720                    'experimentID' : [\
1721                            { 'fedid': copy.copy(expid) }, \
1722                            { 'localname': eid },\
1723                        ],\
1724                    'experimentAccess': { 'X509' : expcert },\
1725                }
1726
1727        # Insert the experiment into our state and update the disk copy
1728        self.state_lock.acquire()
1729        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1730                for tb in tbparams.keys() \
1731                    if tbparams[tb].has_key('federant') ],\
1732                    'vtopo': vtopo,\
1733                    'vis' : vis,
1734                    'owner': fid,
1735                    'experimentID' : [\
1736                            { 'fedid': expid }, { 'localname': eid },\
1737                        ],\
1738                }
1739        self.state[eid] = self.state[expid]
1740        if self.state_filename: self.write_state()
1741        self.state_lock.release()
1742
1743        self.auth.set_attribute(fid, expid)
1744        self.auth.set_attribute(expid, expid)
1745
1746        if not failed:
1747            return resp
1748        else:
1749            raise service_error(service_error.partial, \
1750                    "Partial swap in on %s" % ",".join(succeeded))
1751
1752    def check_experiment_access(self, fid, key):
1753        """
1754        Confirm that the fid has access to the experiment.  Though a request
1755        may be made in terms of a local name, the access attribute is always
1756        the experiment's fedid.
1757        """
1758        if not isinstance(key, fedid):
1759            self.state_lock.acquire()
1760            if self.state.has_key(key):
1761                try:
1762                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1763                            if f.has_key('fedid') ]
1764                except KeyError:
1765                    self.state_lock.release()
1766                    raise service_error(service_error.internal, 
1767                            "No fedid for experiment %s when checking " +\
1768                                    "access(!?)" % key)
1769                if len(kl) == 1:
1770                    key = kl[0]
1771                else:
1772                    self.state_lock.release()
1773                    raise service_error(service_error.internal, 
1774                            "multiple fedids for experiment %s when " +\
1775                                    "checking access(!?)" % key)
1776            else:
1777                self.state_lock.release()
1778                raise service_error(service_error.access, "Access Denied")
1779            self.state_lock.release()
1780
1781        if self.auth.check_attribute(fid, key):
1782            return True
1783        else:
1784            raise service_error(service_error.access, "Access Denied")
1785
1786
1787
1788    def get_vtopo(self, req, fid):
1789        """
1790        Return the stored virtual topology for this experiment
1791        """
1792        rv = None
1793
1794        req = req.get('VtopoRequestBody', None)
1795        if not req:
1796            raise service_error(service_error.req,
1797                    "Bad request format (no VtopoRequestBody)")
1798        exp = req.get('experiment', None)
1799        if exp:
1800            if exp.has_key('fedid'):
1801                key = exp['fedid']
1802                keytype = "fedid"
1803            elif exp.has_key('localname'):
1804                key = exp['localname']
1805                keytype = "localname"
1806            else:
1807                raise service_error(service_error.req, "Unknown lookup type")
1808        else:
1809            raise service_error(service_error.req, "No request?")
1810
1811        self.check_experiment_access(fid, key)
1812
1813        self.state_lock.acquire()
1814        if self.state.has_key(key):
1815            rv = { 'experiment' : {keytype: key },\
1816                    'vtopo': self.state[key]['vtopo'],\
1817                }
1818        self.state_lock.release()
1819
1820        if rv: return rv
1821        else: raise service_error(service_error.req, "No such experiment")
1822
1823    def get_vis(self, req, fid):
1824        """
1825        Return the stored visualization for this experiment
1826        """
1827        rv = None
1828
1829        req = req.get('VisRequestBody', None)
1830        if not req:
1831            raise service_error(service_error.req,
1832                    "Bad request format (no VisRequestBody)")
1833        exp = req.get('experiment', None)
1834        if exp:
1835            if exp.has_key('fedid'):
1836                key = exp['fedid']
1837                keytype = "fedid"
1838            elif exp.has_key('localname'):
1839                key = exp['localname']
1840                keytype = "localname"
1841            else:
1842                raise service_error(service_error.req, "Unknown lookup type")
1843        else:
1844            raise service_error(service_error.req, "No request?")
1845
1846        self.check_experiment_access(fid, key)
1847
1848        self.state_lock.acquire()
1849        if self.state.has_key(key):
1850            rv =  { 'experiment' : {keytype: key },\
1851                    'vis': self.state[key]['vis'],\
1852                    }
1853        self.state_lock.release()
1854
1855        if rv: return rv
1856        else: raise service_error(service_error.req, "No such experiment")
1857
1858    def get_info(self, req, fid):
1859        """
1860        Return all the stored info about this experiment
1861        """
1862        rv = None
1863
1864        req = req.get('InfoRequestBody', None)
1865        if not req:
1866            raise service_error(service_error.req,
1867                    "Bad request format (no VisRequestBody)")
1868        exp = req.get('experiment', None)
1869        if exp:
1870            if exp.has_key('fedid'):
1871                key = exp['fedid']
1872                keytype = "fedid"
1873            elif exp.has_key('localname'):
1874                key = exp['localname']
1875                keytype = "localname"
1876            else:
1877                raise service_error(service_error.req, "Unknown lookup type")
1878        else:
1879            raise service_error(service_error.req, "No request?")
1880
1881        self.check_experiment_access(fid, key)
1882
1883        # The state may be massaged by the service function that called
1884        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1885        # state.
1886        self.state_lock.acquire()
1887        if self.state.has_key(key):
1888            rv = copy.deepcopy(self.state[key])
1889        self.state_lock.release()
1890
1891        if rv: return rv
1892        else: raise service_error(service_error.req, "No such experiment")
1893
1894
1895    def terminate_experiment(self, req, fid):
1896        """
1897        Swap this experiment out on the federants and delete the shared
1898        information
1899        """
1900        tbparams = { }
1901        req = req.get('TerminateRequestBody', None)
1902        if not req:
1903            raise service_error(service_error.req,
1904                    "Bad request format (no TerminateRequestBody)")
1905        exp = req.get('experiment', None)
1906        if exp:
1907            if exp.has_key('fedid'):
1908                key = exp['fedid']
1909                keytype = "fedid"
1910            elif exp.has_key('localname'):
1911                key = exp['localname']
1912                keytype = "localname"
1913            else:
1914                raise service_error(service_error.req, "Unknown lookup type")
1915        else:
1916            raise service_error(service_error.req, "No request?")
1917
1918        self.check_experiment_access(fid, key)
1919
1920        self.state_lock.acquire()
1921        fed_exp = self.state.get(key, None)
1922
1923        if fed_exp:
1924            # This branch of the conditional holds the lock to generate a
1925            # consistent temporary tbparams variable to deallocate experiments.
1926            # It releases the lock to do the deallocations and reacquires it to
1927            # remove the experiment state when the termination is complete.
1928            ids = []
1929            #  experimentID is a list of dicts that are self-describing
1930            #  identifiers.  This finds all the fedids and localnames - the
1931            #  keys of self.state - and puts them into ids.
1932            for id in fed_exp.get('experimentID', []):
1933                if id.has_key('fedid'): ids.append(id['fedid'])
1934                if id.has_key('localname'): ids.append(id['localname'])
1935
1936            # Construct enough of the tbparams to make the stop_segment calls
1937            # work
1938            for fed in fed_exp['federant']:
1939                try:
1940                    for e in fed['name']:
1941                        eid = e.get('localname', None)
1942                        if eid: break
1943                    else:
1944                        continue
1945
1946                    p = fed['emulab']['project']
1947
1948                    project = p['name']['localname']
1949                    tb = p['testbed']['localname']
1950                    user = p['user'][0]['userID']['localname']
1951
1952                    domain = fed['emulab']['domain']
1953                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1954                    aid = fed['allocID']
1955                except KeyError, e:
1956                    continue
1957                tbparams[tb] = {\
1958                        'user': user,\
1959                        'domain': domain,\
1960                        'project': project,\
1961                        'host': host,\
1962                        'eid': eid,\
1963                        'aid': aid,\
1964                    }
1965            self.state_lock.release()
1966
1967            # Stop everyone.
1968            for tb in tbparams.keys():
1969                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1970
1971            # release the allocations
1972            for tb in tbparams.keys():
1973                self.release_access(tb, tbparams[tb]['aid'])
1974
1975            # Remove the terminated experiment
1976            self.state_lock.acquire()
1977            for id in ids:
1978                if self.state.has_key(id): del self.state[id]
1979
1980            if self.state_filename: self.write_state()
1981            self.state_lock.release()
1982
1983            return { 'experiment': exp }
1984        else:
1985            # Don't forget to release the lock
1986            self.state_lock.release()
1987            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.