source: fedd/federation/experiment_control.py @ 1dcaff4

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 1dcaff4 was 1dcaff4, checked in by Ted Faber <faber@…>, 14 years ago

Improved SSL error handling (more try blocks, BIOError exception)
Separate get_url into util
Work with remote splitter.

  • Property mode set to 100644
File size: 93.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Split = service_caller('Ns2Split')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 10
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_splitter(self, uri, desc, master):
925
926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
931            }
932
933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            ed = r.get('experimentdescription', None)
939            if ed.has_key('topdldescription'):
940                return topdl.Topology(**ed['topdldescription'])
941            else:
942                raise service_error(service_error.protocol, 
943                        "Bad splitter response (no output)")
944        else:
945            raise service_error(service_error.protocol, "Bad splitter response")
946
947    class start_segment:
948        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
949                cert_pwd=None, trusted_certs=None, caller=None,
950                log_collector=None):
951            self.log = log
952            self.debug = debug
953            self.cert_file = cert_file
954            self.cert_pwd = cert_pwd
955            self.trusted_certs = None
956            self.caller = caller
957            self.testbed = testbed
958            self.log_collector = log_collector
959            self.response = None
960
961        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
962                services=None):
963            req = {
964                    'allocID': { 'fedid' : aid }, 
965                    'segmentdescription': { 
966                        'topdldescription': topo.to_dict(),
967                    },
968                    'master': master,
969                }
970
971            if connInfo:
972                req['connection'] = connInfo
973            # Add services to request.  The master exports, everyone else
974            # imports.
975            if services:
976                svcs = [ x.copy() for x in services]
977                for s in svcs:
978                    if master: s['visibility'] = 'export'
979                    else: s['visibility'] = 'import'
980                req['service'] = svcs
981            if attrs:
982                req['fedAttr'] = attrs
983
984            try:
985                self.log.debug("Calling StartSegment at %s " % uri)
986                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
987                        self.trusted_certs)
988                if r.has_key('StartSegmentResponseBody'):
989                    lval = r['StartSegmentResponseBody'].get('allocationLog',
990                            None)
991                    if lval and self.log_collector:
992                        for line in  lval.splitlines(True):
993                            self.log_collector.write(line)
994                    self.response = r
995                else:
996                    raise service_error(service_error.internal, 
997                            "Bad response!?: %s" %r)
998                return True
999            except service_error, e:
1000                self.log.error("Start segment failed on %s: %s" % \
1001                        (self.testbed, e))
1002                return False
1003
1004
1005
1006    class terminate_segment:
1007        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1008                cert_pwd=None, trusted_certs=None, caller=None):
1009            self.log = log
1010            self.debug = debug
1011            self.cert_file = cert_file
1012            self.cert_pwd = cert_pwd
1013            self.trusted_certs = None
1014            self.caller = caller
1015            self.testbed = testbed
1016
1017        def __call__(self, uri, aid ):
1018            req = {
1019                    'allocID': aid , 
1020                }
1021            try:
1022                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1023                        self.trusted_certs)
1024                return True
1025            except service_error, e:
1026                self.log.error("Terminate segment failed on %s: %s" % \
1027                        (self.testbed, e))
1028                return False
1029   
1030
1031    def allocate_resources(self, allocated, master, eid, expid, 
1032            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1033            attrs=None, connInfo={}, services=[]):
1034
1035        started = { }           # Testbeds where a sub-experiment started
1036                                # successfully
1037
1038        # XXX
1039        fail_soft = False
1040
1041        log = alloc_log or self.log
1042
1043        thread_pool = self.thread_pool(self.nthreads)
1044        threads = [ ]
1045
1046        for tb in allocated.keys():
1047            # Create and start a thread to start the segment, and save it
1048            # to get the return value later
1049            thread_pool.wait_for_slot()
1050            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1051            if not uri:
1052                raise service_error(service_error.internal, 
1053                        "Unknown testbed %s !?" % tb)
1054
1055            if tbparams[tb].has_key('allocID') and \
1056                    tbparams[tb]['allocID'].has_key('fedid'):
1057                aid = tbparams[tb]['allocID']['fedid']
1058            else:
1059                raise service_error(service_error.internal, 
1060                        "No alloc id for testbed %s !?" % tb)
1061
1062            t  = self.pooled_thread(\
1063                    target=self.start_segment(log=log, debug=self.debug,
1064                        testbed=tb, cert_file=self.cert_file,
1065                        cert_pwd=self.cert_pwd,
1066                        trusted_certs=self.trusted_certs,
1067                        caller=self.call_StartSegment,
1068                        log_collector=log_collector), 
1069                    args=(uri, aid, topo[tb], tb == master, 
1070                        attrs, connInfo[tb], services),
1071                    name=tb,
1072                    pdata=thread_pool, trace_file=self.trace_file)
1073            threads.append(t)
1074            t.start()
1075
1076        # Wait until all finish (keep pinging the log, though)
1077        mins = 0
1078        revoked = False
1079        while not thread_pool.wait_for_all_done(60.0):
1080            mins += 1
1081            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1082                    % mins)
1083            if not revoked and \
1084                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1085                # a testbed has failed.  Revoke this experiment's
1086                # synchronizarion values so that sub experiments will not
1087                # deadlock waiting for synchronization that will never happen
1088                self.log.info("A subexperiment has failed to swap in, " + \
1089                        "revoking synch keys")
1090                var_key = "fedid:%s" % expid
1091                for k in self.synch_store.all_keys():
1092                    if len(k) > 45 and k[0:46] == var_key:
1093                        self.synch_store.revoke_key(k)
1094                revoked = True
1095
1096        failed = [ t.getName() for t in threads if not t.rv ]
1097        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1098
1099        # If one failed clean up, unless fail_soft is set
1100        if failed:
1101            if not fail_soft:
1102                thread_pool.clear()
1103                for tb in succeeded:
1104                    # Create and start a thread to stop the segment
1105                    thread_pool.wait_for_slot()
1106                    uri = tbparams[tb]['uri']
1107                    t  = self.pooled_thread(\
1108                            target=self.terminate_segment(log=log,
1109                                testbed=tb,
1110                                cert_file=self.cert_file, 
1111                                cert_pwd=self.cert_pwd,
1112                                trusted_certs=self.trusted_certs,
1113                                caller=self.call_TerminateSegment),
1114                            args=(uri, tbparams[tb]['federant']['allocID']),
1115                            name=tb,
1116                            pdata=thread_pool, trace_file=self.trace_file)
1117                    t.start()
1118                # Wait until all finish (if any are being stopped)
1119                if succeeded:
1120                    thread_pool.wait_for_all_done()
1121
1122                # release the allocations
1123                for tb in tbparams.keys():
1124                    self.release_access(tb, tbparams[tb]['allocID'],
1125                            tbparams[tb].get('uri', None))
1126                # Remove the placeholder
1127                self.state_lock.acquire()
1128                self.state[eid]['experimentStatus'] = 'failed'
1129                if self.state_filename: self.write_state()
1130                self.state_lock.release()
1131
1132                log.error("Swap in failed on %s" % ",".join(failed))
1133                return
1134        else:
1135            log.info("[start_segment]: Experiment %s active" % eid)
1136
1137
1138        # Walk up tmpdir, deleting as we go
1139        if self.cleanup:
1140            log.debug("[start_experiment]: removing %s" % tmpdir)
1141            for path, dirs, files in os.walk(tmpdir, topdown=False):
1142                for f in files:
1143                    os.remove(os.path.join(path, f))
1144                for d in dirs:
1145                    os.rmdir(os.path.join(path, d))
1146            os.rmdir(tmpdir)
1147        else:
1148            log.debug("[start_experiment]: not removing %s" % tmpdir)
1149
1150        # Insert the experiment into our state and update the disk copy
1151        self.state_lock.acquire()
1152        self.state[expid]['experimentStatus'] = 'active'
1153        self.state[eid] = self.state[expid]
1154        if self.state_filename: self.write_state()
1155        self.state_lock.release()
1156        return
1157
1158
1159    def add_kit(self, e, kit):
1160        """
1161        Add a Software object created from the list of (install, location)
1162        tuples passed as kit  to the software attribute of an object e.  We
1163        do this enough to break out the code, but it's kind of a hack to
1164        avoid changing the old tuple rep.
1165        """
1166
1167        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1168
1169        if isinstance(e.software, list): e.software.extend(s)
1170        else: e.software = s
1171
1172
1173    def create_experiment_state(self, fid, req, expid, expcert, 
1174            state='starting'):
1175        """
1176        Create the initial entry in the experiment's state.  The expid and
1177        expcert are the experiment's fedid and certifacte that represents that
1178        ID, which are installed in the experiment state.  If the request
1179        includes a suggested local name that is used if possible.  If the local
1180        name is already taken by an experiment owned by this user that has
1181        failed, it is overwritten.  Otherwise new letters are added until a
1182        valid localname is found.  The generated local name is returned.
1183        """
1184
1185        if req.has_key('experimentID') and \
1186                req['experimentID'].has_key('localname'):
1187            overwrite = False
1188            eid = req['experimentID']['localname']
1189            # If there's an old failed experiment here with the same local name
1190            # and accessible by this user, we'll overwrite it, otherwise we'll
1191            # fall through and do the collision avoidance.
1192            old_expid = self.get_experiment_fedid(eid)
1193            if old_expid and self.check_experiment_access(fid, old_expid):
1194                self.state_lock.acquire()
1195                status = self.state[eid].get('experimentStatus', None)
1196                if status and status == 'failed':
1197                    # remove the old access attribute
1198                    self.auth.unset_attribute(fid, old_expid)
1199                    overwrite = True
1200                    del self.state[eid]
1201                    del self.state[old_expid]
1202                self.state_lock.release()
1203            self.state_lock.acquire()
1204            while (self.state.has_key(eid) and not overwrite):
1205                eid += random.choice(string.ascii_letters)
1206            # Initial state
1207            self.state[eid] = {
1208                    'experimentID' : \
1209                            [ { 'localname' : eid }, {'fedid': expid } ],
1210                    'experimentStatus': state,
1211                    'experimentAccess': { 'X509' : expcert },
1212                    'owner': fid,
1213                    'log' : [],
1214                }
1215            self.state[expid] = self.state[eid]
1216            if self.state_filename: self.write_state()
1217            self.state_lock.release()
1218        else:
1219            eid = self.exp_stem
1220            for i in range(0,5):
1221                eid += random.choice(string.ascii_letters)
1222            self.state_lock.acquire()
1223            while (self.state.has_key(eid)):
1224                eid = self.exp_stem
1225                for i in range(0,5):
1226                    eid += random.choice(string.ascii_letters)
1227            # Initial state
1228            self.state[eid] = {
1229                    'experimentID' : \
1230                            [ { 'localname' : eid }, {'fedid': expid } ],
1231                    'experimentStatus': state,
1232                    'experimentAccess': { 'X509' : expcert },
1233                    'owner': fid,
1234                    'log' : [],
1235                }
1236            self.state[expid] = self.state[eid]
1237            if self.state_filename: self.write_state()
1238            self.state_lock.release()
1239
1240        return eid
1241
1242
1243    def allocate_ips_to_topo(self, top):
1244        """
1245        Add an ip4_address attribute to all the hosts in the topology, based on
1246        the shared substrates on which they sit.  An /etc/hosts file is also
1247        created and returned as a list of hostfiles entries.  We also return
1248        the allocator, because we may need to allocate IPs to portals
1249        (specifically DRAGON portals).
1250        """
1251        subs = sorted(top.substrates, 
1252                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1253                reverse=True)
1254        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1255        ifs = { }
1256        hosts = [ ]
1257
1258        for idx, s in enumerate(subs):
1259            net_size = len(s.interfaces)+2
1260
1261            a = ips.allocate(net_size)
1262            if a :
1263                base, num = a
1264                if num < net_size: 
1265                    raise service_error(service_error.internal,
1266                            "Allocator returned wrong number of IPs??")
1267            else:
1268                raise service_error(service_error.req, 
1269                        "Cannot allocate IP addresses")
1270            mask = ips.min_alloc
1271            while mask < net_size:
1272                mask *= 2
1273
1274            netmask = ((2**32-1) ^ (mask-1))
1275
1276            base += 1
1277            for i in s.interfaces:
1278                i.attribute.append(
1279                        topdl.Attribute('ip4_address', 
1280                            "%s" % ip_addr(base)))
1281                i.attribute.append(
1282                        topdl.Attribute('ip4_netmask', 
1283                            "%s" % ip_addr(int(netmask))))
1284
1285                hname = i.element.name[0]
1286                if ifs.has_key(hname):
1287                    hosts.append("%s\t%s-%s %s-%d" % \
1288                            (ip_addr(base), hname, s.name, hname,
1289                                ifs[hname]))
1290                else:
1291                    ifs[hname] = 0
1292                    hosts.append("%s\t%s-%s %s-%d %s" % \
1293                            (ip_addr(base), hname, s.name, hname,
1294                                ifs[hname], hname))
1295
1296                ifs[hname] += 1
1297                base += 1
1298        return hosts, ips
1299
1300    def get_access_to_testbeds(self, testbeds, access_user, 
1301            export_project, master, allocated, tbparams, services):
1302        """
1303        Request access to the various testbeds required for this instantiation
1304        (passed in as testbeds).  User, access_user, expoert_project and master
1305        are used to construct the correct requests.  Per-testbed parameters are
1306        returned in tbparams.
1307        """
1308        for tb in testbeds:
1309            self.get_access(tb, None, tbparams, master,
1310                    export_project, access_user, services)
1311            allocated[tb] = 1
1312
1313    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1314        """
1315        Create the sub-topologies that are needed for experiment instantiation.
1316        """
1317        for tb in testbeds:
1318            topo[tb] = top.clone()
1319            to_delete = [ ]
1320            # XXX: copy in for loop to simplify
1321            for e in topo[tb].elements:
1322                etb = e.get_attribute('testbed')
1323                if etb and etb != tb:
1324                    for i in e.interface:
1325                        for s in i.subs:
1326                            try:
1327                                s.interfaces.remove(i)
1328                            except ValueError:
1329                                raise service_error(service_error.internal,
1330                                        "Can't remove interface??")
1331                    to_delete.append(e)
1332            for e in to_delete:
1333                topo[tb].elements.remove(e)
1334            topo[tb].make_indices()
1335
1336    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1337            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1338            expid=None):
1339        """
1340        Return a new internet portal node and a dict with the connectionInfo to
1341        be attached.
1342        """
1343        dproject = tbparams[dt].get('project', 'project')
1344        ddomain = tbparams[dt].get('domain', ".example.com")
1345        mdomain = tbparams[master].get('domain', '.example.com')
1346        mproject = tbparams[master].get('project', 'project')
1347        muser = tbparams[master].get('user', 'root')
1348        smbshare = tbparams[master].get('smbshare', 'USERS')
1349
1350        if st == master or dt == master:
1351            active = ("%s" % (st == master))
1352        else:
1353            active = ("%s" % (st > dt))
1354
1355        ifaces = [ ]
1356        for sub, attrs in iface_desc:
1357            inf = topdl.Interface(
1358                    name="inf%03d" % len(ifaces),
1359                    substrate=sub,
1360                    attribute=[
1361                        topdl.Attribute(
1362                            attribute=n,
1363                            value = v)
1364                        for n, v in attrs
1365                        ]
1366                    )
1367            ifaces.append(inf)
1368        if conn_type == "ssh":
1369            try:
1370                aid = tbparams[st]['allocID']['fedid']
1371            except:
1372                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1373                        % st)
1374                aid = None
1375            info = {
1376                    "type" : conn_type, 
1377                    "portal": myname,
1378                    'fedAttr': [ 
1379                            { 'attribute': 'masterdomain', 'value': mdomain},
1380                            { 'attribute': 'masterexperiment', 'value': 
1381                                "%s/%s" % (mproject, eid)},
1382                            { 'attribute': 'active', 'value': active},
1383                            # Move to SMB service description
1384                            { 'attribute': 'masteruser', 'value': muser},
1385                            { 'attribute': 'smbshare', 'value': smbshare},
1386                        ],
1387                    'parameter': [
1388                        {
1389                            'name': 'peer',
1390                            'key': 'fedid:%s/%s' % (expid, myname),
1391                            'store': self.store_url,
1392                            'type': 'output',
1393                        },
1394                        {
1395                            'name': 'ssh_port',
1396                            'key': 'fedid:%s/%s-port' % (expid, myname),
1397                            'store': self.store_url,
1398                            'type': 'output',
1399                        },
1400                        {
1401                            'name': 'peer',
1402                            'key': 'fedid:%s/%s' % (expid, desthost),
1403                            'store': self.store_url,
1404                            'type': 'input',
1405                        },
1406                        {
1407                            'name': 'ssh_port',
1408                            'key': 'fedid:%s/%s-port' % (expid, desthost),
1409                            'store': self.store_url,
1410                            'type': 'input',
1411                        },
1412                        ]
1413                    }
1414            # Give this allocation the rights to access the key of the
1415            # peers
1416            if aid:
1417                for h in (myname, desthost):
1418                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1419                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
1420                            (expid, h))
1421            else:
1422                self.log.error("No aid for %s in new_portal_node" % st)
1423        else:
1424            info = None
1425
1426        return (topdl.Computer(
1427                name=myname,
1428                attribute=[ 
1429                    topdl.Attribute(attribute=n,value=v)
1430                        for n, v in (\
1431                            ('portal', 'true'),
1432                            ('portal_type', portal_type), 
1433                        )
1434                    ],
1435                interface=ifaces,
1436                ), info)
1437
1438    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1439        ddomain = tbparams[dt].get('domain', ".example.com")
1440        dproject = tbparams[dt].get('project', 'project')
1441        tsubstrate = \
1442                topdl.Substrate(name='%s-%s' % (st, dt),
1443                        attribute= [
1444                            topdl.Attribute(
1445                                attribute='portal',
1446                                value='true')
1447                            ]
1448                        )
1449        segment_element = topdl.Segment(
1450                id= tbparams[dt]['allocID'],
1451                type='emulab',
1452                uri = self.tbmap.get(dt, None),
1453                interface=[ 
1454                    topdl.Interface(
1455                        substrate=tsubstrate.name),
1456                    ],
1457                attribute = [
1458                    topdl.Attribute(attribute=n, value=v)
1459                        for n, v in (\
1460                            ('domain', ddomain),
1461                            ('experiment', "%s/%s" % \
1462                                    (dproject, eid)),)
1463                    ],
1464                )
1465
1466        return (tsubstrate, segment_element)
1467
1468    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1469        if sub.capacity is None:
1470            raise service_error(service_error.internal,
1471                    "Cannot DRAGON split substrate w/o capacity")
1472        segs = [ ]
1473        substr = topdl.Substrate(name="dragon%d" % idx, 
1474                capacity=sub.capacity.clone(),
1475                attribute=[ topdl.Attribute(attribute=n, value=v)
1476                    for n, v, in (\
1477                            ('vlan', 'unassigned%d' % idx),)])
1478        name = "dragon%d" % idx
1479        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1480        for tb in tbs.keys():
1481            seg = topdl.Segment(
1482                    id = tbparams[tb]['allocID'],
1483                    type='emulab',
1484                    uri = self.tbmap.get(tb, None),
1485                    interface=[ 
1486                        topdl.Interface(
1487                            substrate=substr.name),
1488                        ],
1489                    attribute=[ topdl.Attribute(
1490                        attribute='dragon_endpoint', 
1491                        value=tbparams[tb]['dragon']),
1492                        ]
1493                    )
1494            if tbparams[tb].has_key('vlans'):
1495                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1496            segs.append(seg)
1497
1498            # Give this allocation the rights to access the key of the
1499            # vlan_id
1500            try:
1501                aid = tbparams[tb]['allocID']['fedid']
1502                self.auth.set_attribute(aid, store_key)
1503            except:
1504                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1505                        % tb)
1506
1507        connInfo[name] = [ { 
1508            'type': 'transit',
1509            'parameter': [ { 
1510                'name': 'vlan_id',
1511                'key': store_key,
1512                'store': self.store_url,
1513                'type': 'output'
1514                } ]
1515            } ]
1516
1517        topo[name] = \
1518                topdl.Topology(substrates=[substr], elements=segs,
1519                        attribute=[
1520                            topdl.Attribute(attribute="transit", value='true'),
1521                            topdl.Attribute(attribute="dynamic", value='true'),
1522                            topdl.Attribute(attribute="testbed", 
1523                                value='dragon'),
1524                            topdl.Attribute(attribute="store_keys", 
1525                                value=store_key),
1526                            ]
1527                        )
1528
1529    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1530            connInfo, expid=None):
1531        """
1532        Add attribiutes to the various elements indicating that they are to be
1533        dragon connected and create a dragon segment in topo to be
1534        instantiated.
1535        """
1536
1537        def get_substrate_from_topo(name, t):
1538            for s in t.substrates:
1539                if s.name == name: return s
1540            else: return None
1541
1542
1543        mdomain = tbparams[master].get('domain', '.example.com')
1544        mproject = tbparams[master].get('project', 'project')
1545        # dn is the number of previously created dragon nets.  This routine
1546        # creates a net numbered by dn
1547        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1548        # Count the number of interfaces on this substrate in each testbed from
1549        # the global topology
1550        count = { }
1551        node = { }
1552        for e in [ i.element for i in sub.interfaces ]:
1553            tb = e.get_attribute('testbed')
1554            count[tb] = count.get(tb, 0) + 1
1555            node[tb] = i.get_attribute('ip4_address')
1556
1557
1558        # Set the attributes in the copies that will allow setup of dragon
1559        # connections.
1560        for tb in tbs.keys():
1561            s = get_substrate_from_topo(sub.name, topo[tb])
1562            if s:
1563                if not connInfo.has_key(tb):
1564                    connInfo[tb] = [ ]
1565
1566                try:
1567                    aid = tbparams[tb]['allocID']['fedid']
1568                except:
1569                    self.log.debug("[creat_dragon_substrate] " + 
1570                            "Can't get alloc id for %s?" %tb)
1571                    aid = None
1572
1573                # This may need another look, but only a service gateway will
1574                # look at the active parameter, and these are only inserted to
1575                # connect to the master.
1576                active = "%s" % ( tb == master)
1577                info = {
1578                        'type': 'transit',
1579                        'member': [ {
1580                            'element': i.element.name[0], 
1581                            'interface': i.name
1582                            } for i in s.interfaces \
1583                                    if isinstance(i.element, topdl.Computer) ],
1584                        'fedAttr': [ 
1585                            { 'attribute': 'masterdomain', 'value': mdomain},
1586                            { 'attribute': 'masterexperiment', 'value': 
1587                                "%s/%s" % (mproject, eid)},
1588                            { 'attribute': 'active', 'value': active},
1589                            ],
1590                        'parameter': [ {
1591                            'name': 'vlan_id',
1592                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1593                            'store': self.store_url,
1594                            'type': 'input',
1595                            } ]
1596                        }
1597                if tbs.has_key(tb):
1598                    info['peer'] = tbs[tb]
1599                connInfo[tb].append(info)
1600
1601                # Give this allocation the rights to access the key of the
1602                # vlan_id
1603                if aid:
1604                    self.auth.set_attribute(aid, 
1605                            'fedid:%s/vlan%d' % (expid, dn))
1606            else:
1607                raise service_error(service_error.internal,
1608                        "No substrate %s in testbed %s" % (sub.name, tb))
1609
1610        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1611
1612    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1613            segment_substrate, portals, connInfo, expid):
1614        # More than one testbed is on this substrate.  Insert
1615        # some portals into the subtopologies.  st == source testbed,
1616        # dt == destination testbed.
1617        for st in tbs.keys():
1618            if not segment_substrate.has_key(st):
1619                segment_substrate[st] = { }
1620            if not portals.has_key(st): 
1621                portals[st] = { }
1622            if not connInfo.has_key(st):
1623                connInfo[st] = [ ]
1624            for dt in [ t for t in tbs.keys() if t != st]:
1625                sproject = tbparams[st].get('project', 'project')
1626                dproject = tbparams[dt].get('project', 'project')
1627                mproject = tbparams[master].get('project', 'project')
1628                sdomain = tbparams[st].get('domain', ".example.com")
1629                ddomain = tbparams[dt].get('domain', ".example.com")
1630                mdomain = tbparams[master].get('domain', '.example.com')
1631                muser = tbparams[master].get('user', 'root')
1632                smbshare = tbparams[master].get('smbshare', 'USERS')
1633                aid = tbparams[dt]['allocID']['fedid']
1634                if st == master or dt == master:
1635                    active = ("%s" % (st == master))
1636                else:
1637                    active = ("%s" %(st > dt))
1638                if not segment_substrate[st].has_key(dt):
1639                    # Put a substrate and a segment for the connected
1640                    # testbed in there.
1641                    tsubstrate, segment_element = \
1642                            self.new_portal_substrate(st, dt, eid, tbparams,
1643                                    expid)
1644                    segment_substrate[st][dt] = tsubstrate
1645                    topo[st].substrates.append(tsubstrate)
1646                    topo[st].elements.append(segment_element)
1647
1648                new_portal = False
1649                if portals[st].has_key(dt):
1650                    # There's a portal set up to go to this destination.
1651                    # See if there's room to multiplex this connection on
1652                    # it.  If so, add an interface to the portal; if not,
1653                    # set up to add a portal below.
1654                    # [This little festival of braces is just a pop of the
1655                    # last element in the list of portals between st and
1656                    # dt.]
1657                    portal = portals[st][dt][-1]
1658                    mux = len([ i for i in portal.interface \
1659                            if not i.get_attribute('portal')])
1660                    if mux == self.muxmax:
1661                        new_portal = True
1662                        portal_type = "experiment"
1663                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1664                        desthost = "%stunnel%d" % (st.lower(), 
1665                                len(portals[st][dt]))
1666                    else:
1667                        new_i = topdl.Interface(
1668                                substrate=sub.name,
1669                                attribute=[ 
1670                                    topdl.Attribute(
1671                                        attribute='ip4_address', 
1672                                        value=tbs[dt]
1673                                    )
1674                                ])
1675                        portal.interface.append(new_i)
1676                else:
1677                    # First connection to this testbed, make an empty list
1678                    # and set up to add the new portal below
1679                    new_portal = True
1680                    portals[st][dt] = [ ]
1681                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1682                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1683
1684                    if dt == master or st == master: portal_type = "both"
1685                    else: portal_type = "experiment"
1686
1687                if new_portal:
1688                    infs = (
1689                            (segment_substrate[st][dt].name, 
1690                                (('portal', 'true'),)),
1691                            (sub.name, 
1692                                (('ip4_address', tbs[dt]),))
1693                        )
1694                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1695                            master, eid, myname, desthost, portal_type,
1696                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1697                    #if self.fedkit:
1698                        #self.add_kit(portal, self.fedkit)
1699                    #if self.gatewaykit:
1700                        #self.add_kit(portal, self.gatewaykit)
1701
1702                    topo[st].elements.append(portal)
1703                    portals[st][dt].append(portal)
1704                    connInfo[st].append(info)
1705
1706    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1707        # Add to the master testbed
1708        tsubstrate, segment_element = \
1709                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1710        myname = "%stunnel" % dt
1711        desthost = "%stunnel" % st
1712
1713        portal, info = self.new_portal_node(st, dt, tbparams, master,
1714                eid, myname, desthost, "control", 
1715                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1716                conn_attrs=[], expid=expid)
1717
1718        topo[st].substrates.append(tsubstrate)
1719        topo[st].elements.append(segment_element)
1720        topo[st].elements.append(portal)
1721        if not connInfo.has_key(st):
1722            connInfo[st] = [ ]
1723        connInfo[st].append(info)
1724
1725    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1726            substrate, tbparams, expid):
1727        # Add to the master testbed
1728        myname = "%stunnel" % dt
1729        desthost = "%s" % ip_addr(dip)
1730
1731        portal, info = self.new_portal_node(st, dt, tbparams, master,
1732                eid, myname, desthost, "control", 
1733                ((substrate.name,(
1734                    ('portal','true'),
1735                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1736                conn_type="transit", conn_attrs=[], expid=expid)
1737
1738        return portal
1739
1740    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1741            connInfo, expid):
1742        """
1743        For each substrate in the main topology, find those that
1744        have nodes on more than one testbed.  Insert portal nodes
1745        into the copies of those substrates on the sub topologies.
1746        """
1747        segment_substrate = { }
1748        portals = { }
1749        for s in top.substrates:
1750            # tbs will contain an ip address on this subsrate that is in
1751            # each testbed.
1752            tbs = { }
1753            for i in s.interfaces:
1754                e = i.element
1755                tb = e.get_attribute('testbed')
1756                if tb and not tbs.has_key(tb):
1757                    for i in e.interface:
1758                        if s in i.subs:
1759                            tbs[tb]= i.get_attribute('ip4_address')
1760            if len(tbs) < 2:
1761                continue
1762
1763            # DRAGON will not create multi-site vlans yet
1764            if len(tbs) == 2 and \
1765                    all([tbparams[x].has_key('dragon') for x in tbs]):
1766                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1767                        master, eid, connInfo, expid)
1768            else:
1769                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1770                        eid, segment_substrate, portals, connInfo, expid)
1771
1772        # Make sure that all the slaves have a control portal back to the
1773        # master.
1774        for tb in [ t for t in tbparams.keys() if t != master ]:
1775            if len([e for e in topo[tb].elements \
1776                    if isinstance(e, topdl.Computer) and \
1777                    e.get_attribute('portal') and \
1778                    e.get_attribute('portal_type') == 'both']) == 0:
1779
1780                if tbparams[master].has_key('dragon') \
1781                        and tbparams[tb].has_key('dragon'):
1782
1783                    idx = len([x for x in topo.keys() \
1784                            if x.startswith('dragon')])
1785                    dip, leng = ip_allocator.allocate(4)
1786                    dip += 1
1787                    mip = dip+1
1788                    csub = topdl.Substrate(
1789                            name="dragon-control-%s" % tb,
1790                            capacity=topdl.Capacity(100000.0, 'max'),
1791                            attribute=[
1792                                topdl.Attribute(
1793                                    attribute='portal',
1794                                    value='true'
1795                                    )
1796                                ]
1797                            )
1798                    seg = topdl.Segment(
1799                            id= tbparams[master]['allocID'],
1800                            type='emulab',
1801                            uri = self.tbmap.get(master, None),
1802                            interface=[ 
1803                                topdl.Interface(
1804                                    substrate=csub.name),
1805                                ],
1806                            attribute = [
1807                                topdl.Attribute(attribute=n, value=v)
1808                                    for n, v in (\
1809                                        ('domain', 
1810                                            tbparams[master].get('domain',
1811                                                ".example.com")),
1812                                        ('experiment', "%s/%s" % \
1813                                                (tbparams[master].get(
1814                                                    'project', 
1815                                                    'project'), 
1816                                                    eid)),)
1817                                ],
1818                            )
1819                    portal = self.new_dragon_portal(tb, master,
1820                            master, eid, dip, mip, idx, csub, tbparams, expid)
1821                    topo[tb].substrates.append(csub)
1822                    topo[tb].elements.append(portal)
1823                    topo[tb].elements.append(seg)
1824
1825                    mcsub = csub.clone()
1826                    seg = topdl.Segment(
1827                            id= tbparams[tb]['allocID'],
1828                            type='emulab',
1829                            uri = self.tbmap.get(tb, None),
1830                            interface=[ 
1831                                topdl.Interface(
1832                                    substrate=csub.name),
1833                                ],
1834                            attribute = [
1835                                topdl.Attribute(attribute=n, value=v)
1836                                    for n, v in (\
1837                                        ('domain', 
1838                                            tbparams[tb].get('domain',
1839                                                ".example.com")),
1840                                        ('experiment', "%s/%s" % \
1841                                                (tbparams[tb].get('project', 
1842                                                    'project'), 
1843                                                    eid)),)
1844                                ],
1845                            )
1846                    portal = self.new_dragon_portal(master, tb, master,
1847                            eid, mip, dip, idx, mcsub, tbparams, expid)
1848                    topo[master].substrates.append(mcsub)
1849                    topo[master].elements.append(portal)
1850                    topo[master].elements.append(seg)
1851                    for t in (master, tb):
1852                        topo[t].incorporate_elements()
1853
1854                    self.create_dragon_substrate(csub, topo, 
1855                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1856                            tbparams, master, eid, connInfo,
1857                            expid)
1858                else:
1859                    self.add_control_portal(master, tb, master, eid, topo, 
1860                            tbparams, connInfo, expid)
1861                    self.add_control_portal(tb, master, master, eid, topo, 
1862                            tbparams, connInfo, expid)
1863
1864        # Connect the portal nodes into the topologies and clear out
1865        # substrates that are not in the topologies
1866        for tb in tbparams.keys():
1867            topo[tb].incorporate_elements()
1868            topo[tb].substrates = \
1869                    [s for s in topo[tb].substrates \
1870                        if len(s.interfaces) >0]
1871
1872    def wrangle_software(self, expid, top, topo, tbparams):
1873        """
1874        Copy software out to the repository directory, allocate permissions and
1875        rewrite the segment topologies to look for the software in local
1876        places.
1877        """
1878
1879        # Copy the rpms and tarfiles to a distribution directory from
1880        # which the federants can retrieve them
1881        linkpath = "%s/software" %  expid
1882        softdir ="%s/%s" % ( self.repodir, linkpath)
1883        softmap = { }
1884        # These are in a list of tuples format (each kit).  This comprehension
1885        # unwraps them into a single list of tuples that initilaizes the set of
1886        # tuples.
1887        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1888                for p, t in l ])
1889        pkgs.update([x.location for e in top.elements \
1890                for x in e.software])
1891        try:
1892            os.makedirs(softdir)
1893        except IOError, e:
1894            raise service_error(
1895                    "Cannot create software directory: %s" % e)
1896        # The actual copying.  Everything's converted into a url for copying.
1897        for pkg in pkgs:
1898            loc = pkg
1899
1900            scheme, host, path = urlparse(loc)[0:3]
1901            dest = os.path.basename(path)
1902            if not scheme:
1903                if not loc.startswith('/'):
1904                    loc = "/%s" % loc
1905                loc = "file://%s" %loc
1906            try:
1907                u = urlopen(loc)
1908            except Exception, e:
1909                raise service_error(service_error.req, 
1910                        "Cannot open %s: %s" % (loc, e))
1911            try:
1912                f = open("%s/%s" % (softdir, dest) , "w")
1913                self.log.debug("Writing %s/%s" % (softdir,dest) )
1914                data = u.read(4096)
1915                while data:
1916                    f.write(data)
1917                    data = u.read(4096)
1918                f.close()
1919                u.close()
1920            except Exception, e:
1921                raise service_error(service_error.internal,
1922                        "Could not copy %s: %s" % (loc, e))
1923            path = re.sub("/tmp", "", linkpath)
1924            # XXX
1925            softmap[pkg] = \
1926                    "%s/%s/%s" %\
1927                    ( self.repo_url, path, dest)
1928
1929            # Allow the individual segments to access the software.
1930            for tb in tbparams.keys():
1931                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1932                        "/%s/%s" % ( path, dest))
1933
1934        # Convert the software locations in the segments into the local
1935        # copies on this host
1936        for soft in [ s for tb in topo.values() \
1937                for e in tb.elements \
1938                    if getattr(e, 'software', False) \
1939                        for s in e.software ]:
1940            if softmap.has_key(soft.location):
1941                soft.location = softmap[soft.location]
1942
1943
1944    def new_experiment(self, req, fid):
1945        """
1946        The external interface to empty initial experiment creation called from
1947        the dispatcher.
1948
1949        Creates a working directory, splits the incoming description using the
1950        splitter script and parses out the avrious subsections using the
1951        lcasses above.  Once each sub-experiment is created, use pooled threads
1952        to instantiate them and start it all up.
1953        """
1954        if not self.auth.check_attribute(fid, 'new'):
1955            raise service_error(service_error.access, "New access denied")
1956
1957        try:
1958            tmpdir = tempfile.mkdtemp(prefix="split-")
1959        except IOError:
1960            raise service_error(service_error.internal, "Cannot create tmp dir")
1961
1962        try:
1963            access_user = self.accessdb[fid]
1964        except KeyError:
1965            raise service_error(service_error.internal,
1966                    "Access map and authorizer out of sync in " + \
1967                            "new_experiment for fedid %s"  % fid)
1968
1969        pid = "dummy"
1970        gid = "dummy"
1971
1972        req = req.get('NewRequestBody', None)
1973        if not req:
1974            raise service_error(service_error.req,
1975                    "Bad request format (no NewRequestBody)")
1976
1977        # Generate an ID for the experiment (slice) and a certificate that the
1978        # allocator can use to prove they own it.  We'll ship it back through
1979        # the encrypted connection.
1980        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1981
1982        #now we're done with the tmpdir, and it should be empty
1983        if self.cleanup:
1984            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1985            os.rmdir(tmpdir)
1986        else:
1987            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1988
1989        eid = self.create_experiment_state(fid, req, expid, expcert, 
1990                state='empty')
1991
1992        # Let users touch the state
1993        self.auth.set_attribute(fid, expid)
1994        self.auth.set_attribute(expid, expid)
1995        # Override fedids can manipulate state as well
1996        for o in self.overrides:
1997            self.auth.set_attribute(o, expid)
1998
1999        rv = {
2000                'experimentID': [
2001                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2002                ],
2003                'experimentStatus': 'empty',
2004                'experimentAccess': { 'X509' : expcert }
2005            }
2006
2007        return rv
2008
2009
2010    def create_experiment(self, req, fid):
2011        """
2012        The external interface to experiment creation called from the
2013        dispatcher.
2014
2015        Creates a working directory, splits the incoming description using the
2016        splitter script and parses out the avrious subsections using the
2017        lcasses above.  Once each sub-experiment is created, use pooled threads
2018        to instantiate them and start it all up.
2019        """
2020
2021        req = req.get('CreateRequestBody', None)
2022        if not req:
2023            raise service_error(service_error.req,
2024                    "Bad request format (no CreateRequestBody)")
2025
2026        # Get the experiment access
2027        exp = req.get('experimentID', None)
2028        if exp:
2029            if exp.has_key('fedid'):
2030                key = exp['fedid']
2031                expid = key
2032                eid = None
2033            elif exp.has_key('localname'):
2034                key = exp['localname']
2035                eid = key
2036                expid = None
2037            else:
2038                raise service_error(service_error.req, "Unknown lookup type")
2039        else:
2040            raise service_error(service_error.req, "No request?")
2041
2042        self.check_experiment_access(fid, key)
2043
2044        try:
2045            tmpdir = tempfile.mkdtemp(prefix="split-")
2046            os.mkdir(tmpdir+"/keys")
2047        except IOError:
2048            raise service_error(service_error.internal, "Cannot create tmp dir")
2049
2050        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2051        gw_secretkey_base = "fed.%s" % self.ssh_type
2052        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2053        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2054        tclfile = tmpdir + "/experiment.tcl"
2055        tbparams = { }
2056        try:
2057            access_user = self.accessdb[fid]
2058        except KeyError:
2059            raise service_error(service_error.internal,
2060                    "Access map and authorizer out of sync in " + \
2061                            "create_experiment for fedid %s"  % fid)
2062
2063        pid = "dummy"
2064        gid = "dummy"
2065
2066        # The tcl parser needs to read a file so put the content into that file
2067        descr=req.get('experimentdescription', None)
2068        if descr:
2069            file_content=descr.get('ns2description', None)
2070            if file_content:
2071                try:
2072                    f = open(tclfile, 'w')
2073                    f.write(file_content)
2074                    f.close()
2075                except IOError:
2076                    raise service_error(service_error.internal,
2077                            "Cannot write temp experiment description")
2078            else:
2079                raise service_error(service_error.req, 
2080                        "Only ns2descriptions supported")
2081        else:
2082            raise service_error(service_error.req, "No experiment description")
2083
2084        self.state_lock.acquire()
2085        if self.state.has_key(key):
2086            self.state[key]['experimentStatus'] = "starting"
2087            for e in self.state[key].get('experimentID',[]):
2088                if not expid and e.has_key('fedid'):
2089                    expid = e['fedid']
2090                elif not eid and e.has_key('localname'):
2091                    eid = e['localname']
2092        self.state_lock.release()
2093
2094        if not (eid and expid):
2095            raise service_error(service_error.internal, 
2096                    "Cannot find local experiment info!?")
2097
2098        try: 
2099            # This catches exceptions to clear the placeholder if necessary
2100            try:
2101                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2102            except ValueError:
2103                raise service_error(service_error.server_config, 
2104                        "Bad key type (%s)" % self.ssh_type)
2105
2106            master = req.get('master', None)
2107            if not master:
2108                raise service_error(service_error.req,
2109                        "No master testbed label")
2110            export_project = req.get('exportProject', None)
2111            if not export_project:
2112                raise service_error(service_error.req, "No export project")
2113           
2114            # Translate to topdl
2115            if self.splitter_url:
2116                # XXX: need remote topdl translator
2117                self.log.debug("Calling remote splitter at %s" % \
2118                        self.splitter_url)
2119                top = self.remote_splitter(self.splitter_url,
2120                        file_content, master)
2121            else:
2122                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2123                    str(self.muxmax), '-m', master]
2124
2125                tclcmd.extend([pid, gid, eid, tclfile])
2126
2127                self.log.debug("running local splitter %s", " ".join(tclcmd))
2128                # This is just fantastic.  As a side effect the parser copies
2129                # tb_compat.tcl into the current directory, so that directory
2130                # must be writable by the fedd user.  Doing this in the
2131                # temporary subdir ensures this is the case.
2132                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2133                        cwd=tmpdir)
2134                split_data = tclparser.stdout
2135
2136                top = topdl.topology_from_xml(file=split_data, top="experiment")
2137
2138            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2139             # Find the testbeds to look up
2140            testbeds = set([ a.value for e in top.elements \
2141                    for a in e.attribute \
2142                    if a.attribute == 'testbed'] )
2143
2144            allocated = { }         # Testbeds we can access
2145            topo ={ }               # Sub topologies
2146            connInfo = { }          # Connection information
2147            services = [ ]
2148            self.get_access_to_testbeds(testbeds, access_user, 
2149                    export_project, master, allocated, tbparams, services)
2150            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2151
2152            # Copy configuration files into the remote file store
2153            # The config urlpath
2154            configpath = "/%s/config" % expid
2155            # The config file system location
2156            configdir ="%s%s" % ( self.repodir, configpath)
2157            try:
2158                os.makedirs(configdir)
2159            except IOError, e:
2160                raise service_error(
2161                        "Cannot create config directory: %s" % e)
2162            try:
2163                f = open("%s/hosts" % configdir, "w")
2164                f.write('\n'.join(hosts))
2165                f.close()
2166            except IOError, e:
2167                raise service_error(service_error.internal, 
2168                        "Cannot write hosts file: %s" % e)
2169            try:
2170                copy_file("%s" % gw_pubkey, "%s/%s" % \
2171                        (configdir, gw_pubkey_base))
2172                copy_file("%s" % gw_secretkey, "%s/%s" % \
2173                        (configdir, gw_secretkey_base))
2174            except IOError, e:
2175                raise service_error(service_error.internal, 
2176                        "Cannot copy keyfiles: %s" % e)
2177
2178            # Allow the individual testbeds to access the configuration files.
2179            for tb in tbparams.keys():
2180                asignee = tbparams[tb]['allocID']['fedid']
2181                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2182                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2183
2184            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2185                    connInfo, expid)
2186            # Now get access to the dynamic testbeds
2187            for k, t in topo.items():
2188                if not t.get_attribute('dynamic'):
2189                    continue
2190                tb = t.get_attribute('testbed')
2191                if tb: 
2192                    self.get_access(tb, None, tbparams, master, 
2193                            export_project, access_user, services)
2194                    tbparams[k] = tbparams[tb]
2195                    del tbparams[tb]
2196                    allocated[k] = 1
2197                    store_keys = t.get_attribute('store_keys')
2198                    # Give the testbed access to keys it exports or imports
2199                    if store_keys:
2200                        for sk in store_keys.split(" "):
2201                            self.auth.set_attribute(\
2202                                    tbparams[k]['allocID']['fedid'], sk)
2203                else:
2204                    raise service_error(service_error.internal, 
2205                            "Dynamic allocation from no testbed!?")
2206
2207            self.wrangle_software(expid, top, topo, tbparams)
2208
2209            vtopo = topdl.topology_to_vtopo(top)
2210            vis = self.genviz(vtopo)
2211
2212            # save federant information
2213            for k in allocated.keys():
2214                tbparams[k]['federant'] = {
2215                        'name': [ { 'localname' : eid} ],
2216                        'allocID' : tbparams[k]['allocID'],
2217                        'master' : k == master,
2218                        'uri': tbparams[k]['uri'],
2219                    }
2220                if tbparams[k].has_key('emulab'):
2221                        tbparams[k]['federant']['emulab'] = \
2222                                tbparams[k]['emulab']
2223
2224            self.state_lock.acquire()
2225            self.state[eid]['vtopo'] = vtopo
2226            self.state[eid]['vis'] = vis
2227            self.state[expid]['federant'] = \
2228                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2229                        if tbparams[tb].has_key('federant') ]
2230            if self.state_filename: 
2231                self.write_state()
2232            self.state_lock.release()
2233        except service_error, e:
2234            # If something goes wrong in the parse (usually an access error)
2235            # clear the placeholder state.  From here on out the code delays
2236            # exceptions.  Failing at this point returns a fault to the remote
2237            # caller.
2238
2239            self.state_lock.acquire()
2240            del self.state[eid]
2241            del self.state[expid]
2242            if self.state_filename: self.write_state()
2243            self.state_lock.release()
2244            raise e
2245
2246
2247        # Start the background swapper and return the starting state.  From
2248        # here on out, the state will stick around a while.
2249
2250        # Let users touch the state
2251        self.auth.set_attribute(fid, expid)
2252        self.auth.set_attribute(expid, expid)
2253        # Override fedids can manipulate state as well
2254        for o in self.overrides:
2255            self.auth.set_attribute(o, expid)
2256
2257        # Create a logger that logs to the experiment's state object as well as
2258        # to the main log file.
2259        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2260        alloc_collector = self.list_log(self.state[eid]['log'])
2261        h = logging.StreamHandler(alloc_collector)
2262        # XXX: there should be a global one of these rather than repeating the
2263        # code.
2264        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2265                    '%d %b %y %H:%M:%S'))
2266        alloc_log.addHandler(h)
2267       
2268        attrs = [ 
2269                {
2270                    'attribute': 'ssh_pubkey', 
2271                    'value': '%s/%s/config/%s' % \
2272                            (self.repo_url, expid, gw_pubkey_base)
2273                },
2274                {
2275                    'attribute': 'ssh_secretkey', 
2276                    'value': '%s/%s/config/%s' % \
2277                            (self.repo_url, expid, gw_secretkey_base)
2278                },
2279                {
2280                    'attribute': 'hosts', 
2281                    'value': '%s/%s/config/hosts' % \
2282                            (self.repo_url, expid)
2283                },
2284                {
2285                    'attribute': 'experiment_name',
2286                    'value': eid,
2287                },
2288            ]
2289
2290        # transit and disconnected testbeds may not have a connInfo entry.
2291        # Fill in the blanks.
2292        for t in allocated.keys():
2293            if not connInfo.has_key(t):
2294                connInfo[t] = { }
2295
2296        # Start a thread to do the resource allocation
2297        t  = Thread(target=self.allocate_resources,
2298                args=(allocated, master, eid, expid, tbparams, 
2299                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2300                    services),
2301                name=eid)
2302        t.start()
2303
2304        rv = {
2305                'experimentID': [
2306                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2307                ],
2308                'experimentStatus': 'starting',
2309            }
2310
2311        return rv
2312   
2313    def get_experiment_fedid(self, key):
2314        """
2315        find the fedid associated with the localname key in the state database.
2316        """
2317
2318        rv = None
2319        self.state_lock.acquire()
2320        if self.state.has_key(key):
2321            if isinstance(self.state[key], dict):
2322                try:
2323                    kl = [ f['fedid'] for f in \
2324                            self.state[key]['experimentID']\
2325                                if f.has_key('fedid') ]
2326                except KeyError:
2327                    self.state_lock.release()
2328                    raise service_error(service_error.internal, 
2329                            "No fedid for experiment %s when getting "+\
2330                                    "fedid(!?)" % key)
2331                if len(kl) == 1:
2332                    rv = kl[0]
2333                else:
2334                    self.state_lock.release()
2335                    raise service_error(service_error.internal, 
2336                            "multiple fedids for experiment %s when " +\
2337                                    "getting fedid(!?)" % key)
2338            else:
2339                self.state_lock.release()
2340                raise service_error(service_error.internal, 
2341                        "Unexpected state for %s" % key)
2342        self.state_lock.release()
2343        return rv
2344
2345    def check_experiment_access(self, fid, key):
2346        """
2347        Confirm that the fid has access to the experiment.  Though a request
2348        may be made in terms of a local name, the access attribute is always
2349        the experiment's fedid.
2350        """
2351        if not isinstance(key, fedid):
2352            key = self.get_experiment_fedid(key)
2353
2354        if self.auth.check_attribute(fid, key):
2355            return True
2356        else:
2357            raise service_error(service_error.access, "Access Denied")
2358
2359
2360    def get_handler(self, path, fid):
2361        self.log.info("Get handler %s %s" % (path, fid))
2362        if self.auth.check_attribute(fid, path):
2363            return ("%s/%s" % (self.repodir, path), "application/binary")
2364        else:
2365            return (None, None)
2366
2367    def get_vtopo(self, req, fid):
2368        """
2369        Return the stored virtual topology for this experiment
2370        """
2371        rv = None
2372        state = None
2373
2374        req = req.get('VtopoRequestBody', None)
2375        if not req:
2376            raise service_error(service_error.req,
2377                    "Bad request format (no VtopoRequestBody)")
2378        exp = req.get('experiment', None)
2379        if exp:
2380            if exp.has_key('fedid'):
2381                key = exp['fedid']
2382                keytype = "fedid"
2383            elif exp.has_key('localname'):
2384                key = exp['localname']
2385                keytype = "localname"
2386            else:
2387                raise service_error(service_error.req, "Unknown lookup type")
2388        else:
2389            raise service_error(service_error.req, "No request?")
2390
2391        self.check_experiment_access(fid, key)
2392
2393        self.state_lock.acquire()
2394        if self.state.has_key(key):
2395            if self.state[key].has_key('vtopo'):
2396                rv = { 'experiment' : {keytype: key },\
2397                        'vtopo': self.state[key]['vtopo'],\
2398                    }
2399            else:
2400                state = self.state[key]['experimentStatus']
2401        self.state_lock.release()
2402
2403        if rv: return rv
2404        else: 
2405            if state:
2406                raise service_error(service_error.partial, 
2407                        "Not ready: %s" % state)
2408            else:
2409                raise service_error(service_error.req, "No such experiment")
2410
2411    def get_vis(self, req, fid):
2412        """
2413        Return the stored visualization for this experiment
2414        """
2415        rv = None
2416        state = None
2417
2418        req = req.get('VisRequestBody', None)
2419        if not req:
2420            raise service_error(service_error.req,
2421                    "Bad request format (no VisRequestBody)")
2422        exp = req.get('experiment', None)
2423        if exp:
2424            if exp.has_key('fedid'):
2425                key = exp['fedid']
2426                keytype = "fedid"
2427            elif exp.has_key('localname'):
2428                key = exp['localname']
2429                keytype = "localname"
2430            else:
2431                raise service_error(service_error.req, "Unknown lookup type")
2432        else:
2433            raise service_error(service_error.req, "No request?")
2434
2435        self.check_experiment_access(fid, key)
2436
2437        self.state_lock.acquire()
2438        if self.state.has_key(key):
2439            if self.state[key].has_key('vis'):
2440                rv =  { 'experiment' : {keytype: key },\
2441                        'vis': self.state[key]['vis'],\
2442                        }
2443            else:
2444                state = self.state[key]['experimentStatus']
2445        self.state_lock.release()
2446
2447        if rv: return rv
2448        else:
2449            if state:
2450                raise service_error(service_error.partial, 
2451                        "Not ready: %s" % state)
2452            else:
2453                raise service_error(service_error.req, "No such experiment")
2454
2455    def clean_info_response(self, rv):
2456        """
2457        Remove the information in the experiment's state object that is not in
2458        the info response.
2459        """
2460        # Remove the owner info (should always be there, but...)
2461        if rv.has_key('owner'): del rv['owner']
2462
2463        # Convert the log into the allocationLog parameter and remove the
2464        # log entry (with defensive programming)
2465        if rv.has_key('log'):
2466            rv['allocationLog'] = "".join(rv['log'])
2467            del rv['log']
2468        else:
2469            rv['allocationLog'] = ""
2470
2471        if rv['experimentStatus'] != 'active':
2472            if rv.has_key('federant'): del rv['federant']
2473        else:
2474            # remove the allocationID and uri info from each federant
2475            for f in rv.get('federant', []):
2476                if f.has_key('allocID'): del f['allocID']
2477                if f.has_key('uri'): del f['uri']
2478        return rv
2479
2480    def get_info(self, req, fid):
2481        """
2482        Return all the stored info about this experiment
2483        """
2484        rv = None
2485
2486        req = req.get('InfoRequestBody', None)
2487        if not req:
2488            raise service_error(service_error.req,
2489                    "Bad request format (no InfoRequestBody)")
2490        exp = req.get('experiment', None)
2491        if exp:
2492            if exp.has_key('fedid'):
2493                key = exp['fedid']
2494                keytype = "fedid"
2495            elif exp.has_key('localname'):
2496                key = exp['localname']
2497                keytype = "localname"
2498            else:
2499                raise service_error(service_error.req, "Unknown lookup type")
2500        else:
2501            raise service_error(service_error.req, "No request?")
2502
2503        self.check_experiment_access(fid, key)
2504
2505        # The state may be massaged by the service function that called
2506        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2507        # state.
2508        self.state_lock.acquire()
2509        if self.state.has_key(key):
2510            rv = copy.deepcopy(self.state[key])
2511        self.state_lock.release()
2512
2513        if rv:
2514            return self.clean_info_response(rv)
2515        else:
2516            raise service_error(service_error.req, "No such experiment")
2517
2518    def get_multi_info(self, req, fid):
2519        """
2520        Return all the stored info that this fedid can access
2521        """
2522        rv = { 'info': [ ] }
2523
2524        self.state_lock.acquire()
2525        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2526            try:
2527                self.check_experiment_access(fid, key)
2528            except service_error, e:
2529                if e.code == service_error.access:
2530                    continue
2531                else:
2532                    self.state_lock.release()
2533                    raise e
2534
2535            if self.state.has_key(key):
2536                e = copy.deepcopy(self.state[key])
2537                e = self.clean_info_response(e)
2538                rv['info'].append(e)
2539        self.state_lock.release()
2540        return rv
2541
2542    def terminate_experiment(self, req, fid):
2543        """
2544        Swap this experiment out on the federants and delete the shared
2545        information
2546        """
2547        tbparams = { }
2548        req = req.get('TerminateRequestBody', None)
2549        if not req:
2550            raise service_error(service_error.req,
2551                    "Bad request format (no TerminateRequestBody)")
2552        force = req.get('force', False)
2553        exp = req.get('experiment', None)
2554        if exp:
2555            if exp.has_key('fedid'):
2556                key = exp['fedid']
2557                keytype = "fedid"
2558            elif exp.has_key('localname'):
2559                key = exp['localname']
2560                keytype = "localname"
2561            else:
2562                raise service_error(service_error.req, "Unknown lookup type")
2563        else:
2564            raise service_error(service_error.req, "No request?")
2565
2566        self.check_experiment_access(fid, key)
2567
2568        dealloc_list = [ ]
2569
2570
2571        # Create a logger that logs to the dealloc_list as well as to the main
2572        # log file.
2573        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2574        h = logging.StreamHandler(self.list_log(dealloc_list))
2575        # XXX: there should be a global one of these rather than repeating the
2576        # code.
2577        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2578                    '%d %b %y %H:%M:%S'))
2579        dealloc_log.addHandler(h)
2580
2581        self.state_lock.acquire()
2582        fed_exp = self.state.get(key, None)
2583
2584        if fed_exp:
2585            # This branch of the conditional holds the lock to generate a
2586            # consistent temporary tbparams variable to deallocate experiments.
2587            # It releases the lock to do the deallocations and reacquires it to
2588            # remove the experiment state when the termination is complete.
2589
2590            # First make sure that the experiment creation is complete.
2591            status = fed_exp.get('experimentStatus', None)
2592
2593            if status:
2594                if status in ('starting', 'terminating'):
2595                    if not force:
2596                        self.state_lock.release()
2597                        raise service_error(service_error.partial, 
2598                                'Experiment still being created or destroyed')
2599                    else:
2600                        self.log.warning('Experiment in %s state ' % status + \
2601                                'being terminated by force.')
2602            else:
2603                # No status??? trouble
2604                self.state_lock.release()
2605                raise service_error(service_error.internal,
2606                        "Experiment has no status!?")
2607
2608            ids = []
2609            #  experimentID is a list of dicts that are self-describing
2610            #  identifiers.  This finds all the fedids and localnames - the
2611            #  keys of self.state - and puts them into ids.
2612            for id in fed_exp.get('experimentID', []):
2613                if id.has_key('fedid'): ids.append(id['fedid'])
2614                if id.has_key('localname'): ids.append(id['localname'])
2615
2616            # Collect the allocation/segment ids into a dict keyed by the fedid
2617            # of the allocation (or a monotonically increasing integer) that
2618            # contains a tuple of uri, aid (which is a dict...)
2619            for i, fed in enumerate(fed_exp.get('federant', [])):
2620                try:
2621                    uri = fed['uri']
2622                    aid = fed['allocID']
2623                    k = fed['allocID'].get('fedid', i)
2624                except KeyError, e:
2625                    continue
2626                tbparams[k] = (uri, aid)
2627            fed_exp['experimentStatus'] = 'terminating'
2628            if self.state_filename: self.write_state()
2629            self.state_lock.release()
2630
2631            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2632            # then completes, so we can't wait if nothing starts.  So, no
2633            # tbparams, no start.
2634            if len(tbparams) > 0:
2635                thread_pool = self.thread_pool(self.nthreads)
2636                for k in tbparams.keys():
2637                    # Create and start a thread to stop the segment
2638                    thread_pool.wait_for_slot()
2639                    uri, aid = tbparams[k]
2640                    t  = self.pooled_thread(\
2641                            target=self.terminate_segment(log=dealloc_log,
2642                                testbed=uri,
2643                                cert_file=self.cert_file, 
2644                                cert_pwd=self.cert_pwd,
2645                                trusted_certs=self.trusted_certs,
2646                                caller=self.call_TerminateSegment),
2647                            args=(uri, aid), name=k,
2648                            pdata=thread_pool, trace_file=self.trace_file)
2649                    t.start()
2650                # Wait for completions
2651                thread_pool.wait_for_all_done()
2652
2653            # release the allocations (failed experiments have done this
2654            # already, and starting experiments may be in odd states, so we
2655            # ignore errors releasing those allocations
2656            try: 
2657                for k in tbparams.keys():
2658                    # This releases access by uri
2659                    uri, aid = tbparams[k]
2660                    self.release_access(None, aid, uri=uri)
2661            except service_error, e:
2662                if status != 'failed' and not force:
2663                    raise e
2664
2665            # Remove the terminated experiment
2666            self.state_lock.acquire()
2667            for id in ids:
2668                if self.state.has_key(id): del self.state[id]
2669
2670            if self.state_filename: self.write_state()
2671            self.state_lock.release()
2672
2673            # Delete any synch points associated with this experiment.  All
2674            # synch points begin with the fedid of the experiment.
2675            fedid_keys = set(["fedid:%s" % f for f in ids \
2676                    if isinstance(f, fedid)])
2677            for k in self.synch_store.all_keys():
2678                try:
2679                    if len(k) > 45 and k[0:46] in fedid_keys:
2680                        self.synch_store.del_value(k)
2681                except synch_store.BadDeletionError:
2682                    pass
2683            self.write_store()
2684               
2685            return { 
2686                    'experiment': exp , 
2687                    'deallocationLog': "".join(dealloc_list),
2688                    }
2689        else:
2690            # Don't forget to release the lock
2691            self.state_lock.release()
2692            raise service_error(service_error.req, "No saved state")
2693
2694
2695    def GetValue(self, req, fid):
2696        """
2697        Get a value from the synchronized store
2698        """
2699        req = req.get('GetValueRequestBody', None)
2700        if not req:
2701            raise service_error(service_error.req,
2702                    "Bad request format (no GetValueRequestBody)")
2703       
2704        name = req['name']
2705        wait = req['wait']
2706        rv = { 'name': name }
2707
2708        if self.auth.check_attribute(fid, name):
2709            try:
2710                v = self.synch_store.get_value(name, wait)
2711            except synch_store.RevokedKeyError:
2712                # No more synch on this key
2713                raise service_error(service_error.federant, 
2714                        "Synch key %s revoked" % name)
2715            if v is not None:
2716                rv['value'] = v
2717            self.log.debug("[GetValue] got %s from %s" % (v, name))
2718            return rv
2719        else:
2720            raise service_error(service_error.access, "Access Denied")
2721       
2722
2723    def SetValue(self, req, fid):
2724        """
2725        Set a value in the synchronized store
2726        """
2727        req = req.get('SetValueRequestBody', None)
2728        if not req:
2729            raise service_error(service_error.req,
2730                    "Bad request format (no SetValueRequestBody)")
2731       
2732        name = req['name']
2733        v = req['value']
2734
2735        if self.auth.check_attribute(fid, name):
2736            try:
2737                self.synch_store.set_value(name, v)
2738                self.write_store()
2739                self.log.debug("[SetValue] set %s to %s" % (name, v))
2740            except synch_store.CollisionError:
2741                # Translate into a service_error
2742                raise service_error(service_error.req,
2743                        "Value already set: %s" %name)
2744            except synch_store.RevokedKeyError:
2745                # No more synch on this key
2746                raise service_error(service_error.federant, 
2747                        "Synch key %s revoked" % name)
2748            return { 'name': name, 'value': v }
2749        else:
2750            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.