source: fedd/federation/experiment_control.py @ dadc4da

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since dadc4da was dadc4da, checked in by Ted Faber <faber@…>, 14 years ago

revocation conrtol

  • Property mode set to 100644
File size: 93.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Split = service_caller('Ns2Split')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_splitter(self, uri, desc, master):
925
926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
931            }
932
933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
945
946    class start_segment:
947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
956            self.testbed = testbed
957            self.log_collector = log_collector
958            self.response = None
959
960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
967                    'master': master,
968                }
969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
980            if attrs:
981                req['fedAttr'] = attrs
982
983            try:
984                self.log.debug("Calling StartSegment at %s " % uri)
985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
993                    self.response = r
994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
1002
1003
1004
1005    class terminate_segment:
1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
1028   
1029
1030    def allocate_resources(self, allocated, master, eid, expid, 
1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1032            attrs=None, connInfo={}, services=[]):
1033
1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
1072            threads.append(t)
1073            t.start()
1074
1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
1077        revoked = False
1078        while not thread_pool.wait_for_all_done(60.0):
1079            mins += 1
1080            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1081                    % mins)
1082            if not revoked and \
1083                    len([ t.getName() for t in threads if not t.rv ]) > 0:
1084                # a testbed has failed.  Revoke this experiment's
1085                # synchronizarion values so that sub experiments will not
1086                # deadlock waiting for synchronization that will never happen
1087                self.log.info("A subexperiment has failed to swap in, " + \
1088                        "revoking synch keys")
1089                var_key = "fedid:%s" % expid
1090                for k in self.synch_store.all_keys():
1091                    if len(k) > 45 and k[0:46] == var_key:
1092                        self.synch_store.revoke_key(k)
1093                revoked = True
1094
1095        thread_pool.clear()
1096
1097        failed = [ t.getName() for t in threads if not t.rv ]
1098        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1099
1100        # If one failed clean up, unless fail_soft is set
1101        if failed:
1102            if not fail_soft:
1103                thread_pool.clear()
1104                for tb in succeeded:
1105                    # Create and start a thread to stop the segment
1106                    thread_pool.wait_for_slot()
1107                    uri = tbparams[tb]['uri']
1108                    t  = self.pooled_thread(\
1109                            target=self.terminate_segment(log=log,
1110                                testbed=tb,
1111                                cert_file=self.cert_file, 
1112                                cert_pwd=self.cert_pwd,
1113                                trusted_certs=self.trusted_certs,
1114                                caller=self.call_TerminateSegment),
1115                            args=(uri, tbparams[tb]['federant']['allocID']),
1116                            name=tb,
1117                            pdata=thread_pool, trace_file=self.trace_file)
1118                    t.start()
1119                # Wait until all finish
1120                thread_pool.wait_for_all_done()
1121
1122                # release the allocations
1123                for tb in tbparams.keys():
1124                    self.release_access(tb, tbparams[tb]['allocID'],
1125                            tbparams[tb].get('uri', None))
1126                # Remove the placeholder
1127                self.state_lock.acquire()
1128                self.state[eid]['experimentStatus'] = 'failed'
1129                if self.state_filename: self.write_state()
1130                self.state_lock.release()
1131
1132                log.error("Swap in failed on %s" % ",".join(failed))
1133                return
1134        else:
1135            log.info("[start_segment]: Experiment %s active" % eid)
1136
1137
1138        # Walk up tmpdir, deleting as we go
1139        if self.cleanup:
1140            log.debug("[start_experiment]: removing %s" % tmpdir)
1141            for path, dirs, files in os.walk(tmpdir, topdown=False):
1142                for f in files:
1143                    os.remove(os.path.join(path, f))
1144                for d in dirs:
1145                    os.rmdir(os.path.join(path, d))
1146            os.rmdir(tmpdir)
1147        else:
1148            log.debug("[start_experiment]: not removing %s" % tmpdir)
1149
1150        # Insert the experiment into our state and update the disk copy
1151        self.state_lock.acquire()
1152        self.state[expid]['experimentStatus'] = 'active'
1153        self.state[eid] = self.state[expid]
1154        if self.state_filename: self.write_state()
1155        self.state_lock.release()
1156        return
1157
1158
1159    def add_kit(self, e, kit):
1160        """
1161        Add a Software object created from the list of (install, location)
1162        tuples passed as kit  to the software attribute of an object e.  We
1163        do this enough to break out the code, but it's kind of a hack to
1164        avoid changing the old tuple rep.
1165        """
1166
1167        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1168
1169        if isinstance(e.software, list): e.software.extend(s)
1170        else: e.software = s
1171
1172
1173    def create_experiment_state(self, fid, req, expid, expcert, 
1174            state='starting'):
1175        """
1176        Create the initial entry in the experiment's state.  The expid and
1177        expcert are the experiment's fedid and certifacte that represents that
1178        ID, which are installed in the experiment state.  If the request
1179        includes a suggested local name that is used if possible.  If the local
1180        name is already taken by an experiment owned by this user that has
1181        failed, it is overwritten.  Otherwise new letters are added until a
1182        valid localname is found.  The generated local name is returned.
1183        """
1184
1185        if req.has_key('experimentID') and \
1186                req['experimentID'].has_key('localname'):
1187            overwrite = False
1188            eid = req['experimentID']['localname']
1189            # If there's an old failed experiment here with the same local name
1190            # and accessible by this user, we'll overwrite it, otherwise we'll
1191            # fall through and do the collision avoidance.
1192            old_expid = self.get_experiment_fedid(eid)
1193            if old_expid and self.check_experiment_access(fid, old_expid):
1194                self.state_lock.acquire()
1195                status = self.state[eid].get('experimentStatus', None)
1196                if status and status == 'failed':
1197                    # remove the old access attribute
1198                    self.auth.unset_attribute(fid, old_expid)
1199                    overwrite = True
1200                    del self.state[eid]
1201                    del self.state[old_expid]
1202                self.state_lock.release()
1203            self.state_lock.acquire()
1204            while (self.state.has_key(eid) and not overwrite):
1205                eid += random.choice(string.ascii_letters)
1206            # Initial state
1207            self.state[eid] = {
1208                    'experimentID' : \
1209                            [ { 'localname' : eid }, {'fedid': expid } ],
1210                    'experimentStatus': state,
1211                    'experimentAccess': { 'X509' : expcert },
1212                    'owner': fid,
1213                    'log' : [],
1214                }
1215            self.state[expid] = self.state[eid]
1216            if self.state_filename: self.write_state()
1217            self.state_lock.release()
1218        else:
1219            eid = self.exp_stem
1220            for i in range(0,5):
1221                eid += random.choice(string.ascii_letters)
1222            self.state_lock.acquire()
1223            while (self.state.has_key(eid)):
1224                eid = self.exp_stem
1225                for i in range(0,5):
1226                    eid += random.choice(string.ascii_letters)
1227            # Initial state
1228            self.state[eid] = {
1229                    'experimentID' : \
1230                            [ { 'localname' : eid }, {'fedid': expid } ],
1231                    'experimentStatus': state,
1232                    'experimentAccess': { 'X509' : expcert },
1233                    'owner': fid,
1234                    'log' : [],
1235                }
1236            self.state[expid] = self.state[eid]
1237            if self.state_filename: self.write_state()
1238            self.state_lock.release()
1239
1240        return eid
1241
1242
1243    def allocate_ips_to_topo(self, top):
1244        """
1245        Add an ip4_address attribute to all the hosts in the topology, based on
1246        the shared substrates on which they sit.  An /etc/hosts file is also
1247        created and returned as a list of hostfiles entries.  We also return
1248        the allocator, because we may need to allocate IPs to portals
1249        (specifically DRAGON portals).
1250        """
1251        subs = sorted(top.substrates, 
1252                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1253                reverse=True)
1254        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1255        ifs = { }
1256        hosts = [ ]
1257
1258        for idx, s in enumerate(subs):
1259            a = ips.allocate(len(s.interfaces)+2)
1260            if a :
1261                base, num = a
1262                if num < len(s.interfaces) +2 : 
1263                    raise service_error(service_error.internal,
1264                            "Allocator returned wrong number of IPs??")
1265            else:
1266                raise service_error(service_error.req, 
1267                        "Cannot allocate IP addresses")
1268
1269            base += 1
1270            for i in s.interfaces:
1271                i.attribute.append(
1272                        topdl.Attribute('ip4_address', 
1273                            "%s" % ip_addr(base)))
1274                hname = i.element.name[0]
1275                if ifs.has_key(hname):
1276                    hosts.append("%s\t%s-%s %s-%d" % \
1277                            (ip_addr(base), hname, s.name, hname,
1278                                ifs[hname]))
1279                else:
1280                    ifs[hname] = 0
1281                    hosts.append("%s\t%s-%s %s-%d %s" % \
1282                            (ip_addr(base), hname, s.name, hname,
1283                                ifs[hname], hname))
1284
1285                ifs[hname] += 1
1286                base += 1
1287        return hosts, ips
1288
1289    def get_access_to_testbeds(self, testbeds, access_user, 
1290            export_project, master, allocated, tbparams, services):
1291        """
1292        Request access to the various testbeds required for this instantiation
1293        (passed in as testbeds).  User, access_user, expoert_project and master
1294        are used to construct the correct requests.  Per-testbed parameters are
1295        returned in tbparams.
1296        """
1297        for tb in testbeds:
1298            self.get_access(tb, None, tbparams, master,
1299                    export_project, access_user, services)
1300            allocated[tb] = 1
1301
1302    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1303        """
1304        Create the sub-topologies that are needed for experiment instantiation.
1305        """
1306        for tb in testbeds:
1307            topo[tb] = top.clone()
1308            to_delete = [ ]
1309            # XXX: copy in for loop to simplify
1310            for e in topo[tb].elements:
1311                etb = e.get_attribute('testbed')
1312                if etb and etb != tb:
1313                    for i in e.interface:
1314                        for s in i.subs:
1315                            try:
1316                                s.interfaces.remove(i)
1317                            except ValueError:
1318                                raise service_error(service_error.internal,
1319                                        "Can't remove interface??")
1320                    to_delete.append(e)
1321            for e in to_delete:
1322                topo[tb].elements.remove(e)
1323            topo[tb].make_indices()
1324
1325            for e in [ e for e in topo[tb].elements \
1326                    if isinstance(e,topdl.Computer)]:
1327                if self.fedkit: self.add_kit(e, self.fedkit)
1328
1329    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1330            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1331            expid=None):
1332        """
1333        Return a new internet portal node and a dict with the connectionInfo to
1334        be attached.
1335        """
1336        dproject = tbparams[dt].get('project', 'project')
1337        ddomain = tbparams[dt].get('domain', ".example.com")
1338        mdomain = tbparams[master].get('domain', '.example.com')
1339        mproject = tbparams[master].get('project', 'project')
1340        muser = tbparams[master].get('user', 'root')
1341        smbshare = tbparams[master].get('smbshare', 'USERS')
1342
1343        if st == master or dt == master:
1344            active = ("%s" % (st == master))
1345        else:
1346            active = ("%s" % (st > dt))
1347
1348        ifaces = [ ]
1349        for sub, attrs in iface_desc:
1350            inf = topdl.Interface(
1351                    name="inf%03d" % len(ifaces),
1352                    substrate=sub,
1353                    attribute=[
1354                        topdl.Attribute(
1355                            attribute=n,
1356                            value = v)
1357                        for n, v in attrs
1358                        ]
1359                    )
1360            ifaces.append(inf)
1361        if conn_type == "ssh":
1362            try:
1363                aid = tbparams[st]['allocID']['fedid']
1364            except:
1365                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1366                        % st)
1367                aid = None
1368            info = {
1369                    "type" : conn_type, 
1370                    "portal": myname,
1371                    'fedAttr': [ 
1372                            { 'attribute': 'masterdomain', 'value': mdomain},
1373                            { 'attribute': 'masterexperiment', 'value': 
1374                                "%s/%s" % (mproject, eid)},
1375                            { 'attribute': 'active', 'value': active},
1376                            # Move to SMB service description
1377                            { 'attribute': 'masteruser', 'value': muser},
1378                            { 'attribute': 'smbshare', 'value': smbshare},
1379                        ],
1380                    'parameter': [
1381                        {
1382                            'name': 'peer',
1383                            'key': 'fedid:%s/%s' % (expid, myname),
1384                            'store': self.store_url,
1385                            'type': 'output',
1386                        },
1387                        {
1388                            'name': 'peer',
1389                            'key': 'fedid:%s/%s' % (expid, desthost),
1390                            'store': self.store_url,
1391                            'type': 'input',
1392                        },
1393                        ]
1394                    }
1395            # Give this allocation the rights to access the key of the
1396            # peers
1397            if aid:
1398                for h in (myname, desthost):
1399                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1400            else:
1401                self.log.error("No aid for %s in new_portal_node" % st)
1402        else:
1403            info = None
1404
1405        return (topdl.Computer(
1406                name=myname,
1407                attribute=[ 
1408                    topdl.Attribute(attribute=n,value=v)
1409                        for n, v in (\
1410                            ('portal', 'true'),
1411                            ('portal_type', portal_type), 
1412                        )
1413                    ],
1414                interface=ifaces,
1415                ), info)
1416
1417    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1418        ddomain = tbparams[dt].get('domain', ".example.com")
1419        dproject = tbparams[dt].get('project', 'project')
1420        tsubstrate = \
1421                topdl.Substrate(name='%s-%s' % (st, dt),
1422                        attribute= [
1423                            topdl.Attribute(
1424                                attribute='portal',
1425                                value='true')
1426                            ]
1427                        )
1428        segment_element = topdl.Segment(
1429                id= tbparams[dt]['allocID'],
1430                type='emulab',
1431                uri = self.tbmap.get(dt, None),
1432                interface=[ 
1433                    topdl.Interface(
1434                        substrate=tsubstrate.name),
1435                    ],
1436                attribute = [
1437                    topdl.Attribute(attribute=n, value=v)
1438                        for n, v in (\
1439                            ('domain', ddomain),
1440                            ('experiment', "%s/%s" % \
1441                                    (dproject, eid)),)
1442                    ],
1443                )
1444
1445        return (tsubstrate, segment_element)
1446
1447    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1448        if sub.capacity is None:
1449            raise service_error(service_error.internal,
1450                    "Cannot DRAGON split substrate w/o capacity")
1451        segs = [ ]
1452        substr = topdl.Substrate(name="dragon%d" % idx, 
1453                capacity=sub.capacity.clone(),
1454                attribute=[ topdl.Attribute(attribute=n, value=v)
1455                    for n, v, in (\
1456                            ('vlan', 'unassigned%d' % idx),)])
1457        name = "dragon%d" % idx
1458        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1459        for tb in tbs.keys():
1460            seg = topdl.Segment(
1461                    id = tbparams[tb]['allocID'],
1462                    type='emulab',
1463                    uri = self.tbmap.get(tb, None),
1464                    interface=[ 
1465                        topdl.Interface(
1466                            substrate=substr.name),
1467                        ],
1468                    attribute=[ topdl.Attribute(
1469                        attribute='dragon_endpoint', 
1470                        value=tbparams[tb]['dragon']),
1471                        ]
1472                    )
1473            if tbparams[tb].has_key('vlans'):
1474                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1475            segs.append(seg)
1476
1477            # Give this allocation the rights to access the key of the
1478            # vlan_id
1479            try:
1480                aid = tbparams[tb]['allocID']['fedid']
1481                self.auth.set_attribute(aid, store_key)
1482            except:
1483                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1484                        % tb)
1485
1486        connInfo[name] = [ { 
1487            'type': 'transit',
1488            'parameter': [ { 
1489                'name': 'vlan_id',
1490                'key': store_key,
1491                'store': self.store_url,
1492                'type': 'output'
1493                } ]
1494            } ]
1495
1496        topo[name] = \
1497                topdl.Topology(substrates=[substr], elements=segs,
1498                        attribute=[
1499                            topdl.Attribute(attribute="transit", value='true'),
1500                            topdl.Attribute(attribute="dynamic", value='true'),
1501                            topdl.Attribute(attribute="testbed", 
1502                                value='dragon'),
1503                            topdl.Attribute(attribute="store_keys", 
1504                                value=store_key),
1505                            ]
1506                        )
1507
1508    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1509            connInfo, expid=None):
1510        """
1511        Add attribiutes to the various elements indicating that they are to be
1512        dragon connected and create a dragon segment in topo to be
1513        instantiated.
1514        """
1515
1516        def get_substrate_from_topo(name, t):
1517            for s in t.substrates:
1518                if s.name == name: return s
1519            else: return None
1520
1521
1522        mdomain = tbparams[master].get('domain', '.example.com')
1523        mproject = tbparams[master].get('project', 'project')
1524        # dn is the number of previously created dragon nets.  This routine
1525        # creates a net numbered by dn
1526        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1527        # Count the number of interfaces on this substrate in each testbed from
1528        # the global topology
1529        count = { }
1530        node = { }
1531        for e in [ i.element for i in sub.interfaces ]:
1532            tb = e.get_attribute('testbed')
1533            count[tb] = count.get(tb, 0) + 1
1534            node[tb] = i.get_attribute('ip4_address')
1535
1536
1537        # Set the attributes in the copies that will allow setup of dragon
1538        # connections.
1539        for tb in tbs.keys():
1540            s = get_substrate_from_topo(sub.name, topo[tb])
1541            if s:
1542                if not connInfo.has_key(tb):
1543                    connInfo[tb] = [ ]
1544
1545                try:
1546                    aid = tbparams[tb]['allocID']['fedid']
1547                except:
1548                    self.log.debug("[creat_dragon_substrate] " + 
1549                            "Can't get alloc id for %s?" %tb)
1550                    aid = None
1551
1552                # This may need another look, but only a service gateway will
1553                # look at the active parameter, and these are only inserted to
1554                # connect to the master.
1555                active = "%s" % ( tb == master)
1556                info = {
1557                        'type': 'transit',
1558                        'member': [ {
1559                            'element': i.element.name[0], 
1560                            'interface': i.name
1561                            } for i in s.interfaces \
1562                                    if isinstance(i.element, topdl.Computer) ],
1563                        'fedAttr': [ 
1564                            { 'attribute': 'masterdomain', 'value': mdomain},
1565                            { 'attribute': 'masterexperiment', 'value': 
1566                                "%s/%s" % (mproject, eid)},
1567                            { 'attribute': 'active', 'value': active},
1568                            ],
1569                        'parameter': [ {
1570                            'name': 'vlan_id',
1571                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1572                            'store': self.store_url,
1573                            'type': 'input',
1574                            } ]
1575                        }
1576                if tbs.has_key(tb):
1577                    info['peer'] = tbs[tb]
1578                connInfo[tb].append(info)
1579
1580                # Give this allocation the rights to access the key of the
1581                # vlan_id
1582                if aid:
1583                    self.auth.set_attribute(aid, 
1584                            'fedid:%s/vlan%d' % (expid, dn))
1585            else:
1586                raise service_error(service_error.internal,
1587                        "No substrate %s in testbed %s" % (sub.name, tb))
1588
1589        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1590
1591    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1592            segment_substrate, portals, connInfo, expid):
1593        # More than one testbed is on this substrate.  Insert
1594        # some portals into the subtopologies.  st == source testbed,
1595        # dt == destination testbed.
1596        for st in tbs.keys():
1597            if not segment_substrate.has_key(st):
1598                segment_substrate[st] = { }
1599            if not portals.has_key(st): 
1600                portals[st] = { }
1601            if not connInfo.has_key(st):
1602                connInfo[st] = [ ]
1603            for dt in [ t for t in tbs.keys() if t != st]:
1604                sproject = tbparams[st].get('project', 'project')
1605                dproject = tbparams[dt].get('project', 'project')
1606                mproject = tbparams[master].get('project', 'project')
1607                sdomain = tbparams[st].get('domain', ".example.com")
1608                ddomain = tbparams[dt].get('domain', ".example.com")
1609                mdomain = tbparams[master].get('domain', '.example.com')
1610                muser = tbparams[master].get('user', 'root')
1611                smbshare = tbparams[master].get('smbshare', 'USERS')
1612                aid = tbparams[dt]['allocID']['fedid']
1613                if st == master or dt == master:
1614                    active = ("%s" % (st == master))
1615                else:
1616                    active = ("%s" %(st > dt))
1617                if not segment_substrate[st].has_key(dt):
1618                    # Put a substrate and a segment for the connected
1619                    # testbed in there.
1620                    tsubstrate, segment_element = \
1621                            self.new_portal_substrate(st, dt, eid, tbparams,
1622                                    expid)
1623                    segment_substrate[st][dt] = tsubstrate
1624                    topo[st].substrates.append(tsubstrate)
1625                    topo[st].elements.append(segment_element)
1626
1627                new_portal = False
1628                if portals[st].has_key(dt):
1629                    # There's a portal set up to go to this destination.
1630                    # See if there's room to multiplex this connection on
1631                    # it.  If so, add an interface to the portal; if not,
1632                    # set up to add a portal below.
1633                    # [This little festival of braces is just a pop of the
1634                    # last element in the list of portals between st and
1635                    # dt.]
1636                    portal = portals[st][dt][-1]
1637                    mux = len([ i for i in portal.interface \
1638                            if not i.get_attribute('portal')])
1639                    if mux == self.muxmax:
1640                        new_portal = True
1641                        portal_type = "experiment"
1642                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1643                        desthost = "%stunnel%d" % (st.lower(), 
1644                                len(portals[st][dt]))
1645                    else:
1646                        new_i = topdl.Interface(
1647                                substrate=sub.name,
1648                                attribute=[ 
1649                                    topdl.Attribute(
1650                                        attribute='ip4_address', 
1651                                        value=tbs[dt]
1652                                    )
1653                                ])
1654                        portal.interface.append(new_i)
1655                else:
1656                    # First connection to this testbed, make an empty list
1657                    # and set up to add the new portal below
1658                    new_portal = True
1659                    portals[st][dt] = [ ]
1660                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1661                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1662
1663                    if dt == master or st == master: portal_type = "both"
1664                    else: portal_type = "experiment"
1665
1666                if new_portal:
1667                    infs = (
1668                            (segment_substrate[st][dt].name, 
1669                                (('portal', 'true'),)),
1670                            (sub.name, 
1671                                (('ip4_address', tbs[dt]),))
1672                        )
1673                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1674                            master, eid, myname, desthost, portal_type,
1675                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1676                    if self.fedkit:
1677                        self.add_kit(portal, self.fedkit)
1678                    if self.gatewaykit: 
1679                        self.add_kit(portal, self.gatewaykit)
1680
1681                    topo[st].elements.append(portal)
1682                    portals[st][dt].append(portal)
1683                    connInfo[st].append(info)
1684
1685    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1686        # Add to the master testbed
1687        tsubstrate, segment_element = \
1688                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1689        myname = "%stunnel" % dt
1690        desthost = "%stunnel" % st
1691
1692        portal, info = self.new_portal_node(st, dt, tbparams, master,
1693                eid, myname, desthost, "control", 
1694                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1695                conn_attrs=[], expid=expid)
1696        if self.fedkit:
1697            self.add_kit(portal, self.fedkit)
1698        if self.gatewaykit: 
1699            self.add_kit(portal, self.gatewaykit)
1700
1701        topo[st].substrates.append(tsubstrate)
1702        topo[st].elements.append(segment_element)
1703        topo[st].elements.append(portal)
1704        if not connInfo.has_key(st):
1705            connInfo[st] = [ ]
1706        connInfo[st].append(info)
1707
1708    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1709            substrate, tbparams, expid):
1710        # Add to the master testbed
1711        myname = "%stunnel" % dt
1712        desthost = "%s" % ip_addr(dip)
1713
1714        portal, info = self.new_portal_node(st, dt, tbparams, master,
1715                eid, myname, desthost, "control", 
1716                ((substrate.name,(
1717                    ('portal','true'),
1718                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1719                conn_type="transit", conn_attrs=[], expid=expid)
1720        if self.fedkit:
1721            self.add_kit(portal, self.fedkit)
1722        if self.gatewaykit: 
1723            self.add_kit(portal, self.gatewaykit)
1724
1725        return portal
1726
1727    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1728            connInfo, expid):
1729        """
1730        For each substrate in the main topology, find those that
1731        have nodes on more than one testbed.  Insert portal nodes
1732        into the copies of those substrates on the sub topologies.
1733        """
1734        segment_substrate = { }
1735        portals = { }
1736        for s in top.substrates:
1737            # tbs will contain an ip address on this subsrate that is in
1738            # each testbed.
1739            tbs = { }
1740            for i in s.interfaces:
1741                e = i.element
1742                tb = e.get_attribute('testbed')
1743                if tb and not tbs.has_key(tb):
1744                    for i in e.interface:
1745                        if s in i.subs:
1746                            tbs[tb]= i.get_attribute('ip4_address')
1747            if len(tbs) < 2:
1748                continue
1749
1750            # DRAGON will not create multi-site vlans yet
1751            if len(tbs) == 2 and \
1752                    all([tbparams[x].has_key('dragon') for x in tbs]):
1753                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1754                        master, eid, connInfo, expid)
1755            else:
1756                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1757                        eid, segment_substrate, portals, connInfo, expid)
1758
1759        # Make sure that all the slaves have a control portal back to the
1760        # master.
1761        for tb in [ t for t in tbparams.keys() if t != master ]:
1762            if len([e for e in topo[tb].elements \
1763                    if isinstance(e, topdl.Computer) and \
1764                    e.get_attribute('portal') and \
1765                    e.get_attribute('portal_type') == 'both']) == 0:
1766
1767                if tbparams[master].has_key('dragon') \
1768                        and tbparams[tb].has_key('dragon'):
1769
1770                    idx = len([x for x in topo.keys() \
1771                            if x.startswith('dragon')])
1772                    dip, leng = ip_allocator.allocate(4)
1773                    dip += 1
1774                    mip = dip+1
1775                    csub = topdl.Substrate(
1776                            name="dragon-control-%s" % tb,
1777                            capacity=topdl.Capacity(100000.0, 'max'),
1778                            attribute=[
1779                                topdl.Attribute(
1780                                    attribute='portal',
1781                                    value='true'
1782                                    )
1783                                ]
1784                            )
1785                    seg = topdl.Segment(
1786                            id= tbparams[master]['allocID'],
1787                            type='emulab',
1788                            uri = self.tbmap.get(master, None),
1789                            interface=[ 
1790                                topdl.Interface(
1791                                    substrate=csub.name),
1792                                ],
1793                            attribute = [
1794                                topdl.Attribute(attribute=n, value=v)
1795                                    for n, v in (\
1796                                        ('domain', 
1797                                            tbparams[master].get('domain',
1798                                                ".example.com")),
1799                                        ('experiment', "%s/%s" % \
1800                                                (tbparams[master].get(
1801                                                    'project', 
1802                                                    'project'), 
1803                                                    eid)),)
1804                                ],
1805                            )
1806                    portal = self.new_dragon_portal(tb, master,
1807                            master, eid, dip, mip, idx, csub, tbparams, expid)
1808                    topo[tb].substrates.append(csub)
1809                    topo[tb].elements.append(portal)
1810                    topo[tb].elements.append(seg)
1811
1812                    mcsub = csub.clone()
1813                    seg = topdl.Segment(
1814                            id= tbparams[tb]['allocID'],
1815                            type='emulab',
1816                            uri = self.tbmap.get(tb, None),
1817                            interface=[ 
1818                                topdl.Interface(
1819                                    substrate=csub.name),
1820                                ],
1821                            attribute = [
1822                                topdl.Attribute(attribute=n, value=v)
1823                                    for n, v in (\
1824                                        ('domain', 
1825                                            tbparams[tb].get('domain',
1826                                                ".example.com")),
1827                                        ('experiment', "%s/%s" % \
1828                                                (tbparams[tb].get('project', 
1829                                                    'project'), 
1830                                                    eid)),)
1831                                ],
1832                            )
1833                    portal = self.new_dragon_portal(master, tb, master,
1834                            eid, mip, dip, idx, mcsub, tbparams, expid)
1835                    topo[master].substrates.append(mcsub)
1836                    topo[master].elements.append(portal)
1837                    topo[master].elements.append(seg)
1838                    for t in (master, tb):
1839                        topo[t].incorporate_elements()
1840
1841                    self.create_dragon_substrate(csub, topo, 
1842                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1843                            tbparams, master, eid, connInfo,
1844                            expid)
1845                else:
1846                    self.add_control_portal(master, tb, master, eid, topo, 
1847                            tbparams, connInfo, expid)
1848                    self.add_control_portal(tb, master, master, eid, topo, 
1849                            tbparams, connInfo, expid)
1850
1851        # Connect the portal nodes into the topologies and clear out
1852        # substrates that are not in the topologies
1853        for tb in tbparams.keys():
1854            topo[tb].incorporate_elements()
1855            topo[tb].substrates = \
1856                    [s for s in topo[tb].substrates \
1857                        if len(s.interfaces) >0]
1858
1859    def wrangle_software(self, expid, top, topo, tbparams):
1860        """
1861        Copy software out to the repository directory, allocate permissions and
1862        rewrite the segment topologies to look for the software in local
1863        places.
1864        """
1865
1866        # Copy the rpms and tarfiles to a distribution directory from
1867        # which the federants can retrieve them
1868        linkpath = "%s/software" %  expid
1869        softdir ="%s/%s" % ( self.repodir, linkpath)
1870        softmap = { }
1871        # These are in a list of tuples format (each kit).  This comprehension
1872        # unwraps them into a single list of tuples that initilaizes the set of
1873        # tuples.
1874        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1875                for p, t in l ])
1876        pkgs.update([x.location for e in top.elements \
1877                for x in e.software])
1878        try:
1879            os.makedirs(softdir)
1880        except IOError, e:
1881            raise service_error(
1882                    "Cannot create software directory: %s" % e)
1883        # The actual copying.  Everything's converted into a url for copying.
1884        for pkg in pkgs:
1885            loc = pkg
1886
1887            scheme, host, path = urlparse(loc)[0:3]
1888            dest = os.path.basename(path)
1889            if not scheme:
1890                if not loc.startswith('/'):
1891                    loc = "/%s" % loc
1892                loc = "file://%s" %loc
1893            try:
1894                u = urlopen(loc)
1895            except Exception, e:
1896                raise service_error(service_error.req, 
1897                        "Cannot open %s: %s" % (loc, e))
1898            try:
1899                f = open("%s/%s" % (softdir, dest) , "w")
1900                self.log.debug("Writing %s/%s" % (softdir,dest) )
1901                data = u.read(4096)
1902                while data:
1903                    f.write(data)
1904                    data = u.read(4096)
1905                f.close()
1906                u.close()
1907            except Exception, e:
1908                raise service_error(service_error.internal,
1909                        "Could not copy %s: %s" % (loc, e))
1910            path = re.sub("/tmp", "", linkpath)
1911            # XXX
1912            softmap[pkg] = \
1913                    "%s/%s/%s" %\
1914                    ( self.repo_url, path, dest)
1915
1916            # Allow the individual segments to access the software.
1917            for tb in tbparams.keys():
1918                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1919                        "/%s/%s" % ( path, dest))
1920
1921        # Convert the software locations in the segments into the local
1922        # copies on this host
1923        for soft in [ s for tb in topo.values() \
1924                for e in tb.elements \
1925                    if getattr(e, 'software', False) \
1926                        for s in e.software ]:
1927            if softmap.has_key(soft.location):
1928                soft.location = softmap[soft.location]
1929
1930
1931    def new_experiment(self, req, fid):
1932        """
1933        The external interface to empty initial experiment creation called from
1934        the dispatcher.
1935
1936        Creates a working directory, splits the incoming description using the
1937        splitter script and parses out the avrious subsections using the
1938        lcasses above.  Once each sub-experiment is created, use pooled threads
1939        to instantiate them and start it all up.
1940        """
1941        if not self.auth.check_attribute(fid, 'new'):
1942            raise service_error(service_error.access, "New access denied")
1943
1944        try:
1945            tmpdir = tempfile.mkdtemp(prefix="split-")
1946        except IOError:
1947            raise service_error(service_error.internal, "Cannot create tmp dir")
1948
1949        try:
1950            access_user = self.accessdb[fid]
1951        except KeyError:
1952            raise service_error(service_error.internal,
1953                    "Access map and authorizer out of sync in " + \
1954                            "new_experiment for fedid %s"  % fid)
1955
1956        pid = "dummy"
1957        gid = "dummy"
1958
1959        req = req.get('NewRequestBody', None)
1960        if not req:
1961            raise service_error(service_error.req,
1962                    "Bad request format (no NewRequestBody)")
1963
1964        # Generate an ID for the experiment (slice) and a certificate that the
1965        # allocator can use to prove they own it.  We'll ship it back through
1966        # the encrypted connection.
1967        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1968
1969        #now we're done with the tmpdir, and it should be empty
1970        if self.cleanup:
1971            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1972            os.rmdir(tmpdir)
1973        else:
1974            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1975
1976        eid = self.create_experiment_state(fid, req, expid, expcert, 
1977                state='empty')
1978
1979        # Let users touch the state
1980        self.auth.set_attribute(fid, expid)
1981        self.auth.set_attribute(expid, expid)
1982        # Override fedids can manipulate state as well
1983        for o in self.overrides:
1984            self.auth.set_attribute(o, expid)
1985
1986        rv = {
1987                'experimentID': [
1988                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1989                ],
1990                'experimentStatus': 'empty',
1991                'experimentAccess': { 'X509' : expcert }
1992            }
1993
1994        return rv
1995
1996
1997    def create_experiment(self, req, fid):
1998        """
1999        The external interface to experiment creation called from the
2000        dispatcher.
2001
2002        Creates a working directory, splits the incoming description using the
2003        splitter script and parses out the avrious subsections using the
2004        lcasses above.  Once each sub-experiment is created, use pooled threads
2005        to instantiate them and start it all up.
2006        """
2007
2008        req = req.get('CreateRequestBody', None)
2009        if not req:
2010            raise service_error(service_error.req,
2011                    "Bad request format (no CreateRequestBody)")
2012
2013        # Get the experiment access
2014        exp = req.get('experimentID', None)
2015        if exp:
2016            if exp.has_key('fedid'):
2017                key = exp['fedid']
2018                expid = key
2019                eid = None
2020            elif exp.has_key('localname'):
2021                key = exp['localname']
2022                eid = key
2023                expid = None
2024            else:
2025                raise service_error(service_error.req, "Unknown lookup type")
2026        else:
2027            raise service_error(service_error.req, "No request?")
2028
2029        self.check_experiment_access(fid, key)
2030
2031        try:
2032            tmpdir = tempfile.mkdtemp(prefix="split-")
2033            os.mkdir(tmpdir+"/keys")
2034        except IOError:
2035            raise service_error(service_error.internal, "Cannot create tmp dir")
2036
2037        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2038        gw_secretkey_base = "fed.%s" % self.ssh_type
2039        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2040        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2041        tclfile = tmpdir + "/experiment.tcl"
2042        tbparams = { }
2043        try:
2044            access_user = self.accessdb[fid]
2045        except KeyError:
2046            raise service_error(service_error.internal,
2047                    "Access map and authorizer out of sync in " + \
2048                            "create_experiment for fedid %s"  % fid)
2049
2050        pid = "dummy"
2051        gid = "dummy"
2052
2053        # The tcl parser needs to read a file so put the content into that file
2054        descr=req.get('experimentdescription', None)
2055        if descr:
2056            file_content=descr.get('ns2description', None)
2057            if file_content:
2058                try:
2059                    f = open(tclfile, 'w')
2060                    f.write(file_content)
2061                    f.close()
2062                except IOError:
2063                    raise service_error(service_error.internal,
2064                            "Cannot write temp experiment description")
2065            else:
2066                raise service_error(service_error.req, 
2067                        "Only ns2descriptions supported")
2068        else:
2069            raise service_error(service_error.req, "No experiment description")
2070
2071        self.state_lock.acquire()
2072        if self.state.has_key(key):
2073            self.state[key]['experimentStatus'] = "starting"
2074            for e in self.state[key].get('experimentID',[]):
2075                if not expid and e.has_key('fedid'):
2076                    expid = e['fedid']
2077                elif not eid and e.has_key('localname'):
2078                    eid = e['localname']
2079        self.state_lock.release()
2080
2081        if not (eid and expid):
2082            raise service_error(service_error.internal, 
2083                    "Cannot find local experiment info!?")
2084
2085        try: 
2086            # This catches exceptions to clear the placeholder if necessary
2087            try:
2088                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2089            except ValueError:
2090                raise service_error(service_error.server_config, 
2091                        "Bad key type (%s)" % self.ssh_type)
2092
2093            master = req.get('master', None)
2094            if not master:
2095                raise service_error(service_error.req,
2096                        "No master testbed label")
2097            export_project = req.get('exportProject', None)
2098            if not export_project:
2099                raise service_error(service_error.req, "No export project")
2100           
2101            # Translate to topdl
2102            if self.splitter_url:
2103                # XXX: need remote topdl translator
2104                self.log.debug("Calling remote splitter at %s" % \
2105                        self.splitter_url)
2106                split_data = self.remote_splitter(self.splitter_url,
2107                        file_content, master)
2108            else:
2109                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2110                    str(self.muxmax), '-m', master]
2111
2112                if self.fedkit:
2113                    tclcmd.append('-k')
2114
2115                if self.gatewaykit:
2116                    tclcmd.append('-K')
2117
2118                tclcmd.extend([pid, gid, eid, tclfile])
2119
2120                self.log.debug("running local splitter %s", " ".join(tclcmd))
2121                # This is just fantastic.  As a side effect the parser copies
2122                # tb_compat.tcl into the current directory, so that directory
2123                # must be writable by the fedd user.  Doing this in the
2124                # temporary subdir ensures this is the case.
2125                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2126                        cwd=tmpdir)
2127                split_data = tclparser.stdout
2128
2129            top = topdl.topology_from_xml(file=split_data, top="experiment")
2130
2131            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2132             # Find the testbeds to look up
2133            testbeds = set([ a.value for e in top.elements \
2134                    for a in e.attribute \
2135                    if a.attribute == 'testbed'] )
2136
2137            allocated = { }         # Testbeds we can access
2138            topo ={ }               # Sub topologies
2139            connInfo = { }          # Connection information
2140            services = [ ]
2141            self.get_access_to_testbeds(testbeds, access_user, 
2142                    export_project, master, allocated, tbparams, services)
2143            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2144
2145            # Copy configuration files into the remote file store
2146            # The config urlpath
2147            configpath = "/%s/config" % expid
2148            # The config file system location
2149            configdir ="%s%s" % ( self.repodir, configpath)
2150            try:
2151                os.makedirs(configdir)
2152            except IOError, e:
2153                raise service_error(
2154                        "Cannot create config directory: %s" % e)
2155            try:
2156                f = open("%s/hosts" % configdir, "w")
2157                f.write('\n'.join(hosts))
2158                f.close()
2159            except IOError, e:
2160                raise service_error(service_error.internal, 
2161                        "Cannot write hosts file: %s" % e)
2162            try:
2163                copy_file("%s" % gw_pubkey, "%s/%s" % \
2164                        (configdir, gw_pubkey_base))
2165                copy_file("%s" % gw_secretkey, "%s/%s" % \
2166                        (configdir, gw_secretkey_base))
2167            except IOError, e:
2168                raise service_error(service_error.internal, 
2169                        "Cannot copy keyfiles: %s" % e)
2170
2171            # Allow the individual testbeds to access the configuration files.
2172            for tb in tbparams.keys():
2173                asignee = tbparams[tb]['allocID']['fedid']
2174                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2175                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2176
2177            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2178                    connInfo, expid)
2179            # Now get access to the dynamic testbeds
2180            for k, t in topo.items():
2181                if not t.get_attribute('dynamic'):
2182                    continue
2183                tb = t.get_attribute('testbed')
2184                if tb: 
2185                    self.get_access(tb, None, tbparams, master, 
2186                            export_project, access_user, services)
2187                    tbparams[k] = tbparams[tb]
2188                    del tbparams[tb]
2189                    allocated[k] = 1
2190                    store_keys = t.get_attribute('store_keys')
2191                    # Give the testbed access to keys it exports or imports
2192                    if store_keys:
2193                        for sk in store_keys.split(" "):
2194                            self.auth.set_attribute(\
2195                                    tbparams[k]['allocID']['fedid'], sk)
2196                else:
2197                    raise service_error(service_error.internal, 
2198                            "Dynamic allocation from no testbed!?")
2199
2200            self.wrangle_software(expid, top, topo, tbparams)
2201
2202            vtopo = topdl.topology_to_vtopo(top)
2203            vis = self.genviz(vtopo)
2204
2205            # save federant information
2206            for k in allocated.keys():
2207                tbparams[k]['federant'] = {
2208                        'name': [ { 'localname' : eid} ],
2209                        'allocID' : tbparams[k]['allocID'],
2210                        'master' : k == master,
2211                        'uri': tbparams[k]['uri'],
2212                    }
2213                if tbparams[k].has_key('emulab'):
2214                        tbparams[k]['federant']['emulab'] = \
2215                                tbparams[k]['emulab']
2216
2217            self.state_lock.acquire()
2218            self.state[eid]['vtopo'] = vtopo
2219            self.state[eid]['vis'] = vis
2220            self.state[expid]['federant'] = \
2221                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2222                        if tbparams[tb].has_key('federant') ]
2223            if self.state_filename: 
2224                self.write_state()
2225            self.state_lock.release()
2226        except service_error, e:
2227            # If something goes wrong in the parse (usually an access error)
2228            # clear the placeholder state.  From here on out the code delays
2229            # exceptions.  Failing at this point returns a fault to the remote
2230            # caller.
2231
2232            self.state_lock.acquire()
2233            del self.state[eid]
2234            del self.state[expid]
2235            if self.state_filename: self.write_state()
2236            self.state_lock.release()
2237            raise e
2238
2239
2240        # Start the background swapper and return the starting state.  From
2241        # here on out, the state will stick around a while.
2242
2243        # Let users touch the state
2244        self.auth.set_attribute(fid, expid)
2245        self.auth.set_attribute(expid, expid)
2246        # Override fedids can manipulate state as well
2247        for o in self.overrides:
2248            self.auth.set_attribute(o, expid)
2249
2250        # Create a logger that logs to the experiment's state object as well as
2251        # to the main log file.
2252        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2253        alloc_collector = self.list_log(self.state[eid]['log'])
2254        h = logging.StreamHandler(alloc_collector)
2255        # XXX: there should be a global one of these rather than repeating the
2256        # code.
2257        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2258                    '%d %b %y %H:%M:%S'))
2259        alloc_log.addHandler(h)
2260       
2261        attrs = [ 
2262                {
2263                    'attribute': 'ssh_pubkey', 
2264                    'value': '%s/%s/config/%s' % \
2265                            (self.repo_url, expid, gw_pubkey_base)
2266                },
2267                {
2268                    'attribute': 'ssh_secretkey', 
2269                    'value': '%s/%s/config/%s' % \
2270                            (self.repo_url, expid, gw_secretkey_base)
2271                },
2272                {
2273                    'attribute': 'hosts', 
2274                    'value': '%s/%s/config/hosts' % \
2275                            (self.repo_url, expid)
2276                },
2277                {
2278                    'attribute': 'experiment_name',
2279                    'value': eid,
2280                },
2281            ]
2282
2283        # transit and disconnected testbeds may not have a connInfo entry.
2284        # Fill in the blanks.
2285        for t in allocated.keys():
2286            if not connInfo.has_key(t):
2287                connInfo[t] = { }
2288
2289        # Start a thread to do the resource allocation
2290        t  = Thread(target=self.allocate_resources,
2291                args=(allocated, master, eid, expid, tbparams, 
2292                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2293                    services),
2294                name=eid)
2295        t.start()
2296
2297        rv = {
2298                'experimentID': [
2299                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2300                ],
2301                'experimentStatus': 'starting',
2302            }
2303
2304        return rv
2305   
2306    def get_experiment_fedid(self, key):
2307        """
2308        find the fedid associated with the localname key in the state database.
2309        """
2310
2311        rv = None
2312        self.state_lock.acquire()
2313        if self.state.has_key(key):
2314            if isinstance(self.state[key], dict):
2315                try:
2316                    kl = [ f['fedid'] for f in \
2317                            self.state[key]['experimentID']\
2318                                if f.has_key('fedid') ]
2319                except KeyError:
2320                    self.state_lock.release()
2321                    raise service_error(service_error.internal, 
2322                            "No fedid for experiment %s when getting "+\
2323                                    "fedid(!?)" % key)
2324                if len(kl) == 1:
2325                    rv = kl[0]
2326                else:
2327                    self.state_lock.release()
2328                    raise service_error(service_error.internal, 
2329                            "multiple fedids for experiment %s when " +\
2330                                    "getting fedid(!?)" % key)
2331            else:
2332                self.state_lock.release()
2333                raise service_error(service_error.internal, 
2334                        "Unexpected state for %s" % key)
2335        self.state_lock.release()
2336        return rv
2337
2338    def check_experiment_access(self, fid, key):
2339        """
2340        Confirm that the fid has access to the experiment.  Though a request
2341        may be made in terms of a local name, the access attribute is always
2342        the experiment's fedid.
2343        """
2344        if not isinstance(key, fedid):
2345            key = self.get_experiment_fedid(key)
2346
2347        if self.auth.check_attribute(fid, key):
2348            return True
2349        else:
2350            raise service_error(service_error.access, "Access Denied")
2351
2352
2353    def get_handler(self, path, fid):
2354        self.log.info("Get handler %s %s" % (path, fid))
2355        if self.auth.check_attribute(fid, path):
2356            return ("%s/%s" % (self.repodir, path), "application/binary")
2357        else:
2358            return (None, None)
2359
2360    def get_vtopo(self, req, fid):
2361        """
2362        Return the stored virtual topology for this experiment
2363        """
2364        rv = None
2365        state = None
2366
2367        req = req.get('VtopoRequestBody', None)
2368        if not req:
2369            raise service_error(service_error.req,
2370                    "Bad request format (no VtopoRequestBody)")
2371        exp = req.get('experiment', None)
2372        if exp:
2373            if exp.has_key('fedid'):
2374                key = exp['fedid']
2375                keytype = "fedid"
2376            elif exp.has_key('localname'):
2377                key = exp['localname']
2378                keytype = "localname"
2379            else:
2380                raise service_error(service_error.req, "Unknown lookup type")
2381        else:
2382            raise service_error(service_error.req, "No request?")
2383
2384        self.check_experiment_access(fid, key)
2385
2386        self.state_lock.acquire()
2387        if self.state.has_key(key):
2388            if self.state[key].has_key('vtopo'):
2389                rv = { 'experiment' : {keytype: key },\
2390                        'vtopo': self.state[key]['vtopo'],\
2391                    }
2392            else:
2393                state = self.state[key]['experimentStatus']
2394        self.state_lock.release()
2395
2396        if rv: return rv
2397        else: 
2398            if state:
2399                raise service_error(service_error.partial, 
2400                        "Not ready: %s" % state)
2401            else:
2402                raise service_error(service_error.req, "No such experiment")
2403
2404    def get_vis(self, req, fid):
2405        """
2406        Return the stored visualization for this experiment
2407        """
2408        rv = None
2409        state = None
2410
2411        req = req.get('VisRequestBody', None)
2412        if not req:
2413            raise service_error(service_error.req,
2414                    "Bad request format (no VisRequestBody)")
2415        exp = req.get('experiment', None)
2416        if exp:
2417            if exp.has_key('fedid'):
2418                key = exp['fedid']
2419                keytype = "fedid"
2420            elif exp.has_key('localname'):
2421                key = exp['localname']
2422                keytype = "localname"
2423            else:
2424                raise service_error(service_error.req, "Unknown lookup type")
2425        else:
2426            raise service_error(service_error.req, "No request?")
2427
2428        self.check_experiment_access(fid, key)
2429
2430        self.state_lock.acquire()
2431        if self.state.has_key(key):
2432            if self.state[key].has_key('vis'):
2433                rv =  { 'experiment' : {keytype: key },\
2434                        'vis': self.state[key]['vis'],\
2435                        }
2436            else:
2437                state = self.state[key]['experimentStatus']
2438        self.state_lock.release()
2439
2440        if rv: return rv
2441        else:
2442            if state:
2443                raise service_error(service_error.partial, 
2444                        "Not ready: %s" % state)
2445            else:
2446                raise service_error(service_error.req, "No such experiment")
2447
2448    def clean_info_response(self, rv):
2449        """
2450        Remove the information in the experiment's state object that is not in
2451        the info response.
2452        """
2453        # Remove the owner info (should always be there, but...)
2454        if rv.has_key('owner'): del rv['owner']
2455
2456        # Convert the log into the allocationLog parameter and remove the
2457        # log entry (with defensive programming)
2458        if rv.has_key('log'):
2459            rv['allocationLog'] = "".join(rv['log'])
2460            del rv['log']
2461        else:
2462            rv['allocationLog'] = ""
2463
2464        if rv['experimentStatus'] != 'active':
2465            if rv.has_key('federant'): del rv['federant']
2466        else:
2467            # remove the allocationID and uri info from each federant
2468            for f in rv.get('federant', []):
2469                if f.has_key('allocID'): del f['allocID']
2470                if f.has_key('uri'): del f['uri']
2471        return rv
2472
2473    def get_info(self, req, fid):
2474        """
2475        Return all the stored info about this experiment
2476        """
2477        rv = None
2478
2479        req = req.get('InfoRequestBody', None)
2480        if not req:
2481            raise service_error(service_error.req,
2482                    "Bad request format (no InfoRequestBody)")
2483        exp = req.get('experiment', None)
2484        if exp:
2485            if exp.has_key('fedid'):
2486                key = exp['fedid']
2487                keytype = "fedid"
2488            elif exp.has_key('localname'):
2489                key = exp['localname']
2490                keytype = "localname"
2491            else:
2492                raise service_error(service_error.req, "Unknown lookup type")
2493        else:
2494            raise service_error(service_error.req, "No request?")
2495
2496        self.check_experiment_access(fid, key)
2497
2498        # The state may be massaged by the service function that called
2499        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2500        # state.
2501        self.state_lock.acquire()
2502        if self.state.has_key(key):
2503            rv = copy.deepcopy(self.state[key])
2504        self.state_lock.release()
2505
2506        if rv:
2507            return self.clean_info_response(rv)
2508        else:
2509            raise service_error(service_error.req, "No such experiment")
2510
2511    def get_multi_info(self, req, fid):
2512        """
2513        Return all the stored info that this fedid can access
2514        """
2515        rv = { 'info': [ ] }
2516
2517        self.state_lock.acquire()
2518        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2519            try:
2520                self.check_experiment_access(fid, key)
2521            except service_error, e:
2522                if e.code == service_error.access:
2523                    continue
2524                else:
2525                    self.state_lock.release()
2526                    raise e
2527
2528            if self.state.has_key(key):
2529                e = copy.deepcopy(self.state[key])
2530                e = self.clean_info_response(e)
2531                rv['info'].append(e)
2532        self.state_lock.release()
2533        return rv
2534
2535    def terminate_experiment(self, req, fid):
2536        """
2537        Swap this experiment out on the federants and delete the shared
2538        information
2539        """
2540        tbparams = { }
2541        req = req.get('TerminateRequestBody', None)
2542        if not req:
2543            raise service_error(service_error.req,
2544                    "Bad request format (no TerminateRequestBody)")
2545        force = req.get('force', False)
2546        exp = req.get('experiment', None)
2547        if exp:
2548            if exp.has_key('fedid'):
2549                key = exp['fedid']
2550                keytype = "fedid"
2551            elif exp.has_key('localname'):
2552                key = exp['localname']
2553                keytype = "localname"
2554            else:
2555                raise service_error(service_error.req, "Unknown lookup type")
2556        else:
2557            raise service_error(service_error.req, "No request?")
2558
2559        self.check_experiment_access(fid, key)
2560
2561        dealloc_list = [ ]
2562
2563
2564        # Create a logger that logs to the dealloc_list as well as to the main
2565        # log file.
2566        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2567        h = logging.StreamHandler(self.list_log(dealloc_list))
2568        # XXX: there should be a global one of these rather than repeating the
2569        # code.
2570        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2571                    '%d %b %y %H:%M:%S'))
2572        dealloc_log.addHandler(h)
2573
2574        self.state_lock.acquire()
2575        fed_exp = self.state.get(key, None)
2576
2577        if fed_exp:
2578            # This branch of the conditional holds the lock to generate a
2579            # consistent temporary tbparams variable to deallocate experiments.
2580            # It releases the lock to do the deallocations and reacquires it to
2581            # remove the experiment state when the termination is complete.
2582
2583            # First make sure that the experiment creation is complete.
2584            status = fed_exp.get('experimentStatus', None)
2585
2586            if status:
2587                if status in ('starting', 'terminating'):
2588                    if not force:
2589                        self.state_lock.release()
2590                        raise service_error(service_error.partial, 
2591                                'Experiment still being created or destroyed')
2592                    else:
2593                        self.log.warning('Experiment in %s state ' % status + \
2594                                'being terminated by force.')
2595            else:
2596                # No status??? trouble
2597                self.state_lock.release()
2598                raise service_error(service_error.internal,
2599                        "Experiment has no status!?")
2600
2601            ids = []
2602            #  experimentID is a list of dicts that are self-describing
2603            #  identifiers.  This finds all the fedids and localnames - the
2604            #  keys of self.state - and puts them into ids.
2605            for id in fed_exp.get('experimentID', []):
2606                if id.has_key('fedid'): ids.append(id['fedid'])
2607                if id.has_key('localname'): ids.append(id['localname'])
2608
2609            # Collect the allocation/segment ids into a dict keyed by the fedid
2610            # of the allocation (or a monotonically increasing integer) that
2611            # contains a tuple of uri, aid (which is a dict...)
2612            for i, fed in enumerate(fed_exp.get('federant', [])):
2613                try:
2614                    uri = fed['uri']
2615                    aid = fed['allocID']
2616                    k = fed['allocID'].get('fedid', i)
2617                except KeyError, e:
2618                    continue
2619                tbparams[k] = (uri, aid)
2620            fed_exp['experimentStatus'] = 'terminating'
2621            if self.state_filename: self.write_state()
2622            self.state_lock.release()
2623
2624            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2625            # then completes, so we can't wait if nothing starts.  So, no
2626            # tbparams, no start.
2627            if len(tbparams) > 0:
2628                thread_pool = self.thread_pool(self.nthreads)
2629                for k in tbparams.keys():
2630                    # Create and start a thread to stop the segment
2631                    thread_pool.wait_for_slot()
2632                    uri, aid = tbparams[k]
2633                    t  = self.pooled_thread(\
2634                            target=self.terminate_segment(log=dealloc_log,
2635                                testbed=uri,
2636                                cert_file=self.cert_file, 
2637                                cert_pwd=self.cert_pwd,
2638                                trusted_certs=self.trusted_certs,
2639                                caller=self.call_TerminateSegment),
2640                            args=(uri, aid), name=k,
2641                            pdata=thread_pool, trace_file=self.trace_file)
2642                    t.start()
2643                # Wait for completions
2644                thread_pool.wait_for_all_done()
2645
2646            # release the allocations (failed experiments have done this
2647            # already, and starting experiments may be in odd states, so we
2648            # ignore errors releasing those allocations
2649            try: 
2650                for k in tbparams.keys():
2651                    # This releases access by uri
2652                    uri, aid = tbparams[k]
2653                    self.release_access(None, aid, uri=uri)
2654            except service_error, e:
2655                if status != 'failed' and not force:
2656                    raise e
2657
2658            # Remove the terminated experiment
2659            self.state_lock.acquire()
2660            for id in ids:
2661                if self.state.has_key(id): del self.state[id]
2662
2663            if self.state_filename: self.write_state()
2664            self.state_lock.release()
2665
2666            # Delete any synch points associated with this experiment.  All
2667            # synch points begin with the fedid of the experiment.
2668            fedid_keys = set(["fedid:%s" % f for f in ids \
2669                    if isinstance(f, fedid)])
2670            for k in self.synch_store.all_keys():
2671                try:
2672                    if len(k) > 45 and k[0:46] in fedid_keys:
2673                        self.synch_store.del_value(k)
2674                except synch_store.BadDeletionError:
2675                    pass
2676            self.write_store()
2677               
2678            return { 
2679                    'experiment': exp , 
2680                    'deallocationLog': "".join(dealloc_list),
2681                    }
2682        else:
2683            # Don't forget to release the lock
2684            self.state_lock.release()
2685            raise service_error(service_error.req, "No saved state")
2686
2687
2688    def GetValue(self, req, fid):
2689        """
2690        Get a value from the synchronized store
2691        """
2692        req = req.get('GetValueRequestBody', None)
2693        if not req:
2694            raise service_error(service_error.req,
2695                    "Bad request format (no GetValueRequestBody)")
2696       
2697        name = req['name']
2698        wait = req['wait']
2699        rv = { 'name': name }
2700
2701        if self.auth.check_attribute(fid, name):
2702            try:
2703                v = self.synch_store.get_value(name, wait)
2704            except synch_store.RevokedKeyError:
2705                # No more synch on this key
2706                raise service_error(service_error.federant, 
2707                        "Synch key %s revoked" % name)
2708            if v is not None:
2709                rv['value'] = v
2710            self.log.debug("[GetValue] got %s from %s" % (v, name))
2711            return rv
2712        else:
2713            raise service_error(service_error.access, "Access Denied")
2714       
2715
2716    def SetValue(self, req, fid):
2717        """
2718        Set a value in the synchronized store
2719        """
2720        req = req.get('SetValueRequestBody', None)
2721        if not req:
2722            raise service_error(service_error.req,
2723                    "Bad request format (no SetValueRequestBody)")
2724       
2725        name = req['name']
2726        v = req['value']
2727
2728        if self.auth.check_attribute(fid, name):
2729            try:
2730                self.synch_store.set_value(name, v)
2731                self.write_store()
2732                self.log.debug("[SetValue] set %s to %s" % (name, v))
2733            except synch_store.CollisionError:
2734                # Translate into a service_error
2735                raise service_error(service_error.req,
2736                        "Value already set: %s" %name)
2737            except synch_store.RevokedKeyError:
2738                # No more synch on this key
2739                raise service_error(service_error.federant, 
2740                        "Synch key %s revoked" % name)
2741            return { 'name': name, 'value': v }
2742        else:
2743            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.