source: fedd/federation/experiment_control.py @ 7fe81be

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 7fe81be was 7fe81be, checked in by Ted Faber <faber@…>, 14 years ago

tweaks

  • Property mode set to 100644
File size: 93.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_control_local:
45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
51
52    class ssh_cmd_timeout(RuntimeError): pass
53   
54    class thread_pool:
55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
123        def wait_for_all_done(self, timeout=None):
124            """
125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
129            """
130            if timeout:
131                deadline = time.time() + timeout
132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
139            self.release()
140            return not (self.started == 0 or self.started > self.terminated)
141
142    class pooled_thread(Thread):
143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
181
182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
184    call_StartSegment = service_caller('StartSegment')
185    call_TerminateSegment = service_caller('TerminateSegment')
186    call_Ns2Topdl = service_caller('Ns2Topdl')
187
188    def __init__(self, config=None, auth=None):
189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
209        self.list_log = list_log.list_log
210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
221        self.repodir = config.get("experiment_control", "repodir")
222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 10
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
245        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
258
259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
266
267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
344        # Dispatch tables
345        self.soap_services = {\
346                'New': soap_handler('New', self.new_experiment),
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
356        }
357
358        self.xmlrpc_services = {\
359                'New': xmlrpc_handler('New', self.new_experiment),
360                'Create': xmlrpc_handler('Create', self.create_experiment),
361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
365                'Terminate': xmlrpc_handler('Terminate',
366                    self.terminate_experiment),
367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
369        }
370
371    # Call while holding self.state_lock
372    def write_state(self):
373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
392
393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
407    # Call while holding self.state_lock
408    def read_state(self):
409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
442        for s in self.state.values():
443            try:
444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
456                    for a in self.get_alloc_ids(s):
457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
463
464
465    def read_accessdb(self, accessdb_file):
466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
520            self.auth.set_attribute(fid, 'new')
521
522    def read_mapdb(self, file):
523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
591       
592    def generate_ssh_keys(self, dest, type="rsa" ):
593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
616
617    def gentopo(self, str):
618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
685
686    def genviz(self, topo):
687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
739                    "Failed to open /dev/null in genviz")
740
741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
762                close_fds=True)
763        dnull.close()
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787    def get_access(self, tb, nodes, tbparam, master, export_project,
788            access_user, services):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
794            raise service_error(service_error.server_config, 
795                    "Unknown testbed: %s" % tb)
796
797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project: 
800            access_sequence = [ (p, u) for p, u in access_user \
801                    if p == export_project] 
802            access_sequence.extend([(p, u) for p, u in access_user \
803                    if p != export_project]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
815                        'credential': [ "project: %s" % p, "user: %s"  % u],
816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
822                        'credential': [ 'user: %s' % u ],
823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            # If there is a master, and this is it, ask it to export services
828            # XXX move this to export pseudo-service
829            if tb == master:
830                req['service'] = [
831                        { 'name': 'userconfig', 'visibility': 'export'},
832                        { 'name': 'SMB', 'visibility': 'export'},
833                        { 'name': 'seer', 'visibility': 'export'},
834                        { 'name': 'tmcd', 'visibility': 'export'},
835                    ]
836
837            # node resources if any
838            if nodes != None and len(nodes) > 0:
839                rnodes = [ ]
840                for n in nodes:
841                    rn = { }
842                    image, hw, count = n.split(":")
843                    if image: rn['image'] = [ image ]
844                    if hw: rn['hardware'] = [ hw ]
845                    if count and int(count) >0 : rn['count'] = int(count)
846                    rnodes.append(rn)
847                req['resources']= { }
848                req['resources']['node'] = rnodes
849
850            try:
851                if self.local_access.has_key(uri):
852                    # Local access call
853                    req = { 'RequestAccessRequestBody' : req }
854                    r = self.local_access[uri].RequestAccess(req, 
855                            fedid(file=self.cert_file))
856                    r = { 'RequestAccessResponseBody' : r }
857                else:
858                    r = self.call_RequestAccess(uri, req, 
859                            self.cert_file, self.cert_pwd, self.trusted_certs)
860            except service_error, e:
861                if e.code == service_error.access:
862                    self.log.debug("[get_access] Access denied")
863                    r = None
864                    continue
865                else:
866                    raise e
867
868            if r.has_key('RequestAccessResponseBody'):
869                # Through to here we have a valid response, not a fault.
870                # Access denied is a fault, so something better or worse than
871                # access denied has happened.
872                r = r['RequestAccessResponseBody']
873                self.log.debug("[get_access] Access granted")
874                break
875            else:
876                raise service_error(service_error.protocol,
877                        "Bad proxy response")
878       
879        if not r:
880            raise service_error(service_error.access, 
881                    "Access denied by %s (%s)" % (tb, uri))
882
883        tbparam[tb] = { 
884                "allocID" : r['allocID'],
885                "uri": uri,
886                }
887        if 'service' in r: 
888            services.extend(r['service'])
889
890        # Add attributes to parameter space.  We don't allow attributes to
891        # overlay any parameters already installed.
892        for a in r.get('fedAttr', []):
893            try:
894                if a['attribute'] and \
895                        isinstance(a['attribute'], basestring)\
896                        and not tbparam[tb].has_key(a['attribute'].lower()):
897                    tbparam[tb][a['attribute'].lower()] = a['value']
898            except KeyError:
899                self.log.error("Bad attribute in response: %s" % a)
900
901    def release_access(self, tb, aid, uri=None):
902        """
903        Release access to testbed through fedd
904        """
905
906        if not uri:
907            uri = self.tbmap.get(tb, None)
908        if not uri:
909            raise service_error(service_error.server_config, 
910                    "Unknown testbed: %s" % tb)
911
912        if self.local_access.has_key(uri):
913            resp = self.local_access[uri].ReleaseAccess(\
914                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
915                    fedid(file=self.cert_file))
916            resp = { 'ReleaseAccessResponseBody': resp } 
917        else:
918            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
919                    self.cert_file, self.cert_pwd, self.trusted_certs)
920
921        # better error coding
922
923    def remote_ns2topdl(self, uri, desc):
924
925        req = {
926                'description' : { 'ns2description': desc },
927            }
928
929        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
930                self.trusted_certs)
931
932        if r.has_key('Ns2TopdlResponseBody'):
933            r = r['Ns2TopdlResponseBody']
934            ed = r.get('experimentdescription', None)
935            if ed.has_key('topdldescription'):
936                return topdl.Topology(**ed['topdldescription'])
937            else:
938                raise service_error(service_error.protocol, 
939                        "Bad splitter response (no output)")
940        else:
941            raise service_error(service_error.protocol, "Bad splitter response")
942
943    class start_segment:
944        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
945                cert_pwd=None, trusted_certs=None, caller=None,
946                log_collector=None):
947            self.log = log
948            self.debug = debug
949            self.cert_file = cert_file
950            self.cert_pwd = cert_pwd
951            self.trusted_certs = None
952            self.caller = caller
953            self.testbed = testbed
954            self.log_collector = log_collector
955            self.response = None
956
957        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
958                services=None):
959            req = {
960                    'allocID': { 'fedid' : aid }, 
961                    'segmentdescription': { 
962                        'topdldescription': topo.to_dict(),
963                    },
964                    'master': master,
965                }
966
967            if connInfo:
968                req['connection'] = connInfo
969            # Add services to request.  The master exports, everyone else
970            # imports.
971            if services:
972                svcs = [ x.copy() for x in services]
973                for s in svcs:
974                    if master: s['visibility'] = 'export'
975                    else: s['visibility'] = 'import'
976                req['service'] = svcs
977            if attrs:
978                req['fedAttr'] = attrs
979
980            try:
981                self.log.debug("Calling StartSegment at %s " % uri)
982                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
983                        self.trusted_certs)
984                if r.has_key('StartSegmentResponseBody'):
985                    lval = r['StartSegmentResponseBody'].get('allocationLog',
986                            None)
987                    if lval and self.log_collector:
988                        for line in  lval.splitlines(True):
989                            self.log_collector.write(line)
990                    self.response = r
991                else:
992                    raise service_error(service_error.internal, 
993                            "Bad response!?: %s" %r)
994                return True
995            except service_error, e:
996                self.log.error("Start segment failed on %s: %s" % \
997                        (self.testbed, e))
998                return False
999
1000
1001
1002    class terminate_segment:
1003        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1004                cert_pwd=None, trusted_certs=None, caller=None):
1005            self.log = log
1006            self.debug = debug
1007            self.cert_file = cert_file
1008            self.cert_pwd = cert_pwd
1009            self.trusted_certs = None
1010            self.caller = caller
1011            self.testbed = testbed
1012
1013        def __call__(self, uri, aid ):
1014            req = {
1015                    'allocID': aid , 
1016                }
1017            try:
1018                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1019                        self.trusted_certs)
1020                return True
1021            except service_error, e:
1022                self.log.error("Terminate segment failed on %s: %s" % \
1023                        (self.testbed, e))
1024                return False
1025   
1026
1027    def allocate_resources(self, allocated, master, eid, expid, 
1028            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1029            attrs=None, connInfo={}, services=[]):
1030
1031        started = { }           # Testbeds where a sub-experiment started
1032                                # successfully
1033
1034        # XXX
1035        fail_soft = False
1036
1037        log = alloc_log or self.log
1038
1039        thread_pool = self.thread_pool(self.nthreads)
1040        threads = [ ]
1041
1042        for tb in allocated.keys():
1043            # Create and start a thread to start the segment, and save it
1044            # to get the return value later
1045            thread_pool.wait_for_slot()
1046            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1047            if not uri:
1048                raise service_error(service_error.internal, 
1049                        "Unknown testbed %s !?" % tb)
1050
1051            if tbparams[tb].has_key('allocID') and \
1052                    tbparams[tb]['allocID'].has_key('fedid'):
1053                aid = tbparams[tb]['allocID']['fedid']
1054            else:
1055                raise service_error(service_error.internal, 
1056                        "No alloc id for testbed %s !?" % tb)
1057
1058            t  = self.pooled_thread(\
1059                    target=self.start_segment(log=log, debug=self.debug,
1060                        testbed=tb, cert_file=self.cert_file,
1061                        cert_pwd=self.cert_pwd,
1062                        trusted_certs=self.trusted_certs,
1063                        caller=self.call_StartSegment,
1064                        log_collector=log_collector), 
1065                    args=(uri, aid, topo[tb], tb == master, 
1066                        attrs, connInfo[tb], services),
1067                    name=tb,
1068                    pdata=thread_pool, trace_file=self.trace_file)
1069            threads.append(t)
1070            t.start()
1071
1072        # Wait until all finish (keep pinging the log, though)
1073        mins = 0
1074        revoked = False
1075        while not thread_pool.wait_for_all_done(60.0):
1076            mins += 1
1077            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1078                    % mins)
1079            if not revoked and \
1080                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1081                # a testbed has failed.  Revoke this experiment's
1082                # synchronizarion values so that sub experiments will not
1083                # deadlock waiting for synchronization that will never happen
1084                self.log.info("A subexperiment has failed to swap in, " + \
1085                        "revoking synch keys")
1086                var_key = "fedid:%s" % expid
1087                for k in self.synch_store.all_keys():
1088                    if len(k) > 45 and k[0:46] == var_key:
1089                        self.synch_store.revoke_key(k)
1090                revoked = True
1091
1092        failed = [ t.getName() for t in threads if not t.rv ]
1093        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1094
1095        # If one failed clean up, unless fail_soft is set
1096        if failed:
1097            if not fail_soft:
1098                thread_pool.clear()
1099                for tb in succeeded:
1100                    # Create and start a thread to stop the segment
1101                    thread_pool.wait_for_slot()
1102                    uri = tbparams[tb]['uri']
1103                    t  = self.pooled_thread(\
1104                            target=self.terminate_segment(log=log,
1105                                testbed=tb,
1106                                cert_file=self.cert_file, 
1107                                cert_pwd=self.cert_pwd,
1108                                trusted_certs=self.trusted_certs,
1109                                caller=self.call_TerminateSegment),
1110                            args=(uri, tbparams[tb]['federant']['allocID']),
1111                            name=tb,
1112                            pdata=thread_pool, trace_file=self.trace_file)
1113                    t.start()
1114                # Wait until all finish (if any are being stopped)
1115                if succeeded:
1116                    thread_pool.wait_for_all_done()
1117
1118                # release the allocations
1119                for tb in tbparams.keys():
1120                    self.release_access(tb, tbparams[tb]['allocID'],
1121                            tbparams[tb].get('uri', None))
1122                # Remove the placeholder
1123                self.state_lock.acquire()
1124                self.state[eid]['experimentStatus'] = 'failed'
1125                if self.state_filename: self.write_state()
1126                self.state_lock.release()
1127
1128                log.error("Swap in failed on %s" % ",".join(failed))
1129                return
1130        else:
1131            log.info("[start_segment]: Experiment %s active" % eid)
1132
1133
1134        # Walk up tmpdir, deleting as we go
1135        if self.cleanup:
1136            log.debug("[start_experiment]: removing %s" % tmpdir)
1137            for path, dirs, files in os.walk(tmpdir, topdown=False):
1138                for f in files:
1139                    os.remove(os.path.join(path, f))
1140                for d in dirs:
1141                    os.rmdir(os.path.join(path, d))
1142            os.rmdir(tmpdir)
1143        else:
1144            log.debug("[start_experiment]: not removing %s" % tmpdir)
1145
1146        # Insert the experiment into our state and update the disk copy
1147        self.state_lock.acquire()
1148        self.state[expid]['experimentStatus'] = 'active'
1149        self.state[eid] = self.state[expid]
1150        if self.state_filename: self.write_state()
1151        self.state_lock.release()
1152        return
1153
1154
1155    def add_kit(self, e, kit):
1156        """
1157        Add a Software object created from the list of (install, location)
1158        tuples passed as kit  to the software attribute of an object e.  We
1159        do this enough to break out the code, but it's kind of a hack to
1160        avoid changing the old tuple rep.
1161        """
1162
1163        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1164
1165        if isinstance(e.software, list): e.software.extend(s)
1166        else: e.software = s
1167
1168
1169    def create_experiment_state(self, fid, req, expid, expcert, 
1170            state='starting'):
1171        """
1172        Create the initial entry in the experiment's state.  The expid and
1173        expcert are the experiment's fedid and certifacte that represents that
1174        ID, which are installed in the experiment state.  If the request
1175        includes a suggested local name that is used if possible.  If the local
1176        name is already taken by an experiment owned by this user that has
1177        failed, it is overwritten.  Otherwise new letters are added until a
1178        valid localname is found.  The generated local name is returned.
1179        """
1180
1181        if req.has_key('experimentID') and \
1182                req['experimentID'].has_key('localname'):
1183            overwrite = False
1184            eid = req['experimentID']['localname']
1185            # If there's an old failed experiment here with the same local name
1186            # and accessible by this user, we'll overwrite it, otherwise we'll
1187            # fall through and do the collision avoidance.
1188            old_expid = self.get_experiment_fedid(eid)
1189            if old_expid and self.check_experiment_access(fid, old_expid):
1190                self.state_lock.acquire()
1191                status = self.state[eid].get('experimentStatus', None)
1192                if status and status == 'failed':
1193                    # remove the old access attribute
1194                    self.auth.unset_attribute(fid, old_expid)
1195                    overwrite = True
1196                    del self.state[eid]
1197                    del self.state[old_expid]
1198                self.state_lock.release()
1199            self.state_lock.acquire()
1200            while (self.state.has_key(eid) and not overwrite):
1201                eid += random.choice(string.ascii_letters)
1202            # Initial state
1203            self.state[eid] = {
1204                    'experimentID' : \
1205                            [ { 'localname' : eid }, {'fedid': expid } ],
1206                    'experimentStatus': state,
1207                    'experimentAccess': { 'X509' : expcert },
1208                    'owner': fid,
1209                    'log' : [],
1210                }
1211            self.state[expid] = self.state[eid]
1212            if self.state_filename: self.write_state()
1213            self.state_lock.release()
1214        else:
1215            eid = self.exp_stem
1216            for i in range(0,5):
1217                eid += random.choice(string.ascii_letters)
1218            self.state_lock.acquire()
1219            while (self.state.has_key(eid)):
1220                eid = self.exp_stem
1221                for i in range(0,5):
1222                    eid += random.choice(string.ascii_letters)
1223            # Initial state
1224            self.state[eid] = {
1225                    'experimentID' : \
1226                            [ { 'localname' : eid }, {'fedid': expid } ],
1227                    'experimentStatus': state,
1228                    'experimentAccess': { 'X509' : expcert },
1229                    'owner': fid,
1230                    'log' : [],
1231                }
1232            self.state[expid] = self.state[eid]
1233            if self.state_filename: self.write_state()
1234            self.state_lock.release()
1235
1236        return eid
1237
1238
1239    def allocate_ips_to_topo(self, top):
1240        """
1241        Add an ip4_address attribute to all the hosts in the topology, based on
1242        the shared substrates on which they sit.  An /etc/hosts file is also
1243        created and returned as a list of hostfiles entries.  We also return
1244        the allocator, because we may need to allocate IPs to portals
1245        (specifically DRAGON portals).
1246        """
1247        subs = sorted(top.substrates, 
1248                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1249                reverse=True)
1250        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1251        ifs = { }
1252        hosts = [ ]
1253
1254        for idx, s in enumerate(subs):
1255            net_size = len(s.interfaces)+2
1256
1257            a = ips.allocate(net_size)
1258            if a :
1259                base, num = a
1260                if num < net_size: 
1261                    raise service_error(service_error.internal,
1262                            "Allocator returned wrong number of IPs??")
1263            else:
1264                raise service_error(service_error.req, 
1265                        "Cannot allocate IP addresses")
1266            mask = ips.min_alloc
1267            while mask < net_size:
1268                mask *= 2
1269
1270            netmask = ((2**32-1) ^ (mask-1))
1271
1272            base += 1
1273            for i in s.interfaces:
1274                i.attribute.append(
1275                        topdl.Attribute('ip4_address', 
1276                            "%s" % ip_addr(base)))
1277                i.attribute.append(
1278                        topdl.Attribute('ip4_netmask', 
1279                            "%s" % ip_addr(int(netmask))))
1280
1281                hname = i.element.name[0]
1282                if ifs.has_key(hname):
1283                    hosts.append("%s\t%s-%s %s-%d" % \
1284                            (ip_addr(base), hname, s.name, hname,
1285                                ifs[hname]))
1286                else:
1287                    ifs[hname] = 0
1288                    hosts.append("%s\t%s-%s %s-%d %s" % \
1289                            (ip_addr(base), hname, s.name, hname,
1290                                ifs[hname], hname))
1291
1292                ifs[hname] += 1
1293                base += 1
1294        return hosts, ips
1295
1296    def get_access_to_testbeds(self, testbeds, access_user, 
1297            export_project, master, allocated, tbparams, services):
1298        """
1299        Request access to the various testbeds required for this instantiation
1300        (passed in as testbeds).  User, access_user, expoert_project and master
1301        are used to construct the correct requests.  Per-testbed parameters are
1302        returned in tbparams.
1303        """
1304        for tb in testbeds:
1305            self.get_access(tb, None, tbparams, master,
1306                    export_project, access_user, services)
1307            allocated[tb] = 1
1308
1309    def split_topology(self, top, topo, testbeds):
1310        """
1311        Create the sub-topologies that are needed for experiment instantiation.
1312        """
1313        for tb in testbeds:
1314            topo[tb] = top.clone()
1315            # copy in for loop allows deletions from the original
1316            for e in [ e for e in topo[tb].elements]:
1317                etb = e.get_attribute('testbed')
1318                # NB: elements without a testbed attribute won't appear in any
1319                # sub topologies. 
1320                if not etb or etb != tb:
1321                    for i in e.interface:
1322                        for s in i.subs:
1323                            try:
1324                                s.interfaces.remove(i)
1325                            except ValueError:
1326                                raise service_error(service_error.internal,
1327                                        "Can't remove interface??")
1328                    topo[tb].elements.remove(e)
1329            topo[tb].make_indices()
1330
1331    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1332            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1333            expid=None):
1334        """
1335        Return a new internet portal node and a dict with the connectionInfo to
1336        be attached.
1337        """
1338        dproject = tbparams[dt].get('project', 'project')
1339        ddomain = tbparams[dt].get('domain', ".example.com")
1340        mdomain = tbparams[master].get('domain', '.example.com')
1341        mproject = tbparams[master].get('project', 'project')
1342        muser = tbparams[master].get('user', 'root')
1343        smbshare = tbparams[master].get('smbshare', 'USERS')
1344
1345        if st == master or dt == master:
1346            active = ("%s" % (st == master))
1347        else:
1348            active = ("%s" % (st > dt))
1349
1350        ifaces = [ ]
1351        for sub, attrs in iface_desc:
1352            inf = topdl.Interface(
1353                    name="inf%03d" % len(ifaces),
1354                    substrate=sub,
1355                    attribute=[
1356                        topdl.Attribute(
1357                            attribute=n,
1358                            value = v)
1359                        for n, v in attrs
1360                        ]
1361                    )
1362            ifaces.append(inf)
1363        if conn_type == "ssh":
1364            try:
1365                aid = tbparams[st]['allocID']['fedid']
1366            except:
1367                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1368                        % st)
1369                aid = None
1370            info = {
1371                    "type" : conn_type, 
1372                    "portal": myname,
1373                    'fedAttr': [ 
1374                            { 'attribute': 'masterdomain', 'value': mdomain},
1375                            { 'attribute': 'masterexperiment', 'value': 
1376                                "%s/%s" % (mproject, eid)},
1377                            { 'attribute': 'active', 'value': active},
1378                            # Move to SMB service description
1379                            { 'attribute': 'masteruser', 'value': muser},
1380                            { 'attribute': 'smbshare', 'value': smbshare},
1381                        ],
1382                    'parameter': [
1383                        {
1384                            'name': 'peer',
1385                            'key': 'fedid:%s/%s' % (expid, myname),
1386                            'store': self.store_url,
1387                            'type': 'output',
1388                        },
1389                        {
1390                            'name': 'ssh_port',
1391                            'key': 'fedid:%s/%s-port' % (expid, myname),
1392                            'store': self.store_url,
1393                            'type': 'output',
1394                        },
1395                        {
1396                            'name': 'peer',
1397                            'key': 'fedid:%s/%s' % (expid, desthost),
1398                            'store': self.store_url,
1399                            'type': 'input',
1400                        },
1401                        {
1402                            'name': 'ssh_port',
1403                            'key': 'fedid:%s/%s-port' % (expid, desthost),
1404                            'store': self.store_url,
1405                            'type': 'input',
1406                        },
1407                        ]
1408                    }
1409            # Give this allocation the rights to access the key of the
1410            # peers
1411            if aid:
1412                for h in (myname, desthost):
1413                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1414                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
1415                            (expid, h))
1416            else:
1417                self.log.error("No aid for %s in new_portal_node" % st)
1418        else:
1419            info = None
1420
1421        return (topdl.Computer(
1422                name=myname,
1423                attribute=[ 
1424                    topdl.Attribute(attribute=n,value=v)
1425                        for n, v in (\
1426                            ('portal', 'true'),
1427                            ('portal_type', portal_type), 
1428                        )
1429                    ],
1430                interface=ifaces,
1431                ), info)
1432
1433    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
1434        ddomain = tbparams[dt].get('domain', ".example.com")
1435        dproject = tbparams[dt].get('project', 'project')
1436        tsubstrate = \
1437                topdl.Substrate(name='%s-%s' % (st, dt),
1438                        attribute= [
1439                            topdl.Attribute(
1440                                attribute='portal',
1441                                value='true')
1442                            ]
1443                        )
1444        segment_element = topdl.Segment(
1445                id= tbparams[dt]['allocID'],
1446                type='emulab',
1447                uri = self.tbmap.get(dt, None),
1448                interface=[ 
1449                    topdl.Interface(
1450                        substrate=tsubstrate.name),
1451                    ],
1452                attribute = [
1453                    topdl.Attribute(attribute=n, value=v)
1454                        for n, v in (\
1455                            ('domain', ddomain),
1456                            ('experiment', "%s/%s" % \
1457                                    (dproject, eid)),)
1458                    ],
1459                )
1460
1461        return (tsubstrate, segment_element)
1462
1463    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
1464        if sub.capacity is None:
1465            raise service_error(service_error.internal,
1466                    "Cannot DRAGON split substrate w/o capacity")
1467        segs = [ ]
1468        substr = topdl.Substrate(name="dragon%d" % idx, 
1469                capacity=sub.capacity.clone(),
1470                attribute=[ topdl.Attribute(attribute=n, value=v)
1471                    for n, v, in (\
1472                            ('vlan', 'unassigned%d' % idx),)])
1473        name = "dragon%d" % idx
1474        store_key = 'fedid:%s/vlan%d' % (expid, idx)
1475        for tb in tbs.keys():
1476            seg = topdl.Segment(
1477                    id = tbparams[tb]['allocID'],
1478                    type='emulab',
1479                    uri = self.tbmap.get(tb, None),
1480                    interface=[ 
1481                        topdl.Interface(
1482                            substrate=substr.name),
1483                        ],
1484                    attribute=[ topdl.Attribute(
1485                        attribute='dragon_endpoint', 
1486                        value=tbparams[tb]['dragon']),
1487                        ]
1488                    )
1489            if tbparams[tb].has_key('vlans'):
1490                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1491            segs.append(seg)
1492
1493            # Give this allocation the rights to access the key of the
1494            # vlan_id
1495            try:
1496                aid = tbparams[tb]['allocID']['fedid']
1497                self.auth.set_attribute(aid, store_key)
1498            except:
1499                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1500                        % tb)
1501
1502        connInfo[name] = [ { 
1503            'type': 'transit',
1504            'parameter': [ { 
1505                'name': 'vlan_id',
1506                'key': store_key,
1507                'store': self.store_url,
1508                'type': 'output'
1509                } ]
1510            } ]
1511
1512        topo[name] = \
1513                topdl.Topology(substrates=[substr], elements=segs,
1514                        attribute=[
1515                            topdl.Attribute(attribute="transit", value='true'),
1516                            topdl.Attribute(attribute="dynamic", value='true'),
1517                            topdl.Attribute(attribute="testbed", 
1518                                value='dragon'),
1519                            topdl.Attribute(attribute="store_keys", 
1520                                value=store_key),
1521                            ]
1522                        )
1523
1524    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1525            connInfo, expid=None):
1526        """
1527        Add attribiutes to the various elements indicating that they are to be
1528        dragon connected and create a dragon segment in topo to be
1529        instantiated.
1530        """
1531
1532        def get_substrate_from_topo(name, t):
1533            for s in t.substrates:
1534                if s.name == name: return s
1535            else: return None
1536
1537
1538        mdomain = tbparams[master].get('domain', '.example.com')
1539        mproject = tbparams[master].get('project', 'project')
1540        # dn is the number of previously created dragon nets.  This routine
1541        # creates a net numbered by dn
1542        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1543        # Count the number of interfaces on this substrate in each testbed from
1544        # the global topology
1545        count = { }
1546        node = { }
1547        for e in [ i.element for i in sub.interfaces ]:
1548            tb = e.get_attribute('testbed')
1549            count[tb] = count.get(tb, 0) + 1
1550            node[tb] = i.get_attribute('ip4_address')
1551
1552
1553        # Set the attributes in the copies that will allow setup of dragon
1554        # connections.
1555        for tb in tbs.keys():
1556            s = get_substrate_from_topo(sub.name, topo[tb])
1557            if s:
1558                if not connInfo.has_key(tb):
1559                    connInfo[tb] = [ ]
1560
1561                try:
1562                    aid = tbparams[tb]['allocID']['fedid']
1563                except:
1564                    self.log.debug("[creat_dragon_substrate] " + 
1565                            "Can't get alloc id for %s?" %tb)
1566                    aid = None
1567
1568                # This may need another look, but only a service gateway will
1569                # look at the active parameter, and these are only inserted to
1570                # connect to the master.
1571                active = "%s" % ( tb == master)
1572                info = {
1573                        'type': 'transit',
1574                        'member': [ {
1575                            'element': i.element.name[0], 
1576                            'interface': i.name
1577                            } for i in s.interfaces \
1578                                    if isinstance(i.element, topdl.Computer) ],
1579                        'fedAttr': [ 
1580                            { 'attribute': 'masterdomain', 'value': mdomain},
1581                            { 'attribute': 'masterexperiment', 'value': 
1582                                "%s/%s" % (mproject, eid)},
1583                            { 'attribute': 'active', 'value': active},
1584                            ],
1585                        'parameter': [ {
1586                            'name': 'vlan_id',
1587                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1588                            'store': self.store_url,
1589                            'type': 'input',
1590                            } ]
1591                        }
1592                if tbs.has_key(tb):
1593                    info['peer'] = tbs[tb]
1594                connInfo[tb].append(info)
1595
1596                # Give this allocation the rights to access the key of the
1597                # vlan_id
1598                if aid:
1599                    self.auth.set_attribute(aid, 
1600                            'fedid:%s/vlan%d' % (expid, dn))
1601            else:
1602                raise service_error(service_error.internal,
1603                        "No substrate %s in testbed %s" % (sub.name, tb))
1604
1605        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
1606
1607    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1608            segment_substrate, portals, connInfo, expid):
1609        # More than one testbed is on this substrate.  Insert
1610        # some portals into the subtopologies.  st == source testbed,
1611        # dt == destination testbed.
1612        for st in tbs.keys():
1613            if not segment_substrate.has_key(st):
1614                segment_substrate[st] = { }
1615            if not portals.has_key(st): 
1616                portals[st] = { }
1617            if not connInfo.has_key(st):
1618                connInfo[st] = [ ]
1619            for dt in [ t for t in tbs.keys() if t != st]:
1620                sproject = tbparams[st].get('project', 'project')
1621                dproject = tbparams[dt].get('project', 'project')
1622                mproject = tbparams[master].get('project', 'project')
1623                sdomain = tbparams[st].get('domain', ".example.com")
1624                ddomain = tbparams[dt].get('domain', ".example.com")
1625                mdomain = tbparams[master].get('domain', '.example.com')
1626                muser = tbparams[master].get('user', 'root')
1627                smbshare = tbparams[master].get('smbshare', 'USERS')
1628                aid = tbparams[dt]['allocID']['fedid']
1629                if st == master or dt == master:
1630                    active = ("%s" % (st == master))
1631                else:
1632                    active = ("%s" %(st > dt))
1633                if not segment_substrate[st].has_key(dt):
1634                    # Put a substrate and a segment for the connected
1635                    # testbed in there.
1636                    tsubstrate, segment_element = \
1637                            self.new_portal_substrate(st, dt, eid, tbparams,
1638                                    expid)
1639                    segment_substrate[st][dt] = tsubstrate
1640                    topo[st].substrates.append(tsubstrate)
1641                    topo[st].elements.append(segment_element)
1642
1643                new_portal = False
1644                if portals[st].has_key(dt):
1645                    # There's a portal set up to go to this destination.
1646                    # See if there's room to multiplex this connection on
1647                    # it.  If so, add an interface to the portal; if not,
1648                    # set up to add a portal below.
1649                    # [This little festival of braces is just a pop of the
1650                    # last element in the list of portals between st and
1651                    # dt.]
1652                    portal = portals[st][dt][-1]
1653                    mux = len([ i for i in portal.interface \
1654                            if not i.get_attribute('portal')])
1655                    if mux == self.muxmax:
1656                        new_portal = True
1657                        portal_type = "experiment"
1658                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1659                        desthost = "%stunnel%d" % (st.lower(), 
1660                                len(portals[st][dt]))
1661                    else:
1662                        new_i = topdl.Interface(
1663                                substrate=sub.name,
1664                                attribute=[ 
1665                                    topdl.Attribute(
1666                                        attribute='ip4_address', 
1667                                        value=tbs[dt]
1668                                    )
1669                                ])
1670                        portal.interface.append(new_i)
1671                else:
1672                    # First connection to this testbed, make an empty list
1673                    # and set up to add the new portal below
1674                    new_portal = True
1675                    portals[st][dt] = [ ]
1676                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1677                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
1678
1679                    if dt == master or st == master: portal_type = "both"
1680                    else: portal_type = "experiment"
1681
1682                if new_portal:
1683                    infs = (
1684                            (segment_substrate[st][dt].name, 
1685                                (('portal', 'true'),)),
1686                            (sub.name, 
1687                                (('ip4_address', tbs[dt]),))
1688                        )
1689                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1690                            master, eid, myname, desthost, portal_type,
1691                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
1692
1693                    topo[st].elements.append(portal)
1694                    portals[st][dt].append(portal)
1695                    connInfo[st].append(info)
1696
1697    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
1698        # Add to the master testbed
1699        tsubstrate, segment_element = \
1700                self.new_portal_substrate(st, dt, eid, tbparams, expid)
1701        myname = "%stunnel" % dt
1702        desthost = "%stunnel" % st
1703
1704        portal, info = self.new_portal_node(st, dt, tbparams, master,
1705                eid, myname, desthost, "control", 
1706                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1707                conn_attrs=[], expid=expid)
1708
1709        topo[st].substrates.append(tsubstrate)
1710        topo[st].elements.append(segment_element)
1711        topo[st].elements.append(portal)
1712        if not connInfo.has_key(st):
1713            connInfo[st] = [ ]
1714        connInfo[st].append(info)
1715
1716    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1717            substrate, tbparams, expid):
1718        # Add to the master testbed
1719        myname = "%stunnel" % dt
1720        desthost = "%s" % ip_addr(dip)
1721
1722        portal, info = self.new_portal_node(st, dt, tbparams, master,
1723                eid, myname, desthost, "control", 
1724                ((substrate.name,(
1725                    ('portal','true'),
1726                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1727                conn_type="transit", conn_attrs=[], expid=expid)
1728
1729        return portal
1730
1731    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1732            connInfo, expid):
1733        """
1734        For each substrate in the main topology, find those that
1735        have nodes on more than one testbed.  Insert portal nodes
1736        into the copies of those substrates on the sub topologies.
1737        """
1738        segment_substrate = { }
1739        portals = { }
1740        for s in top.substrates:
1741            # tbs will contain an ip address on this subsrate that is in
1742            # each testbed.
1743            tbs = { }
1744            for i in s.interfaces:
1745                e = i.element
1746                tb = e.get_attribute('testbed')
1747                if tb and not tbs.has_key(tb):
1748                    for i in e.interface:
1749                        if s in i.subs:
1750                            tbs[tb]= i.get_attribute('ip4_address')
1751            if len(tbs) < 2:
1752                continue
1753
1754            # DRAGON will not create multi-site vlans yet
1755            if len(tbs) == 2 and \
1756                    all([tbparams[x].has_key('dragon') for x in tbs]):
1757                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1758                        master, eid, connInfo, expid)
1759            else:
1760                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1761                        eid, segment_substrate, portals, connInfo, expid)
1762
1763        # Make sure that all the slaves have a control portal back to the
1764        # master.
1765        for tb in [ t for t in tbparams.keys() if t != master ]:
1766            if len([e for e in topo[tb].elements \
1767                    if isinstance(e, topdl.Computer) and \
1768                    e.get_attribute('portal') and \
1769                    e.get_attribute('portal_type') == 'both']) == 0:
1770
1771                if tbparams[master].has_key('dragon') \
1772                        and tbparams[tb].has_key('dragon'):
1773
1774                    idx = len([x for x in topo.keys() \
1775                            if x.startswith('dragon')])
1776                    dip, leng = ip_allocator.allocate(4)
1777                    dip += 1
1778                    mip = dip+1
1779                    csub = topdl.Substrate(
1780                            name="dragon-control-%s" % tb,
1781                            capacity=topdl.Capacity(100000.0, 'max'),
1782                            attribute=[
1783                                topdl.Attribute(
1784                                    attribute='portal',
1785                                    value='true'
1786                                    )
1787                                ]
1788                            )
1789                    seg = topdl.Segment(
1790                            id= tbparams[master]['allocID'],
1791                            type='emulab',
1792                            uri = self.tbmap.get(master, None),
1793                            interface=[ 
1794                                topdl.Interface(
1795                                    substrate=csub.name),
1796                                ],
1797                            attribute = [
1798                                topdl.Attribute(attribute=n, value=v)
1799                                    for n, v in (\
1800                                        ('domain', 
1801                                            tbparams[master].get('domain',
1802                                                ".example.com")),
1803                                        ('experiment', "%s/%s" % \
1804                                                (tbparams[master].get(
1805                                                    'project', 
1806                                                    'project'), 
1807                                                    eid)),)
1808                                ],
1809                            )
1810                    portal = self.new_dragon_portal(tb, master,
1811                            master, eid, dip, mip, idx, csub, tbparams, expid)
1812                    topo[tb].substrates.append(csub)
1813                    topo[tb].elements.append(portal)
1814                    topo[tb].elements.append(seg)
1815
1816                    mcsub = csub.clone()
1817                    seg = topdl.Segment(
1818                            id= tbparams[tb]['allocID'],
1819                            type='emulab',
1820                            uri = self.tbmap.get(tb, None),
1821                            interface=[ 
1822                                topdl.Interface(
1823                                    substrate=csub.name),
1824                                ],
1825                            attribute = [
1826                                topdl.Attribute(attribute=n, value=v)
1827                                    for n, v in (\
1828                                        ('domain', 
1829                                            tbparams[tb].get('domain',
1830                                                ".example.com")),
1831                                        ('experiment', "%s/%s" % \
1832                                                (tbparams[tb].get('project', 
1833                                                    'project'), 
1834                                                    eid)),)
1835                                ],
1836                            )
1837                    portal = self.new_dragon_portal(master, tb, master,
1838                            eid, mip, dip, idx, mcsub, tbparams, expid)
1839                    topo[master].substrates.append(mcsub)
1840                    topo[master].elements.append(portal)
1841                    topo[master].elements.append(seg)
1842                    for t in (master, tb):
1843                        topo[t].incorporate_elements()
1844
1845                    self.create_dragon_substrate(csub, topo, 
1846                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1847                            tbparams, master, eid, connInfo,
1848                            expid)
1849                else:
1850                    self.add_control_portal(master, tb, master, eid, topo, 
1851                            tbparams, connInfo, expid)
1852                    self.add_control_portal(tb, master, master, eid, topo, 
1853                            tbparams, connInfo, expid)
1854
1855        # Connect the portal nodes into the topologies and clear out
1856        # substrates that are not in the topologies
1857        for tb in tbparams.keys():
1858            topo[tb].incorporate_elements()
1859            topo[tb].substrates = \
1860                    [s for s in topo[tb].substrates \
1861                        if len(s.interfaces) >0]
1862
1863    def wrangle_software(self, expid, top, topo, tbparams):
1864        """
1865        Copy software out to the repository directory, allocate permissions and
1866        rewrite the segment topologies to look for the software in local
1867        places.
1868        """
1869
1870        # Copy the rpms and tarfiles to a distribution directory from
1871        # which the federants can retrieve them
1872        linkpath = "%s/software" %  expid
1873        softdir ="%s/%s" % ( self.repodir, linkpath)
1874        softmap = { }
1875        # These are in a list of tuples format (each kit).  This comprehension
1876        # unwraps them into a single list of tuples that initilaizes the set of
1877        # tuples.
1878        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1879                for p, t in l ])
1880        pkgs.update([x.location for e in top.elements \
1881                for x in e.software])
1882        try:
1883            os.makedirs(softdir)
1884        except IOError, e:
1885            raise service_error(
1886                    "Cannot create software directory: %s" % e)
1887        # The actual copying.  Everything's converted into a url for copying.
1888        for pkg in pkgs:
1889            loc = pkg
1890
1891            scheme, host, path = urlparse(loc)[0:3]
1892            dest = os.path.basename(path)
1893            if not scheme:
1894                if not loc.startswith('/'):
1895                    loc = "/%s" % loc
1896                loc = "file://%s" %loc
1897            try:
1898                u = urlopen(loc)
1899            except Exception, e:
1900                raise service_error(service_error.req, 
1901                        "Cannot open %s: %s" % (loc, e))
1902            try:
1903                f = open("%s/%s" % (softdir, dest) , "w")
1904                self.log.debug("Writing %s/%s" % (softdir,dest) )
1905                data = u.read(4096)
1906                while data:
1907                    f.write(data)
1908                    data = u.read(4096)
1909                f.close()
1910                u.close()
1911            except Exception, e:
1912                raise service_error(service_error.internal,
1913                        "Could not copy %s: %s" % (loc, e))
1914            path = re.sub("/tmp", "", linkpath)
1915            # XXX
1916            softmap[pkg] = \
1917                    "%s/%s/%s" %\
1918                    ( self.repo_url, path, dest)
1919
1920            # Allow the individual segments to access the software.
1921            for tb in tbparams.keys():
1922                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1923                        "/%s/%s" % ( path, dest))
1924
1925        # Convert the software locations in the segments into the local
1926        # copies on this host
1927        for soft in [ s for tb in topo.values() \
1928                for e in tb.elements \
1929                    if getattr(e, 'software', False) \
1930                        for s in e.software ]:
1931            if softmap.has_key(soft.location):
1932                soft.location = softmap[soft.location]
1933
1934
1935    def new_experiment(self, req, fid):
1936        """
1937        The external interface to empty initial experiment creation called from
1938        the dispatcher.
1939
1940        Creates a working directory, splits the incoming description using the
1941        splitter script and parses out the avrious subsections using the
1942        lcasses above.  Once each sub-experiment is created, use pooled threads
1943        to instantiate them and start it all up.
1944        """
1945        if not self.auth.check_attribute(fid, 'new'):
1946            raise service_error(service_error.access, "New access denied")
1947
1948        try:
1949            tmpdir = tempfile.mkdtemp(prefix="split-")
1950        except IOError:
1951            raise service_error(service_error.internal, "Cannot create tmp dir")
1952
1953        try:
1954            access_user = self.accessdb[fid]
1955        except KeyError:
1956            raise service_error(service_error.internal,
1957                    "Access map and authorizer out of sync in " + \
1958                            "new_experiment for fedid %s"  % fid)
1959
1960        pid = "dummy"
1961        gid = "dummy"
1962
1963        req = req.get('NewRequestBody', None)
1964        if not req:
1965            raise service_error(service_error.req,
1966                    "Bad request format (no NewRequestBody)")
1967
1968        # Generate an ID for the experiment (slice) and a certificate that the
1969        # allocator can use to prove they own it.  We'll ship it back through
1970        # the encrypted connection.
1971        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1972
1973        #now we're done with the tmpdir, and it should be empty
1974        if self.cleanup:
1975            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1976            os.rmdir(tmpdir)
1977        else:
1978            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1979
1980        eid = self.create_experiment_state(fid, req, expid, expcert, 
1981                state='empty')
1982
1983        # Let users touch the state
1984        self.auth.set_attribute(fid, expid)
1985        self.auth.set_attribute(expid, expid)
1986        # Override fedids can manipulate state as well
1987        for o in self.overrides:
1988            self.auth.set_attribute(o, expid)
1989
1990        rv = {
1991                'experimentID': [
1992                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1993                ],
1994                'experimentStatus': 'empty',
1995                'experimentAccess': { 'X509' : expcert }
1996            }
1997
1998        return rv
1999
2000    def get_master_project(self, req):
2001        master= None
2002        export_project = None
2003        for e in [ s for s in req.get('service', []) \
2004                if s.get('name') == 'project_export']:
2005            for a in e.get('fedAttr', []):
2006                attr = a.get('attribute', None)
2007                if attr == 'testbed':
2008                    master = a.get('value', None)
2009                elif attr == 'project':
2010                    export_project = a.get('value', None)
2011       
2012        return (master, export_project)
2013
2014
2015
2016    def create_experiment(self, req, fid):
2017        """
2018        The external interface to experiment creation called from the
2019        dispatcher.
2020
2021        Creates a working directory, splits the incoming description using the
2022        splitter script and parses out the avrious subsections using the
2023        lcasses above.  Once each sub-experiment is created, use pooled threads
2024        to instantiate them and start it all up.
2025        """
2026
2027        req = req.get('CreateRequestBody', None)
2028        if not req:
2029            raise service_error(service_error.req,
2030                    "Bad request format (no CreateRequestBody)")
2031
2032        # Get the experiment access
2033        exp = req.get('experimentID', None)
2034        if exp:
2035            if exp.has_key('fedid'):
2036                key = exp['fedid']
2037                expid = key
2038                eid = None
2039            elif exp.has_key('localname'):
2040                key = exp['localname']
2041                eid = key
2042                expid = None
2043            else:
2044                raise service_error(service_error.req, "Unknown lookup type")
2045        else:
2046            raise service_error(service_error.req, "No request?")
2047
2048        self.check_experiment_access(fid, key)
2049
2050        try:
2051            tmpdir = tempfile.mkdtemp(prefix="split-")
2052            os.mkdir(tmpdir+"/keys")
2053        except IOError:
2054            raise service_error(service_error.internal, "Cannot create tmp dir")
2055
2056        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2057        gw_secretkey_base = "fed.%s" % self.ssh_type
2058        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2059        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2060        tclfile = tmpdir + "/experiment.tcl"
2061        tbparams = { }
2062        try:
2063            access_user = self.accessdb[fid]
2064        except KeyError:
2065            raise service_error(service_error.internal,
2066                    "Access map and authorizer out of sync in " + \
2067                            "create_experiment for fedid %s"  % fid)
2068
2069        pid = "dummy"
2070        gid = "dummy"
2071
2072        # The tcl parser needs to read a file so put the content into that file
2073        descr=req.get('experimentdescription', None)
2074        if descr:
2075            file_content=descr.get('ns2description', None)
2076            if file_content:
2077                try:
2078                    f = open(tclfile, 'w')
2079                    f.write(file_content)
2080                    f.close()
2081                except IOError:
2082                    raise service_error(service_error.internal,
2083                            "Cannot write temp experiment description")
2084            else:
2085                raise service_error(service_error.req, 
2086                        "Only ns2descriptions supported")
2087        else:
2088            raise service_error(service_error.req, "No experiment description")
2089
2090        self.state_lock.acquire()
2091        if self.state.has_key(key):
2092            self.state[key]['experimentStatus'] = "starting"
2093            for e in self.state[key].get('experimentID',[]):
2094                if not expid and e.has_key('fedid'):
2095                    expid = e['fedid']
2096                elif not eid and e.has_key('localname'):
2097                    eid = e['localname']
2098        self.state_lock.release()
2099
2100        if not (eid and expid):
2101            raise service_error(service_error.internal, 
2102                    "Cannot find local experiment info!?")
2103
2104        try: 
2105            # This catches exceptions to clear the placeholder if necessary
2106            try:
2107                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2108            except ValueError:
2109                raise service_error(service_error.server_config, 
2110                        "Bad key type (%s)" % self.ssh_type)
2111            master, export_project = self.get_master_project(req)
2112            # XXX get these out when master and project are optional
2113            if not master:
2114                raise service_error(service_error.req,
2115                        "No master testbed label")
2116            if not export_project:
2117                raise service_error(service_error.req, "No export project")
2118            # XXX
2119
2120            # Translate to topdl
2121            if self.splitter_url:
2122                # XXX: need remote topdl translator
2123                self.log.debug("Calling remote splitter at %s" % \
2124                        self.splitter_url)
2125                top = self.remote_ns2topdl(self.splitter_url, file_content)
2126            else:
2127                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2128                    str(self.muxmax), '-m', master]
2129
2130                tclcmd.extend([pid, gid, eid, tclfile])
2131
2132                self.log.debug("running local splitter %s", " ".join(tclcmd))
2133                # This is just fantastic.  As a side effect the parser copies
2134                # tb_compat.tcl into the current directory, so that directory
2135                # must be writable by the fedd user.  Doing this in the
2136                # temporary subdir ensures this is the case.
2137                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2138                        cwd=tmpdir)
2139                split_data = tclparser.stdout
2140
2141                top = topdl.topology_from_xml(file=split_data, top="experiment")
2142
2143            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2144             # Find the testbeds to look up
2145            testbeds = set([ a.value for e in top.elements \
2146                    for a in e.attribute \
2147                    if a.attribute == 'testbed'] )
2148
2149            allocated = { }         # Testbeds we can access
2150            topo ={ }               # Sub topologies
2151            connInfo = { }          # Connection information
2152            services = [ ]
2153            self.get_access_to_testbeds(testbeds, access_user, 
2154                    export_project, master, allocated, tbparams, services)
2155            self.split_topology(top, topo, testbeds)
2156
2157            # Copy configuration files into the remote file store
2158            # The config urlpath
2159            configpath = "/%s/config" % expid
2160            # The config file system location
2161            configdir ="%s%s" % ( self.repodir, configpath)
2162            try:
2163                os.makedirs(configdir)
2164            except IOError, e:
2165                raise service_error(
2166                        "Cannot create config directory: %s" % e)
2167            try:
2168                f = open("%s/hosts" % configdir, "w")
2169                f.write('\n'.join(hosts))
2170                f.close()
2171            except IOError, e:
2172                raise service_error(service_error.internal, 
2173                        "Cannot write hosts file: %s" % e)
2174            try:
2175                copy_file("%s" % gw_pubkey, "%s/%s" % \
2176                        (configdir, gw_pubkey_base))
2177                copy_file("%s" % gw_secretkey, "%s/%s" % \
2178                        (configdir, gw_secretkey_base))
2179            except IOError, e:
2180                raise service_error(service_error.internal, 
2181                        "Cannot copy keyfiles: %s" % e)
2182
2183            # Allow the individual testbeds to access the configuration files.
2184            for tb in tbparams.keys():
2185                asignee = tbparams[tb]['allocID']['fedid']
2186                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2187                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2188
2189            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2190                    connInfo, expid)
2191            # Now get access to the dynamic testbeds
2192            for k, t in topo.items():
2193                if not t.get_attribute('dynamic'):
2194                    continue
2195                tb = t.get_attribute('testbed')
2196                if tb: 
2197                    self.get_access(tb, None, tbparams, master, 
2198                            export_project, access_user, services)
2199                    tbparams[k] = tbparams[tb]
2200                    del tbparams[tb]
2201                    allocated[k] = 1
2202                    store_keys = t.get_attribute('store_keys')
2203                    # Give the testbed access to keys it exports or imports
2204                    if store_keys:
2205                        for sk in store_keys.split(" "):
2206                            self.auth.set_attribute(\
2207                                    tbparams[k]['allocID']['fedid'], sk)
2208                else:
2209                    raise service_error(service_error.internal, 
2210                            "Dynamic allocation from no testbed!?")
2211
2212            self.wrangle_software(expid, top, topo, tbparams)
2213
2214            vtopo = topdl.topology_to_vtopo(top)
2215            vis = self.genviz(vtopo)
2216
2217            # save federant information
2218            for k in allocated.keys():
2219                tbparams[k]['federant'] = {
2220                        'name': [ { 'localname' : eid} ],
2221                        'allocID' : tbparams[k]['allocID'],
2222                        'master' : k == master,
2223                        'uri': tbparams[k]['uri'],
2224                    }
2225                if tbparams[k].has_key('emulab'):
2226                        tbparams[k]['federant']['emulab'] = \
2227                                tbparams[k]['emulab']
2228
2229            self.state_lock.acquire()
2230            self.state[eid]['vtopo'] = vtopo
2231            self.state[eid]['vis'] = vis
2232            self.state[expid]['federant'] = \
2233                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2234                        if tbparams[tb].has_key('federant') ]
2235            if self.state_filename: 
2236                self.write_state()
2237            self.state_lock.release()
2238        except service_error, e:
2239            # If something goes wrong in the parse (usually an access error)
2240            # clear the placeholder state.  From here on out the code delays
2241            # exceptions.  Failing at this point returns a fault to the remote
2242            # caller.
2243
2244            self.state_lock.acquire()
2245            del self.state[eid]
2246            del self.state[expid]
2247            if self.state_filename: self.write_state()
2248            self.state_lock.release()
2249            raise e
2250
2251
2252        # Start the background swapper and return the starting state.  From
2253        # here on out, the state will stick around a while.
2254
2255        # Let users touch the state
2256        self.auth.set_attribute(fid, expid)
2257        self.auth.set_attribute(expid, expid)
2258        # Override fedids can manipulate state as well
2259        for o in self.overrides:
2260            self.auth.set_attribute(o, expid)
2261
2262        # Create a logger that logs to the experiment's state object as well as
2263        # to the main log file.
2264        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2265        alloc_collector = self.list_log(self.state[eid]['log'])
2266        h = logging.StreamHandler(alloc_collector)
2267        # XXX: there should be a global one of these rather than repeating the
2268        # code.
2269        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2270                    '%d %b %y %H:%M:%S'))
2271        alloc_log.addHandler(h)
2272       
2273        attrs = [ 
2274                {
2275                    'attribute': 'ssh_pubkey', 
2276                    'value': '%s/%s/config/%s' % \
2277                            (self.repo_url, expid, gw_pubkey_base)
2278                },
2279                {
2280                    'attribute': 'ssh_secretkey', 
2281                    'value': '%s/%s/config/%s' % \
2282                            (self.repo_url, expid, gw_secretkey_base)
2283                },
2284                {
2285                    'attribute': 'hosts', 
2286                    'value': '%s/%s/config/hosts' % \
2287                            (self.repo_url, expid)
2288                },
2289                {
2290                    'attribute': 'experiment_name',
2291                    'value': eid,
2292                },
2293            ]
2294
2295        # transit and disconnected testbeds may not have a connInfo entry.
2296        # Fill in the blanks.
2297        for t in allocated.keys():
2298            if not connInfo.has_key(t):
2299                connInfo[t] = { }
2300
2301        # Start a thread to do the resource allocation
2302        t  = Thread(target=self.allocate_resources,
2303                args=(allocated, master, eid, expid, tbparams, 
2304                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2305                    services),
2306                name=eid)
2307        t.start()
2308
2309        rv = {
2310                'experimentID': [
2311                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2312                ],
2313                'experimentStatus': 'starting',
2314            }
2315
2316        return rv
2317   
2318    def get_experiment_fedid(self, key):
2319        """
2320        find the fedid associated with the localname key in the state database.
2321        """
2322
2323        rv = None
2324        self.state_lock.acquire()
2325        if self.state.has_key(key):
2326            if isinstance(self.state[key], dict):
2327                try:
2328                    kl = [ f['fedid'] for f in \
2329                            self.state[key]['experimentID']\
2330                                if f.has_key('fedid') ]
2331                except KeyError:
2332                    self.state_lock.release()
2333                    raise service_error(service_error.internal, 
2334                            "No fedid for experiment %s when getting "+\
2335                                    "fedid(!?)" % key)
2336                if len(kl) == 1:
2337                    rv = kl[0]
2338                else:
2339                    self.state_lock.release()
2340                    raise service_error(service_error.internal, 
2341                            "multiple fedids for experiment %s when " +\
2342                                    "getting fedid(!?)" % key)
2343            else:
2344                self.state_lock.release()
2345                raise service_error(service_error.internal, 
2346                        "Unexpected state for %s" % key)
2347        self.state_lock.release()
2348        return rv
2349
2350    def check_experiment_access(self, fid, key):
2351        """
2352        Confirm that the fid has access to the experiment.  Though a request
2353        may be made in terms of a local name, the access attribute is always
2354        the experiment's fedid.
2355        """
2356        if not isinstance(key, fedid):
2357            key = self.get_experiment_fedid(key)
2358
2359        if self.auth.check_attribute(fid, key):
2360            return True
2361        else:
2362            raise service_error(service_error.access, "Access Denied")
2363
2364
2365    def get_handler(self, path, fid):
2366        self.log.info("Get handler %s %s" % (path, fid))
2367        if self.auth.check_attribute(fid, path):
2368            return ("%s/%s" % (self.repodir, path), "application/binary")
2369        else:
2370            return (None, None)
2371
2372    def get_vtopo(self, req, fid):
2373        """
2374        Return the stored virtual topology for this experiment
2375        """
2376        rv = None
2377        state = None
2378
2379        req = req.get('VtopoRequestBody', None)
2380        if not req:
2381            raise service_error(service_error.req,
2382                    "Bad request format (no VtopoRequestBody)")
2383        exp = req.get('experiment', None)
2384        if exp:
2385            if exp.has_key('fedid'):
2386                key = exp['fedid']
2387                keytype = "fedid"
2388            elif exp.has_key('localname'):
2389                key = exp['localname']
2390                keytype = "localname"
2391            else:
2392                raise service_error(service_error.req, "Unknown lookup type")
2393        else:
2394            raise service_error(service_error.req, "No request?")
2395
2396        self.check_experiment_access(fid, key)
2397
2398        self.state_lock.acquire()
2399        if self.state.has_key(key):
2400            if self.state[key].has_key('vtopo'):
2401                rv = { 'experiment' : {keytype: key },\
2402                        'vtopo': self.state[key]['vtopo'],\
2403                    }
2404            else:
2405                state = self.state[key]['experimentStatus']
2406        self.state_lock.release()
2407
2408        if rv: return rv
2409        else: 
2410            if state:
2411                raise service_error(service_error.partial, 
2412                        "Not ready: %s" % state)
2413            else:
2414                raise service_error(service_error.req, "No such experiment")
2415
2416    def get_vis(self, req, fid):
2417        """
2418        Return the stored visualization for this experiment
2419        """
2420        rv = None
2421        state = None
2422
2423        req = req.get('VisRequestBody', None)
2424        if not req:
2425            raise service_error(service_error.req,
2426                    "Bad request format (no VisRequestBody)")
2427        exp = req.get('experiment', None)
2428        if exp:
2429            if exp.has_key('fedid'):
2430                key = exp['fedid']
2431                keytype = "fedid"
2432            elif exp.has_key('localname'):
2433                key = exp['localname']
2434                keytype = "localname"
2435            else:
2436                raise service_error(service_error.req, "Unknown lookup type")
2437        else:
2438            raise service_error(service_error.req, "No request?")
2439
2440        self.check_experiment_access(fid, key)
2441
2442        self.state_lock.acquire()
2443        if self.state.has_key(key):
2444            if self.state[key].has_key('vis'):
2445                rv =  { 'experiment' : {keytype: key },\
2446                        'vis': self.state[key]['vis'],\
2447                        }
2448            else:
2449                state = self.state[key]['experimentStatus']
2450        self.state_lock.release()
2451
2452        if rv: return rv
2453        else:
2454            if state:
2455                raise service_error(service_error.partial, 
2456                        "Not ready: %s" % state)
2457            else:
2458                raise service_error(service_error.req, "No such experiment")
2459
2460    def clean_info_response(self, rv):
2461        """
2462        Remove the information in the experiment's state object that is not in
2463        the info response.
2464        """
2465        # Remove the owner info (should always be there, but...)
2466        if rv.has_key('owner'): del rv['owner']
2467
2468        # Convert the log into the allocationLog parameter and remove the
2469        # log entry (with defensive programming)
2470        if rv.has_key('log'):
2471            rv['allocationLog'] = "".join(rv['log'])
2472            del rv['log']
2473        else:
2474            rv['allocationLog'] = ""
2475
2476        if rv['experimentStatus'] != 'active':
2477            if rv.has_key('federant'): del rv['federant']
2478        else:
2479            # remove the allocationID and uri info from each federant
2480            for f in rv.get('federant', []):
2481                if f.has_key('allocID'): del f['allocID']
2482                if f.has_key('uri'): del f['uri']
2483        return rv
2484
2485    def get_info(self, req, fid):
2486        """
2487        Return all the stored info about this experiment
2488        """
2489        rv = None
2490
2491        req = req.get('InfoRequestBody', None)
2492        if not req:
2493            raise service_error(service_error.req,
2494                    "Bad request format (no InfoRequestBody)")
2495        exp = req.get('experiment', None)
2496        if exp:
2497            if exp.has_key('fedid'):
2498                key = exp['fedid']
2499                keytype = "fedid"
2500            elif exp.has_key('localname'):
2501                key = exp['localname']
2502                keytype = "localname"
2503            else:
2504                raise service_error(service_error.req, "Unknown lookup type")
2505        else:
2506            raise service_error(service_error.req, "No request?")
2507
2508        self.check_experiment_access(fid, key)
2509
2510        # The state may be massaged by the service function that called
2511        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2512        # state.
2513        self.state_lock.acquire()
2514        if self.state.has_key(key):
2515            rv = copy.deepcopy(self.state[key])
2516        self.state_lock.release()
2517
2518        if rv:
2519            return self.clean_info_response(rv)
2520        else:
2521            raise service_error(service_error.req, "No such experiment")
2522
2523    def get_multi_info(self, req, fid):
2524        """
2525        Return all the stored info that this fedid can access
2526        """
2527        rv = { 'info': [ ] }
2528
2529        self.state_lock.acquire()
2530        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2531            try:
2532                self.check_experiment_access(fid, key)
2533            except service_error, e:
2534                if e.code == service_error.access:
2535                    continue
2536                else:
2537                    self.state_lock.release()
2538                    raise e
2539
2540            if self.state.has_key(key):
2541                e = copy.deepcopy(self.state[key])
2542                e = self.clean_info_response(e)
2543                rv['info'].append(e)
2544        self.state_lock.release()
2545        return rv
2546
2547    def terminate_experiment(self, req, fid):
2548        """
2549        Swap this experiment out on the federants and delete the shared
2550        information
2551        """
2552        tbparams = { }
2553        req = req.get('TerminateRequestBody', None)
2554        if not req:
2555            raise service_error(service_error.req,
2556                    "Bad request format (no TerminateRequestBody)")
2557        force = req.get('force', False)
2558        exp = req.get('experiment', None)
2559        if exp:
2560            if exp.has_key('fedid'):
2561                key = exp['fedid']
2562                keytype = "fedid"
2563            elif exp.has_key('localname'):
2564                key = exp['localname']
2565                keytype = "localname"
2566            else:
2567                raise service_error(service_error.req, "Unknown lookup type")
2568        else:
2569            raise service_error(service_error.req, "No request?")
2570
2571        self.check_experiment_access(fid, key)
2572
2573        dealloc_list = [ ]
2574
2575
2576        # Create a logger that logs to the dealloc_list as well as to the main
2577        # log file.
2578        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2579        h = logging.StreamHandler(self.list_log(dealloc_list))
2580        # XXX: there should be a global one of these rather than repeating the
2581        # code.
2582        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2583                    '%d %b %y %H:%M:%S'))
2584        dealloc_log.addHandler(h)
2585
2586        self.state_lock.acquire()
2587        fed_exp = self.state.get(key, None)
2588
2589        if fed_exp:
2590            # This branch of the conditional holds the lock to generate a
2591            # consistent temporary tbparams variable to deallocate experiments.
2592            # It releases the lock to do the deallocations and reacquires it to
2593            # remove the experiment state when the termination is complete.
2594
2595            # First make sure that the experiment creation is complete.
2596            status = fed_exp.get('experimentStatus', None)
2597
2598            if status:
2599                if status in ('starting', 'terminating'):
2600                    if not force:
2601                        self.state_lock.release()
2602                        raise service_error(service_error.partial, 
2603                                'Experiment still being created or destroyed')
2604                    else:
2605                        self.log.warning('Experiment in %s state ' % status + \
2606                                'being terminated by force.')
2607            else:
2608                # No status??? trouble
2609                self.state_lock.release()
2610                raise service_error(service_error.internal,
2611                        "Experiment has no status!?")
2612
2613            ids = []
2614            #  experimentID is a list of dicts that are self-describing
2615            #  identifiers.  This finds all the fedids and localnames - the
2616            #  keys of self.state - and puts them into ids.
2617            for id in fed_exp.get('experimentID', []):
2618                if id.has_key('fedid'): ids.append(id['fedid'])
2619                if id.has_key('localname'): ids.append(id['localname'])
2620
2621            # Collect the allocation/segment ids into a dict keyed by the fedid
2622            # of the allocation (or a monotonically increasing integer) that
2623            # contains a tuple of uri, aid (which is a dict...)
2624            for i, fed in enumerate(fed_exp.get('federant', [])):
2625                try:
2626                    uri = fed['uri']
2627                    aid = fed['allocID']
2628                    k = fed['allocID'].get('fedid', i)
2629                except KeyError, e:
2630                    continue
2631                tbparams[k] = (uri, aid)
2632            fed_exp['experimentStatus'] = 'terminating'
2633            if self.state_filename: self.write_state()
2634            self.state_lock.release()
2635
2636            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2637            # then completes, so we can't wait if nothing starts.  So, no
2638            # tbparams, no start.
2639            if len(tbparams) > 0:
2640                thread_pool = self.thread_pool(self.nthreads)
2641                for k in tbparams.keys():
2642                    # Create and start a thread to stop the segment
2643                    thread_pool.wait_for_slot()
2644                    uri, aid = tbparams[k]
2645                    t  = self.pooled_thread(\
2646                            target=self.terminate_segment(log=dealloc_log,
2647                                testbed=uri,
2648                                cert_file=self.cert_file, 
2649                                cert_pwd=self.cert_pwd,
2650                                trusted_certs=self.trusted_certs,
2651                                caller=self.call_TerminateSegment),
2652                            args=(uri, aid), name=k,
2653                            pdata=thread_pool, trace_file=self.trace_file)
2654                    t.start()
2655                # Wait for completions
2656                thread_pool.wait_for_all_done()
2657
2658            # release the allocations (failed experiments have done this
2659            # already, and starting experiments may be in odd states, so we
2660            # ignore errors releasing those allocations
2661            try: 
2662                for k in tbparams.keys():
2663                    # This releases access by uri
2664                    uri, aid = tbparams[k]
2665                    self.release_access(None, aid, uri=uri)
2666            except service_error, e:
2667                if status != 'failed' and not force:
2668                    raise e
2669
2670            # Remove the terminated experiment
2671            self.state_lock.acquire()
2672            for id in ids:
2673                if self.state.has_key(id): del self.state[id]
2674
2675            if self.state_filename: self.write_state()
2676            self.state_lock.release()
2677
2678            # Delete any synch points associated with this experiment.  All
2679            # synch points begin with the fedid of the experiment.
2680            fedid_keys = set(["fedid:%s" % f for f in ids \
2681                    if isinstance(f, fedid)])
2682            for k in self.synch_store.all_keys():
2683                try:
2684                    if len(k) > 45 and k[0:46] in fedid_keys:
2685                        self.synch_store.del_value(k)
2686                except synch_store.BadDeletionError:
2687                    pass
2688            self.write_store()
2689               
2690            return { 
2691                    'experiment': exp , 
2692                    'deallocationLog': "".join(dealloc_list),
2693                    }
2694        else:
2695            # Don't forget to release the lock
2696            self.state_lock.release()
2697            raise service_error(service_error.req, "No saved state")
2698
2699
2700    def GetValue(self, req, fid):
2701        """
2702        Get a value from the synchronized store
2703        """
2704        req = req.get('GetValueRequestBody', None)
2705        if not req:
2706            raise service_error(service_error.req,
2707                    "Bad request format (no GetValueRequestBody)")
2708       
2709        name = req['name']
2710        wait = req['wait']
2711        rv = { 'name': name }
2712
2713        if self.auth.check_attribute(fid, name):
2714            try:
2715                v = self.synch_store.get_value(name, wait)
2716            except synch_store.RevokedKeyError:
2717                # No more synch on this key
2718                raise service_error(service_error.federant, 
2719                        "Synch key %s revoked" % name)
2720            if v is not None:
2721                rv['value'] = v
2722            self.log.debug("[GetValue] got %s from %s" % (v, name))
2723            return rv
2724        else:
2725            raise service_error(service_error.access, "Access Denied")
2726       
2727
2728    def SetValue(self, req, fid):
2729        """
2730        Set a value in the synchronized store
2731        """
2732        req = req.get('SetValueRequestBody', None)
2733        if not req:
2734            raise service_error(service_error.req,
2735                    "Bad request format (no SetValueRequestBody)")
2736       
2737        name = req['name']
2738        v = req['value']
2739
2740        if self.auth.check_attribute(fid, name):
2741            try:
2742                self.synch_store.set_value(name, v)
2743                self.write_store()
2744                self.log.debug("[SetValue] set %s to %s" % (name, v))
2745            except synch_store.CollisionError:
2746                # Translate into a service_error
2747                raise service_error(service_error.req,
2748                        "Value already set: %s" %name)
2749            except synch_store.RevokedKeyError:
2750                # No more synch on this key
2751                raise service_error(service_error.federant, 
2752                        "Synch key %s revoked" % name)
2753            return { 'name': name, 'value': v }
2754        else:
2755            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.