source: fedd/federation/experiment_control.py @ af75d48

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since af75d48 was b78c9ea, checked in by Ted Faber <faber@…>, 15 years ago

typo

  • Property mode set to 100644
File size: 92.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Split = service_caller('Ns2Split')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_splitter(self, uri, desc, master):
925
926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
931            }
932
933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
945
946    class start_segment:
947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
956            self.testbed = testbed
957            self.log_collector = log_collector
958            self.response = None
959
960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
967                    'master': master,
968                }
969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
980            if attrs:
981                req['fedAttr'] = attrs
982
983            try:
984                self.log.debug("Calling StartSegment at %s " % uri)
985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
993                    self.response = r
994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
1002
1003
1004
1005    class terminate_segment:
1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
1028   
1029
1030    def allocate_resources(self, allocated, master, eid, expid, 
1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1032            attrs=None, connInfo={}, services=[]):
1033
1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
1072            threads.append(t)
1073            t.start()
1074
1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
1077        while not thread_pool.wait_for_all_done(60.0):
1078            mins += 1
1079            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1080                    % mins)
1081
1082        thread_pool.clear()
1083
1084        failed = [ t.getName() for t in threads if not t.rv ]
1085        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1086
1087        # If one failed clean up, unless fail_soft is set
1088        if failed:
1089            if not fail_soft:
1090                thread_pool.clear()
1091                for tb in succeeded:
1092                    # Create and start a thread to stop the segment
1093                    thread_pool.wait_for_slot()
1094                    uri = tbparams[tb]['uri']
1095                    t  = self.pooled_thread(\
1096                            target=self.terminate_segment(log=log,
1097                                testbed=tb,
1098                                cert_file=self.cert_file, 
1099                                cert_pwd=self.cert_pwd,
1100                                trusted_certs=self.trusted_certs,
1101                                caller=self.call_TerminateSegment),
1102                            args=(uri, tbparams[tb]['federant']['allocID']),
1103                            name=tb,
1104                            pdata=thread_pool, trace_file=self.trace_file)
1105                    t.start()
1106                # Wait until all finish
1107                thread_pool.wait_for_all_done()
1108
1109                # release the allocations
1110                for tb in tbparams.keys():
1111                    self.release_access(tb, tbparams[tb]['allocID'],
1112                            tbparams[tb].get('uri', None))
1113                # Remove the placeholder
1114                self.state_lock.acquire()
1115                self.state[eid]['experimentStatus'] = 'failed'
1116                if self.state_filename: self.write_state()
1117                self.state_lock.release()
1118
1119                log.error("Swap in failed on %s" % ",".join(failed))
1120                return
1121        else:
1122            log.info("[start_segment]: Experiment %s active" % eid)
1123
1124
1125        # Walk up tmpdir, deleting as we go
1126        if self.cleanup:
1127            log.debug("[start_experiment]: removing %s" % tmpdir)
1128            for path, dirs, files in os.walk(tmpdir, topdown=False):
1129                for f in files:
1130                    os.remove(os.path.join(path, f))
1131                for d in dirs:
1132                    os.rmdir(os.path.join(path, d))
1133            os.rmdir(tmpdir)
1134        else:
1135            log.debug("[start_experiment]: not removing %s" % tmpdir)
1136
1137        # Insert the experiment into our state and update the disk copy
1138        self.state_lock.acquire()
1139        self.state[expid]['experimentStatus'] = 'active'
1140        self.state[eid] = self.state[expid]
1141        if self.state_filename: self.write_state()
1142        self.state_lock.release()
1143        return
1144
1145
1146    def add_kit(self, e, kit):
1147        """
1148        Add a Software object created from the list of (install, location)
1149        tuples passed as kit  to the software attribute of an object e.  We
1150        do this enough to break out the code, but it's kind of a hack to
1151        avoid changing the old tuple rep.
1152        """
1153
1154        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1155
1156        if isinstance(e.software, list): e.software.extend(s)
1157        else: e.software = s
1158
1159
1160    def create_experiment_state(self, fid, req, expid, expcert, 
1161            state='starting'):
1162        """
1163        Create the initial entry in the experiment's state.  The expid and
1164        expcert are the experiment's fedid and certifacte that represents that
1165        ID, which are installed in the experiment state.  If the request
1166        includes a suggested local name that is used if possible.  If the local
1167        name is already taken by an experiment owned by this user that has
1168        failed, it is overwritten.  Otherwise new letters are added until a
1169        valid localname is found.  The generated local name is returned.
1170        """
1171
1172        if req.has_key('experimentID') and \
1173                req['experimentID'].has_key('localname'):
1174            overwrite = False
1175            eid = req['experimentID']['localname']
1176            # If there's an old failed experiment here with the same local name
1177            # and accessible by this user, we'll overwrite it, otherwise we'll
1178            # fall through and do the collision avoidance.
1179            old_expid = self.get_experiment_fedid(eid)
1180            if old_expid and self.check_experiment_access(fid, old_expid):
1181                self.state_lock.acquire()
1182                status = self.state[eid].get('experimentStatus', None)
1183                if status and status == 'failed':
1184                    # remove the old access attribute
1185                    self.auth.unset_attribute(fid, old_expid)
1186                    overwrite = True
1187                    del self.state[eid]
1188                    del self.state[old_expid]
1189                self.state_lock.release()
1190            self.state_lock.acquire()
1191            while (self.state.has_key(eid) and not overwrite):
1192                eid += random.choice(string.ascii_letters)
1193            # Initial state
1194            self.state[eid] = {
1195                    'experimentID' : \
1196                            [ { 'localname' : eid }, {'fedid': expid } ],
1197                    'experimentStatus': state,
1198                    'experimentAccess': { 'X509' : expcert },
1199                    'owner': fid,
1200                    'log' : [],
1201                }
1202            self.state[expid] = self.state[eid]
1203            if self.state_filename: self.write_state()
1204            self.state_lock.release()
1205        else:
1206            eid = self.exp_stem
1207            for i in range(0,5):
1208                eid += random.choice(string.ascii_letters)
1209            self.state_lock.acquire()
1210            while (self.state.has_key(eid)):
1211                eid = self.exp_stem
1212                for i in range(0,5):
1213                    eid += random.choice(string.ascii_letters)
1214            # Initial state
1215            self.state[eid] = {
1216                    'experimentID' : \
1217                            [ { 'localname' : eid }, {'fedid': expid } ],
1218                    'experimentStatus': state,
1219                    'experimentAccess': { 'X509' : expcert },
1220                    'owner': fid,
1221                    'log' : [],
1222                }
1223            self.state[expid] = self.state[eid]
1224            if self.state_filename: self.write_state()
1225            self.state_lock.release()
1226
1227        return eid
1228
1229
1230    def allocate_ips_to_topo(self, top):
1231        """
1232        Add an ip4_address attribute to all the hosts in the topology, based on
1233        the shared substrates on which they sit.  An /etc/hosts file is also
1234        created and returned as a list of hostfiles entries.  We also return
1235        the allocator, because we may need to allocate IPs to portals
1236        (specifically DRAGON portals).
1237        """
1238        subs = sorted(top.substrates, 
1239                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1240                reverse=True)
1241        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1242        ifs = { }
1243        hosts = [ ]
1244
1245        for idx, s in enumerate(subs):
1246            a = ips.allocate(len(s.interfaces)+2)
1247            if a :
1248                base, num = a
1249                if num < len(s.interfaces) +2 : 
1250                    raise service_error(service_error.internal,
1251                            "Allocator returned wrong number of IPs??")
1252            else:
1253                raise service_error(service_error.req, 
1254                        "Cannot allocate IP addresses")
1255
1256            base += 1
1257            for i in s.interfaces:
1258                i.attribute.append(
1259                        topdl.Attribute('ip4_address', 
1260                            "%s" % ip_addr(base)))
1261                hname = i.element.name[0]
1262                if ifs.has_key(hname):
1263                    hosts.append("%s\t%s-%s %s-%d" % \
1264                            (ip_addr(base), hname, s.name, hname,
1265                                ifs[hname]))
1266                else:
1267                    ifs[hname] = 0
1268                    hosts.append("%s\t%s-%s %s-%d %s" % \
1269                            (ip_addr(base), hname, s.name, hname,
1270                                ifs[hname], hname))
1271
1272                ifs[hname] += 1
1273                base += 1
1274        return hosts, ips
1275
1276    def get_access_to_testbeds(self, testbeds, access_user, 
1277            export_project, master, allocated, tbparams, services):
1278        """
1279        Request access to the various testbeds required for this instantiation
1280        (passed in as testbeds).  User, access_user, expoert_project and master
1281        are used to construct the correct requests.  Per-testbed parameters are
1282        returned in tbparams.
1283        """
1284        for tb in testbeds:
1285            self.get_access(tb, None, tbparams, master,
1286                    export_project, access_user, services)
1287            allocated[tb] = 1
1288
1289    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1290        """
1291        Create the sub-topologies that are needed for experiment instantiation.
1292        """
1293        for tb in testbeds:
1294            topo[tb] = top.clone()
1295            to_delete = [ ]
1296            # XXX: copy in for loop to simplify
1297            for e in topo[tb].elements:
1298                etb = e.get_attribute('testbed')
1299                if etb and etb != tb:
1300                    for i in e.interface:
1301                        for s in i.subs:
1302                            try:
1303                                s.interfaces.remove(i)
1304                            except ValueError:
1305                                raise service_error(service_error.internal,
1306                                        "Can't remove interface??")
1307                    to_delete.append(e)
1308            for e in to_delete:
1309                topo[tb].elements.remove(e)
1310            topo[tb].make_indices()
1311
1312            for e in [ e for e in topo[tb].elements \
1313                    if isinstance(e,topdl.Computer)]:
1314                if self.fedkit: self.add_kit(e, self.fedkit)
1315
1316    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1317            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1318            expid=None):
1319        """
1320        Return a new internet portal node and a dict with the connectionInfo to
1321        be attached.
1322        """
1323        dproject = tbparams[dt].get('project', 'project')
1324        ddomain = tbparams[dt].get('domain', ".example.com")
1325        mdomain = tbparams[master].get('domain', '.example.com')
1326        mproject = tbparams[master].get('project', 'project')
1327        muser = tbparams[master].get('user', 'root')
1328        smbshare = tbparams[master].get('smbshare', 'USERS')
1329
1330        if st == master or dt == master:
1331            active = ("%s" % (st == master))
1332        else:
1333            active = ("%s" % (st > dt))
1334
1335        ifaces = [ ]
1336        for sub, attrs in iface_desc:
1337            inf = topdl.Interface(
1338                    name="inf%03d" % len(ifaces),
1339                    substrate=sub,
1340                    attribute=[
1341                        topdl.Attribute(
1342                            attribute=n,
1343                            value = v)
1344                        for n, v in attrs
1345                        ]
1346                    )
1347            ifaces.append(inf)
1348        if conn_type == "ssh":
1349            try:
1350                aid = tbparams[st]['allocID']['fedid']
1351            except:
1352                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1353                        % st)
1354                aid = None
1355            info = {
1356                    "type" : conn_type, 
1357                    "portal": myname,
1358                    'fedAttr': [ 
1359                            { 'attribute': 'masterdomain', 'value': mdomain},
1360                            { 'attribute': 'masterexperiment', 'value': 
1361                                "%s/%s" % (mproject, eid)},
1362                            { 'attribute': 'active', 'value': active},
1363                            # Move to SMB service description
1364                            { 'attribute': 'masteruser', 'value': muser},
1365                            { 'attribute': 'smbshare', 'value': smbshare},
1366                        ],
1367                    'parameter': [
1368                        {
1369                            'name': 'peer',
1370                            'key': 'fedid:%s/%s' % (expid, myname),
1371                            'store': self.store_url,
1372                            'type': 'output',
1373                        },
1374                        {
1375                            'name': 'peer',
1376                            'key': 'fedid:%s/%s' % (expid, desthost),
1377                            'store': self.store_url,
1378                            'type': 'input',
1379                        },
1380                        ]
1381                    }
1382            # Give this allocation the rights to access the key of the
1383            # peers
1384            if aid:
1385                for h in (myname, desthost):
1386                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1387            else:
1388                self.log.error("No aid for %s in new_portal_node" % st)
1389        else:
1390            info = None
1391
1392        return (topdl.Computer(
1393                name=myname,
1394                attribute=[ 
1395                    topdl.Attribute(attribute=n,value=v)
1396                        for n, v in (\
1397                            ('portal', 'true'),
1398                            ('portal_type', portal_type), 
1399                        )
1400                    ],
1401                interface=ifaces,
1402                ), info)
1403
1404    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1405        ddomain = tbparams[dt].get('domain', ".example.com")
1406        dproject = tbparams[dt].get('project', 'project')
1407        tsubstrate = \
1408                topdl.Substrate(name='%s-%s' % (st, dt),
1409                        attribute= [
1410                            topdl.Attribute(
1411                                attribute='portal',
1412                                value='true')
1413                            ]
1414                        )
1415        segment_element = topdl.Segment(
1416                id= tbparams[dt]['allocID'],
1417                type='emulab',
1418                uri = self.tbmap.get(dt, None),
1419                interface=[ 
1420                    topdl.Interface(
1421                        substrate=tsubstrate.name),
1422                    ],
1423                attribute = [
1424                    topdl.Attribute(attribute=n, value=v)
1425                        for n, v in (\
1426                            ('domain', ddomain),
1427                            ('experiment', "%s/%s" % \
1428                                    (dproject, eid)),)
1429                    ],
1430                )
1431
1432        return (tsubstrate, segment_element)
1433
1434    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1435        if sub.capacity is None:
1436            raise service_error(service_error.internal,
1437                    "Cannot DRAGON split substrate w/o capacity")
1438        segs = [ ]
1439        substr = topdl.Substrate(name="dragon%d" % idx, 
1440                capacity=sub.capacity.clone(),
1441                attribute=[ topdl.Attribute(attribute=n, value=v)
1442                    for n, v, in (\
1443                            ('vlan', 'unassigned%d' % idx),)])
1444        name = "dragon%d" % idx
1445        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1446        for tb in tbs.keys():
1447            seg = topdl.Segment(
1448                    id = tbparams[tb]['allocID'],
1449                    type='emulab',
1450                    uri = self.tbmap.get(tb, None),
1451                    interface=[ 
1452                        topdl.Interface(
1453                            substrate=substr.name),
1454                        ],
1455                    attribute=[ topdl.Attribute(
1456                        attribute='dragon_endpoint', 
1457                        value=tbparams[tb]['dragon']),
1458                        ]
1459                    )
1460            if tbparams[tb].has_key('vlans'):
1461                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1462            segs.append(seg)
1463
1464            # Give this allocation the rights to access the key of the
1465            # vlan_id
1466            try:
1467                aid = tbparams[tb]['allocID']['fedid']
1468                self.auth.set_attribute(aid, store_key)
1469            except:
1470                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1471                        % tb)
1472
1473        connInfo[name] = [ { 
1474            'type': 'transit',
1475            'parameter': [ { 
1476                'name': 'vlan_id',
1477                'key': store_key,
1478                'store': self.store_url,
1479                'type': 'output'
1480                } ]
1481            } ]
1482
1483        topo[name] = \
1484                topdl.Topology(substrates=[substr], elements=segs,
1485                        attribute=[
1486                            topdl.Attribute(attribute="transit", value='true'),
1487                            topdl.Attribute(attribute="dynamic", value='true'),
1488                            topdl.Attribute(attribute="testbed", 
1489                                value='dragon'),
1490                            topdl.Attribute(attribute="store_keys", 
1491                                value=store_key),
1492                            ]
1493                        )
1494
1495    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1496            connInfo, expid=None):
1497        """
1498        Add attribiutes to the various elements indicating that they are to be
1499        dragon connected and create a dragon segment in topo to be
1500        instantiated.
1501        """
1502
1503        def get_substrate_from_topo(name, t):
1504            for s in t.substrates:
1505                if s.name == name: return s
1506            else: return None
1507
1508
1509        mdomain = tbparams[master].get('domain', '.example.com')
1510        mproject = tbparams[master].get('project', 'project')
1511        # dn is the number of previously created dragon nets.  This routine
1512        # creates a net numbered by dn
1513        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1514        # Count the number of interfaces on this substrate in each testbed from
1515        # the global topology
1516        count = { }
1517        node = { }
1518        for e in [ i.element for i in sub.interfaces ]:
1519            tb = e.get_attribute('testbed')
1520            count[tb] = count.get(tb, 0) + 1
1521            node[tb] = i.get_attribute('ip4_address')
1522
1523
1524        # Set the attributes in the copies that will allow setup of dragon
1525        # connections.
1526        for tb in tbs.keys():
1527            s = get_substrate_from_topo(sub.name, topo[tb])
1528            if s:
1529                if not connInfo.has_key(tb):
1530                    connInfo[tb] = [ ]
1531
1532                try:
1533                    aid = tbparams[tb]['allocID']['fedid']
1534                except:
1535                    self.log.debug("[creat_dragon_substrate] " + 
1536                            "Can't get alloc id for %s?" %tb)
1537                    aid = None
1538
1539                # This may need another look, but only a service gateway will
1540                # look at the active parameter, and these are only inserted to
1541                # connect to the master.
1542                active = "%s" % ( tb == master)
1543                info = {
1544                        'type': 'transit',
1545                        'member': [ {
1546                            'element': i.element.name[0], 
1547                            'interface': i.name
1548                            } for i in s.interfaces \
1549                                    if isinstance(i.element, topdl.Computer) ],
1550                        'fedAttr': [ 
1551                            { 'attribute': 'masterdomain', 'value': mdomain},
1552                            { 'attribute': 'masterexperiment', 'value': 
1553                                "%s/%s" % (mproject, eid)},
1554                            { 'attribute': 'active', 'value': active},
1555                            ],
1556                        'parameter': [ {
1557                            'name': 'vlan_id',
1558                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1559                            'store': self.store_url,
1560                            'type': 'input',
1561                            } ]
1562                        }
1563                if tbs.has_key(tb):
1564                    info['peer'] = tbs[tb]
1565                connInfo[tb].append(info)
1566
1567                # Give this allocation the rights to access the key of the
1568                # vlan_id
1569                if aid:
1570                    self.auth.set_attribute(aid, 
1571                            'fedid:%s/vlan%d' % (expid, dn))
1572            else:
1573                raise service_error(service_error.internal,
1574                        "No substrate %s in testbed %s" % (sub.name, tb))
1575
1576        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1577
1578    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1579            segment_substrate, portals, connInfo, expid):
1580        # More than one testbed is on this substrate.  Insert
1581        # some portals into the subtopologies.  st == source testbed,
1582        # dt == destination testbed.
1583        for st in tbs.keys():
1584            if not segment_substrate.has_key(st):
1585                segment_substrate[st] = { }
1586            if not portals.has_key(st): 
1587                portals[st] = { }
1588            if not connInfo.has_key(st):
1589                connInfo[st] = [ ]
1590            for dt in [ t for t in tbs.keys() if t != st]:
1591                sproject = tbparams[st].get('project', 'project')
1592                dproject = tbparams[dt].get('project', 'project')
1593                mproject = tbparams[master].get('project', 'project')
1594                sdomain = tbparams[st].get('domain', ".example.com")
1595                ddomain = tbparams[dt].get('domain', ".example.com")
1596                mdomain = tbparams[master].get('domain', '.example.com')
1597                muser = tbparams[master].get('user', 'root')
1598                smbshare = tbparams[master].get('smbshare', 'USERS')
1599                aid = tbparams[dt]['allocID']['fedid']
1600                if st == master or dt == master:
1601                    active = ("%s" % (st == master))
1602                else:
1603                    active = ("%s" %(st > dt))
1604                if not segment_substrate[st].has_key(dt):
1605                    # Put a substrate and a segment for the connected
1606                    # testbed in there.
1607                    tsubstrate, segment_element = \
1608                            self.new_portal_substrate(st, dt, eid, tbparams,
1609                                    expid)
1610                    segment_substrate[st][dt] = tsubstrate
1611                    topo[st].substrates.append(tsubstrate)
1612                    topo[st].elements.append(segment_element)
1613
1614                new_portal = False
1615                if portals[st].has_key(dt):
1616                    # There's a portal set up to go to this destination.
1617                    # See if there's room to multiplex this connection on
1618                    # it.  If so, add an interface to the portal; if not,
1619                    # set up to add a portal below.
1620                    # [This little festival of braces is just a pop of the
1621                    # last element in the list of portals between st and
1622                    # dt.]
1623                    portal = portals[st][dt][-1]
1624                    mux = len([ i for i in portal.interface \
1625                            if not i.get_attribute('portal')])
1626                    if mux == self.muxmax:
1627                        new_portal = True
1628                        portal_type = "experiment"
1629                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1630                        desthost = "%stunnel%d" % (st.lower(), 
1631                                len(portals[st][dt]))
1632                    else:
1633                        new_i = topdl.Interface(
1634                                substrate=sub.name,
1635                                attribute=[ 
1636                                    topdl.Attribute(
1637                                        attribute='ip4_address', 
1638                                        value=tbs[dt]
1639                                    )
1640                                ])
1641                        portal.interface.append(new_i)
1642                else:
1643                    # First connection to this testbed, make an empty list
1644                    # and set up to add the new portal below
1645                    new_portal = True
1646                    portals[st][dt] = [ ]
1647                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1648                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1649
1650                    if dt == master or st == master: portal_type = "both"
1651                    else: portal_type = "experiment"
1652
1653                if new_portal:
1654                    infs = (
1655                            (segment_substrate[st][dt].name, 
1656                                (('portal', 'true'),)),
1657                            (sub.name, 
1658                                (('ip4_address', tbs[dt]),))
1659                        )
1660                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1661                            master, eid, myname, desthost, portal_type,
1662                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1663                    if self.fedkit:
1664                        self.add_kit(portal, self.fedkit)
1665                    if self.gatewaykit: 
1666                        self.add_kit(portal, self.gatewaykit)
1667
1668                    topo[st].elements.append(portal)
1669                    portals[st][dt].append(portal)
1670                    connInfo[st].append(info)
1671
1672    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1673        # Add to the master testbed
1674        tsubstrate, segment_element = \
1675                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1676        myname = "%stunnel" % dt
1677        desthost = "%stunnel" % st
1678
1679        portal, info = self.new_portal_node(st, dt, tbparams, master,
1680                eid, myname, desthost, "control", 
1681                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1682                conn_attrs=[], expid=expid)
1683        if self.fedkit:
1684            self.add_kit(portal, self.fedkit)
1685        if self.gatewaykit: 
1686            self.add_kit(portal, self.gatewaykit)
1687
1688        topo[st].substrates.append(tsubstrate)
1689        topo[st].elements.append(segment_element)
1690        topo[st].elements.append(portal)
1691        if not connInfo.has_key(st):
1692            connInfo[st] = [ ]
1693        connInfo[st].append(info)
1694
1695    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1696            substrate, tbparams, expid):
1697        # Add to the master testbed
1698        myname = "%stunnel" % dt
1699        desthost = "%s" % ip_addr(dip)
1700
1701        portal, info = self.new_portal_node(st, dt, tbparams, master,
1702                eid, myname, desthost, "control", 
1703                ((substrate.name,(
1704                    ('portal','true'),
1705                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1706                conn_type="transit", conn_attrs=[], expid=expid)
1707        if self.fedkit:
1708            self.add_kit(portal, self.fedkit)
1709        if self.gatewaykit: 
1710            self.add_kit(portal, self.gatewaykit)
1711
1712        return portal
1713
1714    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1715            connInfo, expid):
1716        """
1717        For each substrate in the main topology, find those that
1718        have nodes on more than one testbed.  Insert portal nodes
1719        into the copies of those substrates on the sub topologies.
1720        """
1721        segment_substrate = { }
1722        portals = { }
1723        for s in top.substrates:
1724            # tbs will contain an ip address on this subsrate that is in
1725            # each testbed.
1726            tbs = { }
1727            for i in s.interfaces:
1728                e = i.element
1729                tb = e.get_attribute('testbed')
1730                if tb and not tbs.has_key(tb):
1731                    for i in e.interface:
1732                        if s in i.subs:
1733                            tbs[tb]= i.get_attribute('ip4_address')
1734            if len(tbs) < 2:
1735                continue
1736
1737            # DRAGON will not create multi-site vlans yet
1738            if len(tbs) == 2 and \
1739                    all([tbparams[x].has_key('dragon') for x in tbs]):
1740                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1741                        master, eid, connInfo, expid)
1742            else:
1743                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1744                        eid, segment_substrate, portals, connInfo, expid)
1745
1746        # Make sure that all the slaves have a control portal back to the
1747        # master.
1748        for tb in [ t for t in tbparams.keys() if t != master ]:
1749            if len([e for e in topo[tb].elements \
1750                    if isinstance(e, topdl.Computer) and \
1751                    e.get_attribute('portal') and \
1752                    e.get_attribute('portal_type') == 'both']) == 0:
1753
1754                if tbparams[master].has_key('dragon') \
1755                        and tbparams[tb].has_key('dragon'):
1756
1757                    idx = len([x for x in topo.keys() \
1758                            if x.startswith('dragon')])
1759                    dip, leng = ip_allocator.allocate(4)
1760                    dip += 1
1761                    mip = dip+1
1762                    csub = topdl.Substrate(
1763                            name="dragon-control-%s" % tb,
1764                            capacity=topdl.Capacity(100000.0, 'max'),
1765                            attribute=[
1766                                topdl.Attribute(
1767                                    attribute='portal',
1768                                    value='true'
1769                                    )
1770                                ]
1771                            )
1772                    seg = topdl.Segment(
1773                            id= tbparams[master]['allocID'],
1774                            type='emulab',
1775                            uri = self.tbmap.get(master, None),
1776                            interface=[ 
1777                                topdl.Interface(
1778                                    substrate=csub.name),
1779                                ],
1780                            attribute = [
1781                                topdl.Attribute(attribute=n, value=v)
1782                                    for n, v in (\
1783                                        ('domain', 
1784                                            tbparams[master].get('domain',
1785                                                ".example.com")),
1786                                        ('experiment', "%s/%s" % \
1787                                                (tbparams[master].get(
1788                                                    'project', 
1789                                                    'project'), 
1790                                                    eid)),)
1791                                ],
1792                            )
1793                    portal = self.new_dragon_portal(tb, master,
1794                            master, eid, dip, mip, idx, csub, tbparams, expid)
1795                    topo[tb].substrates.append(csub)
1796                    topo[tb].elements.append(portal)
1797                    topo[tb].elements.append(seg)
1798
1799                    mcsub = csub.clone()
1800                    seg = topdl.Segment(
1801                            id= tbparams[tb]['allocID'],
1802                            type='emulab',
1803                            uri = self.tbmap.get(tb, None),
1804                            interface=[ 
1805                                topdl.Interface(
1806                                    substrate=csub.name),
1807                                ],
1808                            attribute = [
1809                                topdl.Attribute(attribute=n, value=v)
1810                                    for n, v in (\
1811                                        ('domain', 
1812                                            tbparams[tb].get('domain',
1813                                                ".example.com")),
1814                                        ('experiment', "%s/%s" % \
1815                                                (tbparams[tb].get('project', 
1816                                                    'project'), 
1817                                                    eid)),)
1818                                ],
1819                            )
1820                    portal = self.new_dragon_portal(master, tb, master,
1821                            eid, mip, dip, idx, mcsub, tbparams, expid)
1822                    topo[master].substrates.append(mcsub)
1823                    topo[master].elements.append(portal)
1824                    topo[master].elements.append(seg)
1825                    for t in (master, tb):
1826                        topo[t].incorporate_elements()
1827
1828                    self.create_dragon_substrate(csub, topo, 
1829                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1830                            tbparams, master, eid, connInfo,
1831                            expid)
1832                else:
1833                    self.add_control_portal(master, tb, master, eid, topo, 
1834                            tbparams, connInfo, expid)
1835                    self.add_control_portal(tb, master, master, eid, topo, 
1836                            tbparams, connInfo, expid)
1837
1838        # Connect the portal nodes into the topologies and clear out
1839        # substrates that are not in the topologies
1840        for tb in tbparams.keys():
1841            topo[tb].incorporate_elements()
1842            topo[tb].substrates = \
1843                    [s for s in topo[tb].substrates \
1844                        if len(s.interfaces) >0]
1845
1846    def wrangle_software(self, expid, top, topo, tbparams):
1847        """
1848        Copy software out to the repository directory, allocate permissions and
1849        rewrite the segment topologies to look for the software in local
1850        places.
1851        """
1852
1853        # Copy the rpms and tarfiles to a distribution directory from
1854        # which the federants can retrieve them
1855        linkpath = "%s/software" %  expid
1856        softdir ="%s/%s" % ( self.repodir, linkpath)
1857        softmap = { }
1858        # These are in a list of tuples format (each kit).  This comprehension
1859        # unwraps them into a single list of tuples that initilaizes the set of
1860        # tuples.
1861        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1862                for p, t in l ])
1863        pkgs.update([x.location for e in top.elements \
1864                for x in e.software])
1865        try:
1866            os.makedirs(softdir)
1867        except IOError, e:
1868            raise service_error(
1869                    "Cannot create software directory: %s" % e)
1870        # The actual copying.  Everything's converted into a url for copying.
1871        for pkg in pkgs:
1872            loc = pkg
1873
1874            scheme, host, path = urlparse(loc)[0:3]
1875            dest = os.path.basename(path)
1876            if not scheme:
1877                if not loc.startswith('/'):
1878                    loc = "/%s" % loc
1879                loc = "file://%s" %loc
1880            try:
1881                u = urlopen(loc)
1882            except Exception, e:
1883                raise service_error(service_error.req, 
1884                        "Cannot open %s: %s" % (loc, e))
1885            try:
1886                f = open("%s/%s" % (softdir, dest) , "w")
1887                self.log.debug("Writing %s/%s" % (softdir,dest) )
1888                data = u.read(4096)
1889                while data:
1890                    f.write(data)
1891                    data = u.read(4096)
1892                f.close()
1893                u.close()
1894            except Exception, e:
1895                raise service_error(service_error.internal,
1896                        "Could not copy %s: %s" % (loc, e))
1897            path = re.sub("/tmp", "", linkpath)
1898            # XXX
1899            softmap[pkg] = \
1900                    "%s/%s/%s" %\
1901                    ( self.repo_url, path, dest)
1902
1903            # Allow the individual segments to access the software.
1904            for tb in tbparams.keys():
1905                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1906                        "/%s/%s" % ( path, dest))
1907
1908        # Convert the software locations in the segments into the local
1909        # copies on this host
1910        for soft in [ s for tb in topo.values() \
1911                for e in tb.elements \
1912                    if getattr(e, 'software', False) \
1913                        for s in e.software ]:
1914            if softmap.has_key(soft.location):
1915                soft.location = softmap[soft.location]
1916
1917
1918    def new_experiment(self, req, fid):
1919        """
1920        The external interface to empty initial experiment creation called from
1921        the dispatcher.
1922
1923        Creates a working directory, splits the incoming description using the
1924        splitter script and parses out the avrious subsections using the
1925        lcasses above.  Once each sub-experiment is created, use pooled threads
1926        to instantiate them and start it all up.
1927        """
1928        if not self.auth.check_attribute(fid, 'new'):
1929            raise service_error(service_error.access, "New access denied")
1930
1931        try:
1932            tmpdir = tempfile.mkdtemp(prefix="split-")
1933        except IOError:
1934            raise service_error(service_error.internal, "Cannot create tmp dir")
1935
1936        try:
1937            access_user = self.accessdb[fid]
1938        except KeyError:
1939            raise service_error(service_error.internal,
1940                    "Access map and authorizer out of sync in " + \
1941                            "new_experiment for fedid %s"  % fid)
1942
1943        pid = "dummy"
1944        gid = "dummy"
1945
1946        req = req.get('NewRequestBody', None)
1947        if not req:
1948            raise service_error(service_error.req,
1949                    "Bad request format (no NewRequestBody)")
1950
1951        # Generate an ID for the experiment (slice) and a certificate that the
1952        # allocator can use to prove they own it.  We'll ship it back through
1953        # the encrypted connection.
1954        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1955
1956        #now we're done with the tmpdir, and it should be empty
1957        if self.cleanup:
1958            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1959            os.rmdir(tmpdir)
1960        else:
1961            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1962
1963        eid = self.create_experiment_state(fid, req, expid, expcert, 
1964                state='empty')
1965
1966        # Let users touch the state
1967        self.auth.set_attribute(fid, expid)
1968        self.auth.set_attribute(expid, expid)
1969        # Override fedids can manipulate state as well
1970        for o in self.overrides:
1971            self.auth.set_attribute(o, expid)
1972
1973        rv = {
1974                'experimentID': [
1975                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1976                ],
1977                'experimentStatus': 'empty',
1978                'experimentAccess': { 'X509' : expcert }
1979            }
1980
1981        return rv
1982
1983
1984    def create_experiment(self, req, fid):
1985        """
1986        The external interface to experiment creation called from the
1987        dispatcher.
1988
1989        Creates a working directory, splits the incoming description using the
1990        splitter script and parses out the avrious subsections using the
1991        lcasses above.  Once each sub-experiment is created, use pooled threads
1992        to instantiate them and start it all up.
1993        """
1994
1995        req = req.get('CreateRequestBody', None)
1996        if not req:
1997            raise service_error(service_error.req,
1998                    "Bad request format (no CreateRequestBody)")
1999
2000        # Get the experiment access
2001        exp = req.get('experimentID', None)
2002        if exp:
2003            if exp.has_key('fedid'):
2004                key = exp['fedid']
2005                expid = key
2006                eid = None
2007            elif exp.has_key('localname'):
2008                key = exp['localname']
2009                eid = key
2010                expid = None
2011            else:
2012                raise service_error(service_error.req, "Unknown lookup type")
2013        else:
2014            raise service_error(service_error.req, "No request?")
2015
2016        self.check_experiment_access(fid, key)
2017
2018        try:
2019            tmpdir = tempfile.mkdtemp(prefix="split-")
2020            os.mkdir(tmpdir+"/keys")
2021        except IOError:
2022            raise service_error(service_error.internal, "Cannot create tmp dir")
2023
2024        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2025        gw_secretkey_base = "fed.%s" % self.ssh_type
2026        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2027        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2028        tclfile = tmpdir + "/experiment.tcl"
2029        tbparams = { }
2030        try:
2031            access_user = self.accessdb[fid]
2032        except KeyError:
2033            raise service_error(service_error.internal,
2034                    "Access map and authorizer out of sync in " + \
2035                            "create_experiment for fedid %s"  % fid)
2036
2037        pid = "dummy"
2038        gid = "dummy"
2039
2040        # The tcl parser needs to read a file so put the content into that file
2041        descr=req.get('experimentdescription', None)
2042        if descr:
2043            file_content=descr.get('ns2description', None)
2044            if file_content:
2045                try:
2046                    f = open(tclfile, 'w')
2047                    f.write(file_content)
2048                    f.close()
2049                except IOError:
2050                    raise service_error(service_error.internal,
2051                            "Cannot write temp experiment description")
2052            else:
2053                raise service_error(service_error.req, 
2054                        "Only ns2descriptions supported")
2055        else:
2056            raise service_error(service_error.req, "No experiment description")
2057
2058        self.state_lock.acquire()
2059        if self.state.has_key(key):
2060            self.state[key]['experimentStatus'] = "starting"
2061            for e in self.state[key].get('experimentID',[]):
2062                if not expid and e.has_key('fedid'):
2063                    expid = e['fedid']
2064                elif not eid and e.has_key('localname'):
2065                    eid = e['localname']
2066        self.state_lock.release()
2067
2068        if not (eid and expid):
2069            raise service_error(service_error.internal, 
2070                    "Cannot find local experiment info!?")
2071
2072        try: 
2073            # This catches exceptions to clear the placeholder if necessary
2074            try:
2075                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2076            except ValueError:
2077                raise service_error(service_error.server_config, 
2078                        "Bad key type (%s)" % self.ssh_type)
2079
2080            master = req.get('master', None)
2081            if not master:
2082                raise service_error(service_error.req,
2083                        "No master testbed label")
2084            export_project = req.get('exportProject', None)
2085            if not export_project:
2086                raise service_error(service_error.req, "No export project")
2087           
2088            # Translate to topdl
2089            if self.splitter_url:
2090                # XXX: need remote topdl translator
2091                self.log.debug("Calling remote splitter at %s" % \
2092                        self.splitter_url)
2093                split_data = self.remote_splitter(self.splitter_url,
2094                        file_content, master)
2095            else:
2096                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2097                    str(self.muxmax), '-m', master]
2098
2099                if self.fedkit:
2100                    tclcmd.append('-k')
2101
2102                if self.gatewaykit:
2103                    tclcmd.append('-K')
2104
2105                tclcmd.extend([pid, gid, eid, tclfile])
2106
2107                self.log.debug("running local splitter %s", " ".join(tclcmd))
2108                # This is just fantastic.  As a side effect the parser copies
2109                # tb_compat.tcl into the current directory, so that directory
2110                # must be writable by the fedd user.  Doing this in the
2111                # temporary subdir ensures this is the case.
2112                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2113                        cwd=tmpdir)
2114                split_data = tclparser.stdout
2115
2116            top = topdl.topology_from_xml(file=split_data, top="experiment")
2117
2118            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2119             # Find the testbeds to look up
2120            testbeds = set([ a.value for e in top.elements \
2121                    for a in e.attribute \
2122                    if a.attribute == 'testbed'] )
2123
2124            allocated = { }         # Testbeds we can access
2125            topo ={ }               # Sub topologies
2126            connInfo = { }          # Connection information
2127            services = [ ]
2128            self.get_access_to_testbeds(testbeds, access_user, 
2129                    export_project, master, allocated, tbparams, services)
2130            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2131
2132            # Copy configuration files into the remote file store
2133            # The config urlpath
2134            configpath = "/%s/config" % expid
2135            # The config file system location
2136            configdir ="%s%s" % ( self.repodir, configpath)
2137            try:
2138                os.makedirs(configdir)
2139            except IOError, e:
2140                raise service_error(
2141                        "Cannot create config directory: %s" % e)
2142            try:
2143                f = open("%s/hosts" % configdir, "w")
2144                f.write('\n'.join(hosts))
2145                f.close()
2146            except IOError, e:
2147                raise service_error(service_error.internal, 
2148                        "Cannot write hosts file: %s" % e)
2149            try:
2150                copy_file("%s" % gw_pubkey, "%s/%s" % \
2151                        (configdir, gw_pubkey_base))
2152                copy_file("%s" % gw_secretkey, "%s/%s" % \
2153                        (configdir, gw_secretkey_base))
2154            except IOError, e:
2155                raise service_error(service_error.internal, 
2156                        "Cannot copy keyfiles: %s" % e)
2157
2158            # Allow the individual testbeds to access the configuration files.
2159            for tb in tbparams.keys():
2160                asignee = tbparams[tb]['allocID']['fedid']
2161                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2162                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2163
2164            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2165                    connInfo, expid)
2166            # Now get access to the dynamic testbeds
2167            for k, t in topo.items():
2168                if not t.get_attribute('dynamic'):
2169                    continue
2170                tb = t.get_attribute('testbed')
2171                if tb: 
2172                    self.get_access(tb, None, tbparams, master, 
2173                            export_project, access_user, services)
2174                    tbparams[k] = tbparams[tb]
2175                    del tbparams[tb]
2176                    allocated[k] = 1
2177                    store_keys = t.get_attribute('store_keys')
2178                    # Give the testbed access to keys it exports or imports
2179                    if store_keys:
2180                        for sk in store_keys.split(" "):
2181                            self.auth.set_attribute(\
2182                                    tbparams[k]['allocID']['fedid'], sk)
2183                else:
2184                    raise service_error(service_error.internal, 
2185                            "Dynamic allocation from no testbed!?")
2186
2187            self.wrangle_software(expid, top, topo, tbparams)
2188
2189            vtopo = topdl.topology_to_vtopo(top)
2190            vis = self.genviz(vtopo)
2191
2192            # save federant information
2193            for k in allocated.keys():
2194                tbparams[k]['federant'] = {
2195                        'name': [ { 'localname' : eid} ],
2196                        'allocID' : tbparams[k]['allocID'],
2197                        'master' : k == master,
2198                        'uri': tbparams[k]['uri'],
2199                    }
2200                if tbparams[k].has_key('emulab'):
2201                        tbparams[k]['federant']['emulab'] = \
2202                                tbparams[k]['emulab']
2203
2204            self.state_lock.acquire()
2205            self.state[eid]['vtopo'] = vtopo
2206            self.state[eid]['vis'] = vis
2207            self.state[expid]['federant'] = \
2208                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2209                        if tbparams[tb].has_key('federant') ]
2210            if self.state_filename: 
2211                self.write_state()
2212            self.state_lock.release()
2213        except service_error, e:
2214            # If something goes wrong in the parse (usually an access error)
2215            # clear the placeholder state.  From here on out the code delays
2216            # exceptions.  Failing at this point returns a fault to the remote
2217            # caller.
2218
2219            self.state_lock.acquire()
2220            del self.state[eid]
2221            del self.state[expid]
2222            if self.state_filename: self.write_state()
2223            self.state_lock.release()
2224            raise e
2225
2226
2227        # Start the background swapper and return the starting state.  From
2228        # here on out, the state will stick around a while.
2229
2230        # Let users touch the state
2231        self.auth.set_attribute(fid, expid)
2232        self.auth.set_attribute(expid, expid)
2233        # Override fedids can manipulate state as well
2234        for o in self.overrides:
2235            self.auth.set_attribute(o, expid)
2236
2237        # Create a logger that logs to the experiment's state object as well as
2238        # to the main log file.
2239        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2240        alloc_collector = self.list_log(self.state[eid]['log'])
2241        h = logging.StreamHandler(alloc_collector)
2242        # XXX: there should be a global one of these rather than repeating the
2243        # code.
2244        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2245                    '%d %b %y %H:%M:%S'))
2246        alloc_log.addHandler(h)
2247       
2248        attrs = [ 
2249                {
2250                    'attribute': 'ssh_pubkey', 
2251                    'value': '%s/%s/config/%s' % \
2252                            (self.repo_url, expid, gw_pubkey_base)
2253                },
2254                {
2255                    'attribute': 'ssh_secretkey', 
2256                    'value': '%s/%s/config/%s' % \
2257                            (self.repo_url, expid, gw_secretkey_base)
2258                },
2259                {
2260                    'attribute': 'hosts', 
2261                    'value': '%s/%s/config/hosts' % \
2262                            (self.repo_url, expid)
2263                },
2264                {
2265                    'attribute': 'experiment_name',
2266                    'value': eid,
2267                },
2268            ]
2269
2270        # transit and disconnected testbeds may not have a connInfo entry.
2271        # Fill in the blanks.
2272        for t in allocated.keys():
2273            if not connInfo.has_key(t):
2274                connInfo[t] = { }
2275
2276        # Start a thread to do the resource allocation
2277        t  = Thread(target=self.allocate_resources,
2278                args=(allocated, master, eid, expid, tbparams, 
2279                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2280                    services),
2281                name=eid)
2282        t.start()
2283
2284        rv = {
2285                'experimentID': [
2286                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2287                ],
2288                'experimentStatus': 'starting',
2289            }
2290
2291        return rv
2292   
2293    def get_experiment_fedid(self, key):
2294        """
2295        find the fedid associated with the localname key in the state database.
2296        """
2297
2298        rv = None
2299        self.state_lock.acquire()
2300        if self.state.has_key(key):
2301            if isinstance(self.state[key], dict):
2302                try:
2303                    kl = [ f['fedid'] for f in \
2304                            self.state[key]['experimentID']\
2305                                if f.has_key('fedid') ]
2306                except KeyError:
2307                    self.state_lock.release()
2308                    raise service_error(service_error.internal, 
2309                            "No fedid for experiment %s when getting "+\
2310                                    "fedid(!?)" % key)
2311                if len(kl) == 1:
2312                    rv = kl[0]
2313                else:
2314                    self.state_lock.release()
2315                    raise service_error(service_error.internal, 
2316                            "multiple fedids for experiment %s when " +\
2317                                    "getting fedid(!?)" % key)
2318            else:
2319                self.state_lock.release()
2320                raise service_error(service_error.internal, 
2321                        "Unexpected state for %s" % key)
2322        self.state_lock.release()
2323        return rv
2324
2325    def check_experiment_access(self, fid, key):
2326        """
2327        Confirm that the fid has access to the experiment.  Though a request
2328        may be made in terms of a local name, the access attribute is always
2329        the experiment's fedid.
2330        """
2331        if not isinstance(key, fedid):
2332            key = self.get_experiment_fedid(key)
2333
2334        if self.auth.check_attribute(fid, key):
2335            return True
2336        else:
2337            raise service_error(service_error.access, "Access Denied")
2338
2339
2340    def get_handler(self, path, fid):
2341        self.log.info("Get handler %s %s" % (path, fid))
2342        if self.auth.check_attribute(fid, path):
2343            return ("%s/%s" % (self.repodir, path), "application/binary")
2344        else:
2345            return (None, None)
2346
2347    def get_vtopo(self, req, fid):
2348        """
2349        Return the stored virtual topology for this experiment
2350        """
2351        rv = None
2352        state = None
2353
2354        req = req.get('VtopoRequestBody', None)
2355        if not req:
2356            raise service_error(service_error.req,
2357                    "Bad request format (no VtopoRequestBody)")
2358        exp = req.get('experiment', None)
2359        if exp:
2360            if exp.has_key('fedid'):
2361                key = exp['fedid']
2362                keytype = "fedid"
2363            elif exp.has_key('localname'):
2364                key = exp['localname']
2365                keytype = "localname"
2366            else:
2367                raise service_error(service_error.req, "Unknown lookup type")
2368        else:
2369            raise service_error(service_error.req, "No request?")
2370
2371        self.check_experiment_access(fid, key)
2372
2373        self.state_lock.acquire()
2374        if self.state.has_key(key):
2375            if self.state[key].has_key('vtopo'):
2376                rv = { 'experiment' : {keytype: key },\
2377                        'vtopo': self.state[key]['vtopo'],\
2378                    }
2379            else:
2380                state = self.state[key]['experimentStatus']
2381        self.state_lock.release()
2382
2383        if rv: return rv
2384        else: 
2385            if state:
2386                raise service_error(service_error.partial, 
2387                        "Not ready: %s" % state)
2388            else:
2389                raise service_error(service_error.req, "No such experiment")
2390
2391    def get_vis(self, req, fid):
2392        """
2393        Return the stored visualization for this experiment
2394        """
2395        rv = None
2396        state = None
2397
2398        req = req.get('VisRequestBody', None)
2399        if not req:
2400            raise service_error(service_error.req,
2401                    "Bad request format (no VisRequestBody)")
2402        exp = req.get('experiment', None)
2403        if exp:
2404            if exp.has_key('fedid'):
2405                key = exp['fedid']
2406                keytype = "fedid"
2407            elif exp.has_key('localname'):
2408                key = exp['localname']
2409                keytype = "localname"
2410            else:
2411                raise service_error(service_error.req, "Unknown lookup type")
2412        else:
2413            raise service_error(service_error.req, "No request?")
2414
2415        self.check_experiment_access(fid, key)
2416
2417        self.state_lock.acquire()
2418        if self.state.has_key(key):
2419            if self.state[key].has_key('vis'):
2420                rv =  { 'experiment' : {keytype: key },\
2421                        'vis': self.state[key]['vis'],\
2422                        }
2423            else:
2424                state = self.state[key]['experimentStatus']
2425        self.state_lock.release()
2426
2427        if rv: return rv
2428        else:
2429            if state:
2430                raise service_error(service_error.partial, 
2431                        "Not ready: %s" % state)
2432            else:
2433                raise service_error(service_error.req, "No such experiment")
2434
2435    def clean_info_response(self, rv):
2436        """
2437        Remove the information in the experiment's state object that is not in
2438        the info response.
2439        """
2440        # Remove the owner info (should always be there, but...)
2441        if rv.has_key('owner'): del rv['owner']
2442
2443        # Convert the log into the allocationLog parameter and remove the
2444        # log entry (with defensive programming)
2445        if rv.has_key('log'):
2446            rv['allocationLog'] = "".join(rv['log'])
2447            del rv['log']
2448        else:
2449            rv['allocationLog'] = ""
2450
2451        if rv['experimentStatus'] != 'active':
2452            if rv.has_key('federant'): del rv['federant']
2453        else:
2454            # remove the allocationID and uri info from each federant
2455            for f in rv.get('federant', []):
2456                if f.has_key('allocID'): del f['allocID']
2457                if f.has_key('uri'): del f['uri']
2458        return rv
2459
2460    def get_info(self, req, fid):
2461        """
2462        Return all the stored info about this experiment
2463        """
2464        rv = None
2465
2466        req = req.get('InfoRequestBody', None)
2467        if not req:
2468            raise service_error(service_error.req,
2469                    "Bad request format (no InfoRequestBody)")
2470        exp = req.get('experiment', None)
2471        if exp:
2472            if exp.has_key('fedid'):
2473                key = exp['fedid']
2474                keytype = "fedid"
2475            elif exp.has_key('localname'):
2476                key = exp['localname']
2477                keytype = "localname"
2478            else:
2479                raise service_error(service_error.req, "Unknown lookup type")
2480        else:
2481            raise service_error(service_error.req, "No request?")
2482
2483        self.check_experiment_access(fid, key)
2484
2485        # The state may be massaged by the service function that called
2486        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2487        # state.
2488        self.state_lock.acquire()
2489        if self.state.has_key(key):
2490            rv = copy.deepcopy(self.state[key])
2491        self.state_lock.release()
2492
2493        if rv:
2494            return self.clean_info_response(rv)
2495        else:
2496            raise service_error(service_error.req, "No such experiment")
2497
2498    def get_multi_info(self, req, fid):
2499        """
2500        Return all the stored info that this fedid can access
2501        """
2502        rv = { 'info': [ ] }
2503
2504        self.state_lock.acquire()
2505        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2506            try:
2507                self.check_experiment_access(fid, key)
2508            except service_error, e:
2509                if e.code == service_error.access:
2510                    continue
2511                else:
2512                    self.state_lock.release()
2513                    raise e
2514
2515            if self.state.has_key(key):
2516                e = copy.deepcopy(self.state[key])
2517                e = self.clean_info_response(e)
2518                rv['info'].append(e)
2519        self.state_lock.release()
2520        return rv
2521
2522    def terminate_experiment(self, req, fid):
2523        """
2524        Swap this experiment out on the federants and delete the shared
2525        information
2526        """
2527        tbparams = { }
2528        req = req.get('TerminateRequestBody', None)
2529        if not req:
2530            raise service_error(service_error.req,
2531                    "Bad request format (no TerminateRequestBody)")
2532        force = req.get('force', False)
2533        exp = req.get('experiment', None)
2534        if exp:
2535            if exp.has_key('fedid'):
2536                key = exp['fedid']
2537                keytype = "fedid"
2538            elif exp.has_key('localname'):
2539                key = exp['localname']
2540                keytype = "localname"
2541            else:
2542                raise service_error(service_error.req, "Unknown lookup type")
2543        else:
2544            raise service_error(service_error.req, "No request?")
2545
2546        self.check_experiment_access(fid, key)
2547
2548        dealloc_list = [ ]
2549
2550
2551        # Create a logger that logs to the dealloc_list as well as to the main
2552        # log file.
2553        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2554        h = logging.StreamHandler(self.list_log(dealloc_list))
2555        # XXX: there should be a global one of these rather than repeating the
2556        # code.
2557        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2558                    '%d %b %y %H:%M:%S'))
2559        dealloc_log.addHandler(h)
2560
2561        self.state_lock.acquire()
2562        fed_exp = self.state.get(key, None)
2563
2564        if fed_exp:
2565            # This branch of the conditional holds the lock to generate a
2566            # consistent temporary tbparams variable to deallocate experiments.
2567            # It releases the lock to do the deallocations and reacquires it to
2568            # remove the experiment state when the termination is complete.
2569
2570            # First make sure that the experiment creation is complete.
2571            status = fed_exp.get('experimentStatus', None)
2572
2573            if status:
2574                if status in ('starting', 'terminating'):
2575                    if not force:
2576                        self.state_lock.release()
2577                        raise service_error(service_error.partial, 
2578                                'Experiment still being created or destroyed')
2579                    else:
2580                        self.log.warning('Experiment in %s state ' % status + \
2581                                'being terminated by force.')
2582            else:
2583                # No status??? trouble
2584                self.state_lock.release()
2585                raise service_error(service_error.internal,
2586                        "Experiment has no status!?")
2587
2588            ids = []
2589            #  experimentID is a list of dicts that are self-describing
2590            #  identifiers.  This finds all the fedids and localnames - the
2591            #  keys of self.state - and puts them into ids.
2592            for id in fed_exp.get('experimentID', []):
2593                if id.has_key('fedid'): ids.append(id['fedid'])
2594                if id.has_key('localname'): ids.append(id['localname'])
2595
2596            # Collect the allocation/segment ids into a dict keyed by the fedid
2597            # of the allocation (or a monotonically increasing integer) that
2598            # contains a tuple of uri, aid (which is a dict...)
2599            for i, fed in enumerate(fed_exp.get('federant', [])):
2600                try:
2601                    uri = fed['uri']
2602                    aid = fed['allocID']
2603                    k = fed['allocID'].get('fedid', i)
2604                except KeyError, e:
2605                    continue
2606                tbparams[k] = (uri, aid)
2607            fed_exp['experimentStatus'] = 'terminating'
2608            if self.state_filename: self.write_state()
2609            self.state_lock.release()
2610
2611            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2612            # then completes, so we can't wait if nothing starts.  So, no
2613            # tbparams, no start.
2614            if len(tbparams) > 0:
2615                thread_pool = self.thread_pool(self.nthreads)
2616                for k in tbparams.keys():
2617                    # Create and start a thread to stop the segment
2618                    thread_pool.wait_for_slot()
2619                    uri, aid = tbparams[k]
2620                    t  = self.pooled_thread(\
2621                            target=self.terminate_segment(log=dealloc_log,
2622                                testbed=uri,
2623                                cert_file=self.cert_file, 
2624                                cert_pwd=self.cert_pwd,
2625                                trusted_certs=self.trusted_certs,
2626                                caller=self.call_TerminateSegment),
2627                            args=(uri, aid), name=k,
2628                            pdata=thread_pool, trace_file=self.trace_file)
2629                    t.start()
2630                # Wait for completions
2631                thread_pool.wait_for_all_done()
2632
2633            # release the allocations (failed experiments have done this
2634            # already, and starting experiments may be in odd states, so we
2635            # ignore errors releasing those allocations
2636            try: 
2637                for k in tbparams.keys():
2638                    # This releases access by uri
2639                    uri, aid = tbparams[k]
2640                    self.release_access(None, aid, uri=uri)
2641            except service_error, e:
2642                if status != 'failed' and not force:
2643                    raise e
2644
2645            # Remove the terminated experiment
2646            self.state_lock.acquire()
2647            for id in ids:
2648                if self.state.has_key(id): del self.state[id]
2649
2650            if self.state_filename: self.write_state()
2651            self.state_lock.release()
2652
2653            # Delete any synch points associated with this experiment.  All
2654            # synch points begin with the fedid of the experiment.
2655            fedid_keys = set(["fedid:%s" % f for f in ids \
2656                    if isinstance(f, fedid)])
2657            for k in self.synch_store.all_keys():
2658                try:
2659                    if len(k) > 45 and k[0:46] in fedid_keys:
2660                        self.synch_store.del_value(k)
2661                except synch_store.BadDeleteionError:
2662                    pass
2663            self.write_store()
2664               
2665            return { 
2666                    'experiment': exp , 
2667                    'deallocationLog': "".join(dealloc_list),
2668                    }
2669        else:
2670            # Don't forget to release the lock
2671            self.state_lock.release()
2672            raise service_error(service_error.req, "No saved state")
2673
2674
2675    def GetValue(self, req, fid):
2676        """
2677        Get a value from the synchronized store
2678        """
2679        req = req.get('GetValueRequestBody', None)
2680        if not req:
2681            raise service_error(service_error.req,
2682                    "Bad request format (no GetValueRequestBody)")
2683       
2684        name = req['name']
2685        wait = req['wait']
2686        rv = { 'name': name }
2687
2688        if self.auth.check_attribute(fid, name):
2689            v = self.synch_store.get_value(name, wait)
2690            if v is not None:
2691                rv['value'] = v
2692            self.log.debug("[GetValue] got %s from %s" % (v, name))
2693            return rv
2694        else:
2695            raise service_error(service_error.access, "Access Denied")
2696       
2697
2698    def SetValue(self, req, fid):
2699        """
2700        Set a value in the synchronized store
2701        """
2702        req = req.get('SetValueRequestBody', None)
2703        if not req:
2704            raise service_error(service_error.req,
2705                    "Bad request format (no SetValueRequestBody)")
2706       
2707        name = req['name']
2708        v = req['value']
2709
2710        if self.auth.check_attribute(fid, name):
2711            try:
2712                self.synch_store.set_value(name, v)
2713                self.write_store()
2714                self.log.debug("[SetValue] set %s to %s" % (name, v))
2715            except synch_store.CollisionError:
2716                # Translate into a service_error
2717                raise service_error(service_error.req,
2718                        "Value already set: %s" %name)
2719            return { 'name': name, 'value': v }
2720        else:
2721            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.