source: fedd/federation/experiment_control.py @ 6280b1f

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 6280b1f was 0ac1934, checked in by Ted Faber <faber@…>, 15 years ago

Small SEER things. Compute a correct visualization and done tell an exporting tstbed to configure services

  • Property mode set to 100644
File size: 93.3 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3441fe3]16import traceback
[c971895]17# For parsing visualization output and splitter output
18import xml.parsers.expat
[3441fe3]19
[6c57fe9]20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
[6679c122]22
[db6b092]23from urlparse import urlparse
24from urllib2 import urlopen
25
[ec4fb42]26from util import *
[51cc9df]27from fedid import fedid, generate_fedid
[9460b1e]28from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]29from service_error import service_error
[2761484]30from synch_store import synch_store
[6679c122]31
[db6b092]32import topdl
[f07fa49]33import list_log
[db6b092]34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
[11a08b0]37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
[ec4fb42]44class experiment_control_local:
[0ea11af]45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
[79b6596]51
52    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]53   
[1af38d6]54    class thread_pool:
[866c983]55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
[32e7d93]123        def wait_for_all_done(self, timeout=None):
[866c983]124            """
[32e7d93]125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
[866c983]129            """
[32e7d93]130            if timeout:
131                deadline = time.time() + timeout
[866c983]132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
[32e7d93]134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
[866c983]139            self.release()
[32e7d93]140            return not (self.started == 0 or self.started > self.terminated)
[8bc5754]141
[1af38d6]142    class pooled_thread(Thread):
[866c983]143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
[6679c122]181
[f069052]182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]184    call_StartSegment = service_caller('StartSegment')
[5ae3857]185    call_TerminateSegment = service_caller('TerminateSegment')
[f069052]186    call_Ns2Split = service_caller('Ns2Split')
[058f58e]187
[3f6bc5f]188    def __init__(self, config=None, auth=None):
[866c983]189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
[f07fa49]209        self.list_log = list_log.list_log
[866c983]210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
[6c57fe9]221        self.repodir = config.get("experiment_control", "repodir")
[7183b48]222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
[cc8d8e9]224
[866c983]225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
[866c983]240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
[2761484]242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
[866c983]245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]258
[db6b092]259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
[ca489e8]266
[866c983]267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
[2761484]338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
[866c983]344        # Dispatch tables
345        self.soap_services = {\
[a3ad8bd]346                'New': soap_handler('New', self.new_experiment),
[e19b75c]347                'Create': soap_handler('Create', self.create_experiment),
[866c983]348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
[65f3f29]351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[866c983]352                'Terminate': soap_handler('Terminate', 
[e19b75c]353                    self.terminate_experiment),
[2761484]354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]356        }
357
358        self.xmlrpc_services = {\
[a3ad8bd]359                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]360                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]365                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]366                    self.terminate_experiment),
[2761484]367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]369        }
[19cc408]370
[a97394b]371    # Call while holding self.state_lock
[eee2b2e]372    def write_state(self):
[866c983]373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
[40dd8c1]380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
[866c983]382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]392
[2761484]393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
[a97394b]407    # Call while holding self.state_lock
[eee2b2e]408    def read_state(self):
[866c983]409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
[cc8d8e9]414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
[866c983]430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
[cc8d8e9]442        for s in self.state.values():
[866c983]443            try:
[cc8d8e9]444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
[db6b092]451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
[cc8d8e9]454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
[2761484]456                    for a in self.get_alloc_ids(s):
[cc8d8e9]457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
[866c983]460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]463
464
465    def read_accessdb(self, accessdb_file):
[866c983]466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
[a3ad8bd]520            self.auth.set_attribute(fid, 'new')
[34bc05c]521
522    def read_mapdb(self, file):
[866c983]523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
[2761484]547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
[866c983]591       
[6679c122]592    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
[6679c122]616
[0d830de]617    def gentopo(self, str):
[866c983]618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
[0d830de]685
686    def genviz(self, topo):
[866c983]687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
[0ac1934]694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
[cc8d8e9]702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
[db6b092]735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
[886307f]739                    "Failed to open /dev/null in genviz")
740
[866c983]741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
[886307f]761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
[db6b092]762                close_fds=True)
763        dnull.close()
[866c983]764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
[d0ae12d]786
[99eb8cf]787    def get_access(self, tb, nodes, tbparam, master, export_project,
[e02cd14]788            access_user, services):
[866c983]789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
[b78c9ea]794            raise service_error(service_error.server_config, 
[866c983]795                    "Unknown testbed: %s" % tb)
796
[8218a3b]797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
[866c983]808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]815                        'credential': [ "project: %s" % p, "user: %s"  % u],
[866c983]816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]822                        'credential': [ 'user: %s' % u ],
[866c983]823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
[e02cd14]831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
[866c983]837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
[e19b75c]869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
[db6b092]883
[4afcfc4]884        tbparam[tb] = { 
[69692a9]885                "allocID" : r['allocID'],
886                "uri": uri,
[4afcfc4]887                }
[e02cd14]888        if 'service' in r: 
889            services.extend(r['service'])
[4afcfc4]890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
[617592b]893        for a in r.get('fedAttr', []):
[4afcfc4]894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
[db6b092]901
[69692a9]902    def release_access(self, tb, aid, uri=None):
[e19b75c]903        """
904        Release access to testbed through fedd
905        """
[db6b092]906
[69692a9]907        if not uri:
908            uri = self.tbmap.get(tb, None)
[e19b75c]909        if not uri:
[69692a9]910            raise service_error(service_error.server_config, 
[e19b75c]911                    "Unknown testbed: %s" % tb)
[db6b092]912
[e19b75c]913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
[db6b092]921
[e19b75c]922        # better error coding
[db6b092]923
[e19b75c]924    def remote_splitter(self, uri, desc, master):
[db6b092]925
[e19b75c]926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
[db6b092]931            }
932
[e19b75c]933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]945
[e19b75c]946    class start_segment:
[fd556d1]947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
[cc8d8e9]950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
[fd556d1]956            self.testbed = testbed
[f07fa49]957            self.log_collector = log_collector
[69692a9]958            self.response = None
[cc8d8e9]959
[e02cd14]960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
[cc8d8e9]962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
[ecca6eb]967                    'master': master,
[cc8d8e9]968                }
[e02cd14]969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
[6c57fe9]980            if attrs:
981                req['fedAttr'] = attrs
[cc8d8e9]982
[fd556d1]983            try:
[13e3dd2]984                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
[f07fa49]987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
[69692a9]993                    self.response = r
[f07fa49]994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
[fd556d1]997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
[cc8d8e9]1002
1003
[5ae3857]1004
[e19b75c]1005    class terminate_segment:
[fd556d1]1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
[fd556d1]1014            self.testbed = testbed
[5ae3857]1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
[fd556d1]1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
[db6b092]1028   
1029
[7183b48]1030    def allocate_resources(self, allocated, master, eid, expid, 
[f07fa49]1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
[e02cd14]1032            attrs=None, connInfo={}, services=[]):
[69692a9]1033
[cc8d8e9]1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
[109a32a]1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
[cc8d8e9]1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
[109a32a]1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
[69692a9]1072            threads.append(t)
1073            t.start()
[cc8d8e9]1074
[109a32a]1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
[dadc4da]1077        revoked = False
[109a32a]1078        while not thread_pool.wait_for_all_done(60.0):
1079            mins += 1
1080            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1081                    % mins)
[dadc4da]1082            if not revoked and \
[f52f5df]1083                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1084                # a testbed has failed.  Revoke this experiment's
1085                # synchronizarion values so that sub experiments will not
1086                # deadlock waiting for synchronization that will never happen
1087                self.log.info("A subexperiment has failed to swap in, " + \
1088                        "revoking synch keys")
1089                var_key = "fedid:%s" % expid
1090                for k in self.synch_store.all_keys():
1091                    if len(k) > 45 and k[0:46] == var_key:
1092                        self.synch_store.revoke_key(k)
1093                revoked = True
[69692a9]1094
[cc8d8e9]1095        failed = [ t.getName() for t in threads if not t.rv ]
1096        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1097
[cc8d8e9]1098        # If one failed clean up, unless fail_soft is set
[32e7d93]1099        if failed:
[cc8d8e9]1100            if not fail_soft:
1101                thread_pool.clear()
1102                for tb in succeeded:
1103                    # Create and start a thread to stop the segment
1104                    thread_pool.wait_for_slot()
[0fa1729]1105                    uri = tbparams[tb]['uri']
[cc8d8e9]1106                    t  = self.pooled_thread(\
[32e7d93]1107                            target=self.terminate_segment(log=log,
[fd556d1]1108                                testbed=tb,
[32e7d93]1109                                cert_file=self.cert_file, 
1110                                cert_pwd=self.cert_pwd,
1111                                trusted_certs=self.trusted_certs,
1112                                caller=self.call_TerminateSegment),
1113                            args=(uri, tbparams[tb]['federant']['allocID']),
1114                            name=tb,
[cc8d8e9]1115                            pdata=thread_pool, trace_file=self.trace_file)
1116                    t.start()
[f52f5df]1117                # Wait until all finish (if any are being stopped)
1118                if succeeded:
1119                    thread_pool.wait_for_all_done()
[cc8d8e9]1120
1121                # release the allocations
1122                for tb in tbparams.keys():
[69692a9]1123                    self.release_access(tb, tbparams[tb]['allocID'],
1124                            tbparams[tb].get('uri', None))
[cc8d8e9]1125                # Remove the placeholder
1126                self.state_lock.acquire()
1127                self.state[eid]['experimentStatus'] = 'failed'
1128                if self.state_filename: self.write_state()
1129                self.state_lock.release()
1130
1131                log.error("Swap in failed on %s" % ",".join(failed))
1132                return
1133        else:
1134            log.info("[start_segment]: Experiment %s active" % eid)
1135
1136
1137        # Walk up tmpdir, deleting as we go
[69692a9]1138        if self.cleanup:
1139            log.debug("[start_experiment]: removing %s" % tmpdir)
1140            for path, dirs, files in os.walk(tmpdir, topdown=False):
1141                for f in files:
1142                    os.remove(os.path.join(path, f))
1143                for d in dirs:
1144                    os.rmdir(os.path.join(path, d))
1145            os.rmdir(tmpdir)
1146        else:
1147            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1148
1149        # Insert the experiment into our state and update the disk copy
1150        self.state_lock.acquire()
1151        self.state[expid]['experimentStatus'] = 'active'
1152        self.state[eid] = self.state[expid]
1153        if self.state_filename: self.write_state()
1154        self.state_lock.release()
1155        return
1156
1157
[895a133]1158    def add_kit(self, e, kit):
1159        """
1160        Add a Software object created from the list of (install, location)
1161        tuples passed as kit  to the software attribute of an object e.  We
1162        do this enough to break out the code, but it's kind of a hack to
1163        avoid changing the old tuple rep.
1164        """
1165
1166        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1167
1168        if isinstance(e.software, list): e.software.extend(s)
1169        else: e.software = s
1170
1171
[a3ad8bd]1172    def create_experiment_state(self, fid, req, expid, expcert, 
1173            state='starting'):
[895a133]1174        """
1175        Create the initial entry in the experiment's state.  The expid and
1176        expcert are the experiment's fedid and certifacte that represents that
1177        ID, which are installed in the experiment state.  If the request
1178        includes a suggested local name that is used if possible.  If the local
1179        name is already taken by an experiment owned by this user that has
[a3ad8bd]1180        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1181        valid localname is found.  The generated local name is returned.
1182        """
1183
1184        if req.has_key('experimentID') and \
1185                req['experimentID'].has_key('localname'):
1186            overwrite = False
1187            eid = req['experimentID']['localname']
1188            # If there's an old failed experiment here with the same local name
1189            # and accessible by this user, we'll overwrite it, otherwise we'll
1190            # fall through and do the collision avoidance.
1191            old_expid = self.get_experiment_fedid(eid)
1192            if old_expid and self.check_experiment_access(fid, old_expid):
1193                self.state_lock.acquire()
1194                status = self.state[eid].get('experimentStatus', None)
1195                if status and status == 'failed':
1196                    # remove the old access attribute
1197                    self.auth.unset_attribute(fid, old_expid)
1198                    overwrite = True
1199                    del self.state[eid]
1200                    del self.state[old_expid]
1201                self.state_lock.release()
1202            self.state_lock.acquire()
1203            while (self.state.has_key(eid) and not overwrite):
1204                eid += random.choice(string.ascii_letters)
1205            # Initial state
1206            self.state[eid] = {
1207                    'experimentID' : \
1208                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1209                    'experimentStatus': state,
[895a133]1210                    'experimentAccess': { 'X509' : expcert },
1211                    'owner': fid,
1212                    'log' : [],
1213                }
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217        else:
1218            eid = self.exp_stem
1219            for i in range(0,5):
1220                eid += random.choice(string.ascii_letters)
1221            self.state_lock.acquire()
1222            while (self.state.has_key(eid)):
1223                eid = self.exp_stem
1224                for i in range(0,5):
1225                    eid += random.choice(string.ascii_letters)
1226            # Initial state
1227            self.state[eid] = {
1228                    'experimentID' : \
1229                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1230                    'experimentStatus': state,
[895a133]1231                    'experimentAccess': { 'X509' : expcert },
1232                    'owner': fid,
1233                    'log' : [],
1234                }
1235            self.state[expid] = self.state[eid]
1236            if self.state_filename: self.write_state()
1237            self.state_lock.release()
1238
1239        return eid
1240
1241
1242    def allocate_ips_to_topo(self, top):
1243        """
[69692a9]1244        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1245        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1246        created and returned as a list of hostfiles entries.  We also return
1247        the allocator, because we may need to allocate IPs to portals
1248        (specifically DRAGON portals).
[895a133]1249        """
1250        subs = sorted(top.substrates, 
1251                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1252                reverse=True)
1253        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1254        ifs = { }
1255        hosts = [ ]
1256
1257        for idx, s in enumerate(subs):
1258            a = ips.allocate(len(s.interfaces)+2)
1259            if a :
1260                base, num = a
1261                if num < len(s.interfaces) +2 : 
1262                    raise service_error(service_error.internal,
1263                            "Allocator returned wrong number of IPs??")
1264            else:
1265                raise service_error(service_error.req, 
1266                        "Cannot allocate IP addresses")
1267
1268            base += 1
1269            for i in s.interfaces:
1270                i.attribute.append(
1271                        topdl.Attribute('ip4_address', 
1272                            "%s" % ip_addr(base)))
1273                hname = i.element.name[0]
1274                if ifs.has_key(hname):
1275                    hosts.append("%s\t%s-%s %s-%d" % \
1276                            (ip_addr(base), hname, s.name, hname,
1277                                ifs[hname]))
1278                else:
1279                    ifs[hname] = 0
1280                    hosts.append("%s\t%s-%s %s-%d %s" % \
1281                            (ip_addr(base), hname, s.name, hname,
1282                                ifs[hname], hname))
1283
1284                ifs[hname] += 1
1285                base += 1
[69692a9]1286        return hosts, ips
[895a133]1287
[99eb8cf]1288    def get_access_to_testbeds(self, testbeds, access_user, 
[e02cd14]1289            export_project, master, allocated, tbparams, services):
[895a133]1290        """
1291        Request access to the various testbeds required for this instantiation
1292        (passed in as testbeds).  User, access_user, expoert_project and master
1293        are used to construct the correct requests.  Per-testbed parameters are
1294        returned in tbparams.
1295        """
1296        for tb in testbeds:
[99eb8cf]1297            self.get_access(tb, None, tbparams, master,
[e02cd14]1298                    export_project, access_user, services)
[895a133]1299            allocated[tb] = 1
1300
1301    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1302        """
[e02cd14]1303        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1304        """
1305        for tb in testbeds:
1306            topo[tb] = top.clone()
1307            to_delete = [ ]
[e02cd14]1308            # XXX: copy in for loop to simplify
[895a133]1309            for e in topo[tb].elements:
1310                etb = e.get_attribute('testbed')
1311                if etb and etb != tb:
1312                    for i in e.interface:
1313                        for s in i.subs:
1314                            try:
1315                                s.interfaces.remove(i)
1316                            except ValueError:
1317                                raise service_error(service_error.internal,
1318                                        "Can't remove interface??")
1319                    to_delete.append(e)
1320            for e in to_delete:
1321                topo[tb].elements.remove(e)
1322            topo[tb].make_indices()
1323
1324            for e in [ e for e in topo[tb].elements \
1325                    if isinstance(e,topdl.Computer)]:
1326                if self.fedkit: self.add_kit(e, self.fedkit)
1327
[13e3dd2]1328    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
[2761484]1329            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1330            expid=None):
[e02cd14]1331        """
1332        Return a new internet portal node and a dict with the connectionInfo to
1333        be attached.
1334        """
[d87778f]1335        dproject = tbparams[dt].get('project', 'project')
1336        ddomain = tbparams[dt].get('domain', ".example.com")
[13e3dd2]1337        mdomain = tbparams[master].get('domain', '.example.com')
[e02cd14]1338        mproject = tbparams[master].get('project', 'project')
[13e3dd2]1339        muser = tbparams[master].get('user', 'root')
1340        smbshare = tbparams[master].get('smbshare', 'USERS')
[e02cd14]1341
[13e3dd2]1342        if st == master or dt == master:
1343            active = ("%s" % (st == master))
1344        else:
[e02cd14]1345            active = ("%s" % (st > dt))
[13e3dd2]1346
[69692a9]1347        ifaces = [ ]
1348        for sub, attrs in iface_desc:
1349            inf = topdl.Interface(
[5b74b63]1350                    name="inf%03d" % len(ifaces),
[69692a9]1351                    substrate=sub,
1352                    attribute=[
1353                        topdl.Attribute(
1354                            attribute=n,
1355                            value = v)
1356                        for n, v in attrs
[13e3dd2]1357                        ]
[69692a9]1358                    )
1359            ifaces.append(inf)
[617592b]1360        if conn_type == "ssh":
[2761484]1361            try:
1362                aid = tbparams[st]['allocID']['fedid']
1363            except:
[109a32a]1364                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1365                        % st)
[2761484]1366                aid = None
[617592b]1367            info = {
1368                    "type" : conn_type, 
1369                    "portal": myname,
1370                    'fedAttr': [ 
1371                            { 'attribute': 'masterdomain', 'value': mdomain},
1372                            { 'attribute': 'masterexperiment', 'value': 
1373                                "%s/%s" % (mproject, eid)},
1374                            { 'attribute': 'active', 'value': active},
1375                            # Move to SMB service description
1376                            { 'attribute': 'masteruser', 'value': muser},
1377                            { 'attribute': 'smbshare', 'value': smbshare},
1378                        ],
[2761484]1379                    'parameter': [
1380                        {
1381                            'name': 'peer',
1382                            'key': 'fedid:%s/%s' % (expid, myname),
1383                            'store': self.store_url,
1384                            'type': 'output',
1385                        },
1386                        {
1387                            'name': 'peer',
1388                            'key': 'fedid:%s/%s' % (expid, desthost),
1389                            'store': self.store_url,
1390                            'type': 'input',
1391                        },
1392                        ]
[617592b]1393                    }
[2761484]1394            # Give this allocation the rights to access the key of the
1395            # peers
1396            if aid:
1397                for h in (myname, desthost):
1398                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1399            else:
1400                self.log.error("No aid for %s in new_portal_node" % st)
[617592b]1401        else:
1402            info = None
[5b74b63]1403
[e02cd14]1404        return (topdl.Computer(
[13e3dd2]1405                name=myname,
1406                attribute=[ 
1407                    topdl.Attribute(attribute=n,value=v)
1408                        for n, v in (\
1409                            ('portal', 'true'),
1410                            ('portal_type', portal_type), 
[641bb66]1411                        )
[13e3dd2]1412                    ],
1413                interface=ifaces,
[e02cd14]1414                ), info)
[13e3dd2]1415
[2761484]1416    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
[13e3dd2]1417        ddomain = tbparams[dt].get('domain', ".example.com")
1418        dproject = tbparams[dt].get('project', 'project')
1419        tsubstrate = \
1420                topdl.Substrate(name='%s-%s' % (st, dt),
1421                        attribute= [
1422                            topdl.Attribute(
1423                                attribute='portal',
1424                                value='true')
1425                            ]
1426                        )
1427        segment_element = topdl.Segment(
1428                id= tbparams[dt]['allocID'],
1429                type='emulab',
1430                uri = self.tbmap.get(dt, None),
1431                interface=[ 
1432                    topdl.Interface(
1433                        substrate=tsubstrate.name),
1434                    ],
1435                attribute = [
1436                    topdl.Attribute(attribute=n, value=v)
1437                        for n, v in (\
1438                            ('domain', ddomain),
1439                            ('experiment', "%s/%s" % \
1440                                    (dproject, eid)),)
1441                    ],
1442                )
1443
1444        return (tsubstrate, segment_element)
1445
[2761484]1446    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
[69692a9]1447        if sub.capacity is None:
1448            raise service_error(service_error.internal,
1449                    "Cannot DRAGON split substrate w/o capacity")
1450        segs = [ ]
1451        substr = topdl.Substrate(name="dragon%d" % idx, 
1452                capacity=sub.capacity.clone(),
1453                attribute=[ topdl.Attribute(attribute=n, value=v)
1454                    for n, v, in (\
1455                            ('vlan', 'unassigned%d' % idx),)])
[2761484]1456        name = "dragon%d" % idx
[109a32a]1457        store_key = 'fedid:%s/vlan%d' % (expid, idx)
[69692a9]1458        for tb in tbs.keys():
1459            seg = topdl.Segment(
1460                    id = tbparams[tb]['allocID'],
1461                    type='emulab',
1462                    uri = self.tbmap.get(tb, None),
1463                    interface=[ 
1464                        topdl.Interface(
1465                            substrate=substr.name),
1466                        ],
1467                    attribute=[ topdl.Attribute(
[ecf679e]1468                        attribute='dragon_endpoint', 
1469                        value=tbparams[tb]['dragon']),
[69692a9]1470                        ]
1471                    )
1472            if tbparams[tb].has_key('vlans'):
1473                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1474            segs.append(seg)
1475
[109a32a]1476            # Give this allocation the rights to access the key of the
1477            # vlan_id
1478            try:
1479                aid = tbparams[tb]['allocID']['fedid']
1480                self.auth.set_attribute(aid, store_key)
1481            except:
1482                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1483                        % tb)
[2761484]1484
1485        connInfo[name] = [ { 
[109a32a]1486            'type': 'transit',
[2761484]1487            'parameter': [ { 
1488                'name': 'vlan_id',
[109a32a]1489                'key': store_key,
[2761484]1490                'store': self.store_url,
1491                'type': 'output'
1492                } ]
1493            } ]
1494
1495        topo[name] = \
[69692a9]1496                topdl.Topology(substrates=[substr], elements=segs,
1497                        attribute=[
1498                            topdl.Attribute(attribute="transit", value='true'),
1499                            topdl.Attribute(attribute="dynamic", value='true'),
[109a32a]1500                            topdl.Attribute(attribute="testbed", 
1501                                value='dragon'),
1502                            topdl.Attribute(attribute="store_keys", 
1503                                value=store_key),
[69692a9]1504                            ]
1505                        )
1506
[5b74b63]1507    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
[109a32a]1508            connInfo, expid=None):
[69692a9]1509        """
1510        Add attribiutes to the various elements indicating that they are to be
[5b74b63]1511        dragon connected and create a dragon segment in topo to be
[69692a9]1512        instantiated.
1513        """
1514
1515        def get_substrate_from_topo(name, t):
1516            for s in t.substrates:
1517                if s.name == name: return s
1518            else: return None
1519
[109a32a]1520
[617592b]1521        mdomain = tbparams[master].get('domain', '.example.com')
1522        mproject = tbparams[master].get('project', 'project')
[5b74b63]1523        # dn is the number of previously created dragon nets.  This routine
1524        # creates a net numbered by dn
[69692a9]1525        dn = len([x for x in topo.keys() if x.startswith('dragon')])
[5b74b63]1526        # Count the number of interfaces on this substrate in each testbed from
1527        # the global topology
[69692a9]1528        count = { }
[617592b]1529        node = { }
[5b74b63]1530        for e in [ i.element for i in sub.interfaces ]:
[69692a9]1531            tb = e.get_attribute('testbed')
1532            count[tb] = count.get(tb, 0) + 1
[617592b]1533            node[tb] = i.get_attribute('ip4_address')
1534
[69692a9]1535
[5b74b63]1536        # Set the attributes in the copies that will allow setup of dragon
1537        # connections.
[69692a9]1538        for tb in tbs.keys():
1539            s = get_substrate_from_topo(sub.name, topo[tb])
1540            if s:
[5b74b63]1541                if not connInfo.has_key(tb):
1542                    connInfo[tb] = [ ]
[617592b]1543
[2761484]1544                try:
1545                    aid = tbparams[tb]['allocID']['fedid']
1546                except:
[109a32a]1547                    self.log.debug("[creat_dragon_substrate] " + 
1548                            "Can't get alloc id for %s?" %tb)
[2761484]1549                    aid = None
1550
[617592b]1551                # This may need another look, but only a service gateway will
1552                # look at the active parameter, and these are only inserted to
1553                # connect to the master.
[238db1e]1554                active = "%s" % ( tb == master)
[5b74b63]1555                info = {
1556                        'type': 'transit',
1557                        'member': [ {
1558                            'element': i.element.name[0], 
1559                            'interface': i.name
[617592b]1560                            } for i in s.interfaces \
1561                                    if isinstance(i.element, topdl.Computer) ],
1562                        'fedAttr': [ 
1563                            { 'attribute': 'masterdomain', 'value': mdomain},
1564                            { 'attribute': 'masterexperiment', 'value': 
1565                                "%s/%s" % (mproject, eid)},
1566                            { 'attribute': 'active', 'value': active},
1567                            ],
[2761484]1568                        'parameter': [ {
1569                            'name': 'vlan_id',
1570                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1571                            'store': self.store_url,
1572                            'type': 'input',
1573                            } ]
[5b74b63]1574                        }
[109a32a]1575                if tbs.has_key(tb):
1576                    info['peer'] = tbs[tb]
[5b74b63]1577                connInfo[tb].append(info)
[2761484]1578
1579                # Give this allocation the rights to access the key of the
1580                # vlan_id
1581                if aid:
1582                    self.auth.set_attribute(aid, 
1583                            'fedid:%s/vlan%d' % (expid, dn))
[69692a9]1584            else:
1585                raise service_error(service_error.internal,
1586                        "No substrate %s in testbed %s" % (sub.name, tb))
1587
[109a32a]1588        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
[69692a9]1589
1590    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
[2761484]1591            segment_substrate, portals, connInfo, expid):
[69692a9]1592        # More than one testbed is on this substrate.  Insert
1593        # some portals into the subtopologies.  st == source testbed,
1594        # dt == destination testbed.
1595        for st in tbs.keys():
1596            if not segment_substrate.has_key(st):
1597                segment_substrate[st] = { }
1598            if not portals.has_key(st): 
1599                portals[st] = { }
[e02cd14]1600            if not connInfo.has_key(st):
1601                connInfo[st] = [ ]
[69692a9]1602            for dt in [ t for t in tbs.keys() if t != st]:
1603                sproject = tbparams[st].get('project', 'project')
1604                dproject = tbparams[dt].get('project', 'project')
1605                mproject = tbparams[master].get('project', 'project')
1606                sdomain = tbparams[st].get('domain', ".example.com")
1607                ddomain = tbparams[dt].get('domain', ".example.com")
1608                mdomain = tbparams[master].get('domain', '.example.com')
1609                muser = tbparams[master].get('user', 'root')
1610                smbshare = tbparams[master].get('smbshare', 'USERS')
1611                aid = tbparams[dt]['allocID']['fedid']
1612                if st == master or dt == master:
1613                    active = ("%s" % (st == master))
1614                else:
1615                    active = ("%s" %(st > dt))
1616                if not segment_substrate[st].has_key(dt):
1617                    # Put a substrate and a segment for the connected
1618                    # testbed in there.
1619                    tsubstrate, segment_element = \
[2761484]1620                            self.new_portal_substrate(st, dt, eid, tbparams,
1621                                    expid)
[69692a9]1622                    segment_substrate[st][dt] = tsubstrate
1623                    topo[st].substrates.append(tsubstrate)
1624                    topo[st].elements.append(segment_element)
1625
1626                new_portal = False
1627                if portals[st].has_key(dt):
1628                    # There's a portal set up to go to this destination.
[e02cd14]1629                    # See if there's room to multiplex this connection on
[69692a9]1630                    # it.  If so, add an interface to the portal; if not,
1631                    # set up to add a portal below.
1632                    # [This little festival of braces is just a pop of the
1633                    # last element in the list of portals between st and
1634                    # dt.]
1635                    portal = portals[st][dt][-1]
1636                    mux = len([ i for i in portal.interface \
1637                            if not i.get_attribute('portal')])
1638                    if mux == self.muxmax:
1639                        new_portal = True
1640                        portal_type = "experiment"
1641                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
[2761484]1642                        desthost = "%stunnel%d" % (st.lower(), 
1643                                len(portals[st][dt]))
[69692a9]1644                    else:
1645                        new_i = topdl.Interface(
[6e44258]1646                                substrate=sub.name,
[69692a9]1647                                attribute=[ 
1648                                    topdl.Attribute(
1649                                        attribute='ip4_address', 
1650                                        value=tbs[dt]
1651                                    )
1652                                ])
1653                        portal.interface.append(new_i)
1654                else:
1655                    # First connection to this testbed, make an empty list
1656                    # and set up to add the new portal below
1657                    new_portal = True
1658                    portals[st][dt] = [ ]
1659                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
[2761484]1660                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
[69692a9]1661
1662                    if dt == master or st == master: portal_type = "both"
1663                    else: portal_type = "experiment"
1664
1665                if new_portal:
1666                    infs = (
1667                            (segment_substrate[st][dt].name, 
1668                                (('portal', 'true'),)),
1669                            (sub.name, 
1670                                (('ip4_address', tbs[dt]),))
1671                        )
[e02cd14]1672                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
[69692a9]1673                            master, eid, myname, desthost, portal_type,
[2761484]1674                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
[69692a9]1675                    if self.fedkit:
1676                        self.add_kit(portal, self.fedkit)
1677                    if self.gatewaykit: 
1678                        self.add_kit(portal, self.gatewaykit)
1679
1680                    topo[st].elements.append(portal)
1681                    portals[st][dt].append(portal)
[e02cd14]1682                    connInfo[st].append(info)
[69692a9]1683
[2761484]1684    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
[69692a9]1685        # Add to the master testbed
1686        tsubstrate, segment_element = \
[2761484]1687                self.new_portal_substrate(st, dt, eid, tbparams, expid)
[69692a9]1688        myname = "%stunnel" % dt
1689        desthost = "%stunnel" % st
1690
[e02cd14]1691        portal, info = self.new_portal_node(st, dt, tbparams, master,
[69692a9]1692                eid, myname, desthost, "control", 
[2761484]1693                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1694                conn_attrs=[], expid=expid)
[69692a9]1695        if self.fedkit:
1696            self.add_kit(portal, self.fedkit)
1697        if self.gatewaykit: 
1698            self.add_kit(portal, self.gatewaykit)
1699
1700        topo[st].substrates.append(tsubstrate)
1701        topo[st].elements.append(segment_element)
1702        topo[st].elements.append(portal)
[e02cd14]1703        if not connInfo.has_key(st):
1704            connInfo[st] = [ ]
1705        connInfo[st].append(info)
[69692a9]1706
[6e44258]1707    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
[2761484]1708            substrate, tbparams, expid):
[69692a9]1709        # Add to the master testbed
1710        myname = "%stunnel" % dt
1711        desthost = "%s" % ip_addr(dip)
1712
[5b74b63]1713        portal, info = self.new_portal_node(st, dt, tbparams, master,
[69692a9]1714                eid, myname, desthost, "control", 
1715                ((substrate.name,(
1716                    ('portal','true'),
[5b74b63]1717                    ('ip4_address', "%s" % ip_addr(myip)),)),),
[2761484]1718                conn_type="transit", conn_attrs=[], expid=expid)
[69692a9]1719        if self.fedkit:
1720            self.add_kit(portal, self.fedkit)
1721        if self.gatewaykit: 
1722            self.add_kit(portal, self.gatewaykit)
1723
[617592b]1724        return portal
[69692a9]1725
[e02cd14]1726    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
[2761484]1727            connInfo, expid):
[895a133]1728        """
1729        For each substrate in the main topology, find those that
1730        have nodes on more than one testbed.  Insert portal nodes
1731        into the copies of those substrates on the sub topologies.
1732        """
[13e3dd2]1733        segment_substrate = { }
1734        portals = { }
[895a133]1735        for s in top.substrates:
1736            # tbs will contain an ip address on this subsrate that is in
1737            # each testbed.
1738            tbs = { }
1739            for i in s.interfaces:
1740                e = i.element
1741                tb = e.get_attribute('testbed')
1742                if tb and not tbs.has_key(tb):
1743                    for i in e.interface:
1744                        if s in i.subs:
1745                            tbs[tb]= i.get_attribute('ip4_address')
1746            if len(tbs) < 2:
1747                continue
1748
[69692a9]1749            # DRAGON will not create multi-site vlans yet
1750            if len(tbs) == 2 and \
1751                    all([tbparams[x].has_key('dragon') for x in tbs]):
1752                self.create_dragon_substrate(s, topo, tbs, tbparams, 
[2761484]1753                        master, eid, connInfo, expid)
[69692a9]1754            else:
1755                self.insert_internet_portals(s, topo, tbs, tbparams, master,
[2761484]1756                        eid, segment_substrate, portals, connInfo, expid)
[13e3dd2]1757
1758        # Make sure that all the slaves have a control portal back to the
1759        # master.
1760        for tb in [ t for t in tbparams.keys() if t != master ]:
1761            if len([e for e in topo[tb].elements \
1762                    if isinstance(e, topdl.Computer) and \
1763                    e.get_attribute('portal') and \
1764                    e.get_attribute('portal_type') == 'both']) == 0:
1765
[69692a9]1766                if tbparams[master].has_key('dragon') \
1767                        and tbparams[tb].has_key('dragon'):
1768
1769                    idx = len([x for x in topo.keys() \
1770                            if x.startswith('dragon')])
1771                    dip, leng = ip_allocator.allocate(4)
1772                    dip += 1
[6e44258]1773                    mip = dip+1
[69692a9]1774                    csub = topdl.Substrate(
1775                            name="dragon-control-%s" % tb,
1776                            capacity=topdl.Capacity(100000.0, 'max'),
1777                            attribute=[
1778                                topdl.Attribute(
1779                                    attribute='portal',
1780                                    value='true'
1781                                    )
1782                                ]
1783                            )
1784                    seg = topdl.Segment(
1785                            id= tbparams[master]['allocID'],
1786                            type='emulab',
1787                            uri = self.tbmap.get(master, None),
1788                            interface=[ 
1789                                topdl.Interface(
1790                                    substrate=csub.name),
1791                                ],
1792                            attribute = [
1793                                topdl.Attribute(attribute=n, value=v)
1794                                    for n, v in (\
1795                                        ('domain', 
1796                                            tbparams[master].get('domain',
1797                                                ".example.com")),
1798                                        ('experiment', "%s/%s" % \
1799                                                (tbparams[master].get(
1800                                                    'project', 
1801                                                    'project'), 
1802                                                    eid)),)
1803                                ],
1804                            )
[617592b]1805                    portal = self.new_dragon_portal(tb, master,
[2761484]1806                            master, eid, dip, mip, idx, csub, tbparams, expid)
[69692a9]1807                    topo[tb].substrates.append(csub)
[5b74b63]1808                    topo[tb].elements.append(portal)
[69692a9]1809                    topo[tb].elements.append(seg)
1810
1811                    mcsub = csub.clone()
1812                    seg = topdl.Segment(
1813                            id= tbparams[tb]['allocID'],
1814                            type='emulab',
1815                            uri = self.tbmap.get(tb, None),
1816                            interface=[ 
1817                                topdl.Interface(
1818                                    substrate=csub.name),
1819                                ],
1820                            attribute = [
1821                                topdl.Attribute(attribute=n, value=v)
1822                                    for n, v in (\
1823                                        ('domain', 
1824                                            tbparams[tb].get('domain',
1825                                                ".example.com")),
1826                                        ('experiment', "%s/%s" % \
1827                                                (tbparams[tb].get('project', 
1828                                                    'project'), 
1829                                                    eid)),)
1830                                ],
1831                            )
[617592b]1832                    portal = self.new_dragon_portal(master, tb, master,
[2761484]1833                            eid, mip, dip, idx, mcsub, tbparams, expid)
[69692a9]1834                    topo[master].substrates.append(mcsub)
[5b74b63]1835                    topo[master].elements.append(portal)
[69692a9]1836                    topo[master].elements.append(seg)
[617592b]1837                    for t in (master, tb):
1838                        topo[t].incorporate_elements()
[69692a9]1839
1840                    self.create_dragon_substrate(csub, topo, 
[109a32a]1841                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1842                            tbparams, master, eid, connInfo,
1843                            expid)
[69692a9]1844                else:
1845                    self.add_control_portal(master, tb, master, eid, topo, 
[2761484]1846                            tbparams, connInfo, expid)
[69692a9]1847                    self.add_control_portal(tb, master, master, eid, topo, 
[2761484]1848                            tbparams, connInfo, expid)
[13e3dd2]1849
1850        # Connect the portal nodes into the topologies and clear out
[895a133]1851        # substrates that are not in the topologies
1852        for tb in tbparams.keys():
1853            topo[tb].incorporate_elements()
1854            topo[tb].substrates = \
1855                    [s for s in topo[tb].substrates \
1856                        if len(s.interfaces) >0]
1857
1858    def wrangle_software(self, expid, top, topo, tbparams):
1859        """
1860        Copy software out to the repository directory, allocate permissions and
1861        rewrite the segment topologies to look for the software in local
1862        places.
1863        """
1864
1865        # Copy the rpms and tarfiles to a distribution directory from
1866        # which the federants can retrieve them
1867        linkpath = "%s/software" %  expid
1868        softdir ="%s/%s" % ( self.repodir, linkpath)
1869        softmap = { }
1870        # These are in a list of tuples format (each kit).  This comprehension
1871        # unwraps them into a single list of tuples that initilaizes the set of
1872        # tuples.
1873        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1874                for p, t in l ])
1875        pkgs.update([x.location for e in top.elements \
1876                for x in e.software])
1877        try:
1878            os.makedirs(softdir)
1879        except IOError, e:
1880            raise service_error(
1881                    "Cannot create software directory: %s" % e)
1882        # The actual copying.  Everything's converted into a url for copying.
1883        for pkg in pkgs:
1884            loc = pkg
1885
1886            scheme, host, path = urlparse(loc)[0:3]
1887            dest = os.path.basename(path)
1888            if not scheme:
1889                if not loc.startswith('/'):
1890                    loc = "/%s" % loc
1891                loc = "file://%s" %loc
1892            try:
1893                u = urlopen(loc)
1894            except Exception, e:
1895                raise service_error(service_error.req, 
1896                        "Cannot open %s: %s" % (loc, e))
1897            try:
1898                f = open("%s/%s" % (softdir, dest) , "w")
1899                self.log.debug("Writing %s/%s" % (softdir,dest) )
1900                data = u.read(4096)
1901                while data:
1902                    f.write(data)
1903                    data = u.read(4096)
1904                f.close()
1905                u.close()
1906            except Exception, e:
1907                raise service_error(service_error.internal,
1908                        "Could not copy %s: %s" % (loc, e))
1909            path = re.sub("/tmp", "", linkpath)
1910            # XXX
1911            softmap[pkg] = \
[7183b48]1912                    "%s/%s/%s" %\
1913                    ( self.repo_url, path, dest)
[895a133]1914
1915            # Allow the individual segments to access the software.
1916            for tb in tbparams.keys():
1917                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1918                        "/%s/%s" % ( path, dest))
1919
1920        # Convert the software locations in the segments into the local
1921        # copies on this host
1922        for soft in [ s for tb in topo.values() \
1923                for e in tb.elements \
1924                    if getattr(e, 'software', False) \
1925                        for s in e.software ]:
1926            if softmap.has_key(soft.location):
1927                soft.location = softmap[soft.location]
1928
1929
[a3ad8bd]1930    def new_experiment(self, req, fid):
1931        """
1932        The external interface to empty initial experiment creation called from
1933        the dispatcher.
1934
1935        Creates a working directory, splits the incoming description using the
1936        splitter script and parses out the avrious subsections using the
1937        lcasses above.  Once each sub-experiment is created, use pooled threads
1938        to instantiate them and start it all up.
1939        """
1940        if not self.auth.check_attribute(fid, 'new'):
1941            raise service_error(service_error.access, "New access denied")
1942
1943        try:
1944            tmpdir = tempfile.mkdtemp(prefix="split-")
1945        except IOError:
1946            raise service_error(service_error.internal, "Cannot create tmp dir")
1947
1948        try:
1949            access_user = self.accessdb[fid]
1950        except KeyError:
1951            raise service_error(service_error.internal,
1952                    "Access map and authorizer out of sync in " + \
[7183b48]1953                            "new_experiment for fedid %s"  % fid)
[a3ad8bd]1954
1955        pid = "dummy"
1956        gid = "dummy"
1957
1958        req = req.get('NewRequestBody', None)
1959        if not req:
1960            raise service_error(service_error.req,
1961                    "Bad request format (no NewRequestBody)")
1962
1963        # Generate an ID for the experiment (slice) and a certificate that the
1964        # allocator can use to prove they own it.  We'll ship it back through
1965        # the encrypted connection.
1966        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1967
1968        #now we're done with the tmpdir, and it should be empty
1969        if self.cleanup:
1970            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1971            os.rmdir(tmpdir)
1972        else:
1973            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1974
1975        eid = self.create_experiment_state(fid, req, expid, expcert, 
1976                state='empty')
1977
1978        # Let users touch the state
1979        self.auth.set_attribute(fid, expid)
1980        self.auth.set_attribute(expid, expid)
1981        # Override fedids can manipulate state as well
1982        for o in self.overrides:
1983            self.auth.set_attribute(o, expid)
1984
1985        rv = {
1986                'experimentID': [
1987                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1988                ],
1989                'experimentStatus': 'empty',
1990                'experimentAccess': { 'X509' : expcert }
1991            }
1992
1993        return rv
1994
1995
[e19b75c]1996    def create_experiment(self, req, fid):
[db6b092]1997        """
1998        The external interface to experiment creation called from the
1999        dispatcher.
2000
2001        Creates a working directory, splits the incoming description using the
2002        splitter script and parses out the avrious subsections using the
2003        lcasses above.  Once each sub-experiment is created, use pooled threads
2004        to instantiate them and start it all up.
2005        """
[7183b48]2006
2007        req = req.get('CreateRequestBody', None)
2008        if not req:
2009            raise service_error(service_error.req,
2010                    "Bad request format (no CreateRequestBody)")
2011
2012        # Get the experiment access
2013        exp = req.get('experimentID', None)
2014        if exp:
2015            if exp.has_key('fedid'):
2016                key = exp['fedid']
2017                expid = key
2018                eid = None
2019            elif exp.has_key('localname'):
2020                key = exp['localname']
2021                eid = key
2022                expid = None
2023            else:
2024                raise service_error(service_error.req, "Unknown lookup type")
2025        else:
2026            raise service_error(service_error.req, "No request?")
2027
2028        self.check_experiment_access(fid, key)
[db6b092]2029
2030        try:
2031            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2032            os.mkdir(tmpdir+"/keys")
[db6b092]2033        except IOError:
2034            raise service_error(service_error.internal, "Cannot create tmp dir")
2035
2036        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2037        gw_secretkey_base = "fed.%s" % self.ssh_type
2038        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2039        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2040        tclfile = tmpdir + "/experiment.tcl"
2041        tbparams = { }
2042        try:
2043            access_user = self.accessdb[fid]
2044        except KeyError:
2045            raise service_error(service_error.internal,
2046                    "Access map and authorizer out of sync in " + \
2047                            "create_experiment for fedid %s"  % fid)
2048
2049        pid = "dummy"
2050        gid = "dummy"
2051
2052        # The tcl parser needs to read a file so put the content into that file
2053        descr=req.get('experimentdescription', None)
2054        if descr:
2055            file_content=descr.get('ns2description', None)
2056            if file_content:
2057                try:
2058                    f = open(tclfile, 'w')
2059                    f.write(file_content)
2060                    f.close()
2061                except IOError:
2062                    raise service_error(service_error.internal,
2063                            "Cannot write temp experiment description")
2064            else:
2065                raise service_error(service_error.req, 
2066                        "Only ns2descriptions supported")
2067        else:
2068            raise service_error(service_error.req, "No experiment description")
2069
[7183b48]2070        self.state_lock.acquire()
2071        if self.state.has_key(key):
[4afcfc4]2072            self.state[key]['experimentStatus'] = "starting"
[7183b48]2073            for e in self.state[key].get('experimentID',[]):
2074                if not expid and e.has_key('fedid'):
2075                    expid = e['fedid']
2076                elif not eid and e.has_key('localname'):
2077                    eid = e['localname']
2078        self.state_lock.release()
2079
2080        if not (eid and expid):
2081            raise service_error(service_error.internal, 
2082                    "Cannot find local experiment info!?")
[db6b092]2083
2084        try: 
2085            # This catches exceptions to clear the placeholder if necessary
2086            try:
2087                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2088            except ValueError:
2089                raise service_error(service_error.server_config, 
2090                        "Bad key type (%s)" % self.ssh_type)
2091
2092            master = req.get('master', None)
2093            if not master:
2094                raise service_error(service_error.req,
2095                        "No master testbed label")
2096            export_project = req.get('exportProject', None)
2097            if not export_project:
2098                raise service_error(service_error.req, "No export project")
[895a133]2099           
2100            # Translate to topdl
[db6b092]2101            if self.splitter_url:
[895a133]2102                # XXX: need remote topdl translator
[db6b092]2103                self.log.debug("Calling remote splitter at %s" % \
2104                        self.splitter_url)
2105                split_data = self.remote_splitter(self.splitter_url,
2106                        file_content, master)
2107            else:
2108                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2109                    str(self.muxmax), '-m', master]
2110
2111                if self.fedkit:
2112                    tclcmd.append('-k')
2113
2114                if self.gatewaykit:
2115                    tclcmd.append('-K')
2116
2117                tclcmd.extend([pid, gid, eid, tclfile])
2118
2119                self.log.debug("running local splitter %s", " ".join(tclcmd))
2120                # This is just fantastic.  As a side effect the parser copies
2121                # tb_compat.tcl into the current directory, so that directory
2122                # must be writable by the fedd user.  Doing this in the
2123                # temporary subdir ensures this is the case.
[70caa72]2124                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
[db6b092]2125                        cwd=tmpdir)
[866c983]2126                split_data = tclparser.stdout
2127
[cc8d8e9]2128            top = topdl.topology_from_xml(file=split_data, top="experiment")
[895a133]2129
[69692a9]2130            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[895a133]2131             # Find the testbeds to look up
2132            testbeds = set([ a.value for e in top.elements \
2133                    for a in e.attribute \
2134                    if a.attribute == 'testbed'] )
2135
2136            allocated = { }         # Testbeds we can access
2137            topo ={ }               # Sub topologies
[e02cd14]2138            connInfo = { }          # Connection information
2139            services = [ ]
[99eb8cf]2140            self.get_access_to_testbeds(testbeds, access_user, 
[e02cd14]2141                    export_project, master, allocated, tbparams, services)
[895a133]2142            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2143
2144            # Copy configuration files into the remote file store
[6c57fe9]2145            # The config urlpath
2146            configpath = "/%s/config" % expid
2147            # The config file system location
2148            configdir ="%s%s" % ( self.repodir, configpath)
2149            try:
2150                os.makedirs(configdir)
2151            except IOError, e:
2152                raise service_error(
2153                        "Cannot create config directory: %s" % e)
2154            try:
2155                f = open("%s/hosts" % configdir, "w")
2156                f.write('\n'.join(hosts))
2157                f.close()
2158            except IOError, e:
2159                raise service_error(service_error.internal, 
2160                        "Cannot write hosts file: %s" % e)
2161            try:
[40dd8c1]2162                copy_file("%s" % gw_pubkey, "%s/%s" % \
[6c57fe9]2163                        (configdir, gw_pubkey_base))
[40dd8c1]2164                copy_file("%s" % gw_secretkey, "%s/%s" % \
[6c57fe9]2165                        (configdir, gw_secretkey_base))
2166            except IOError, e:
2167                raise service_error(service_error.internal, 
2168                        "Cannot copy keyfiles: %s" % e)
[cc8d8e9]2169
[6c57fe9]2170            # Allow the individual testbeds to access the configuration files.
2171            for tb in tbparams.keys():
2172                asignee = tbparams[tb]['allocID']['fedid']
2173                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2174                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
[cc8d8e9]2175
[e02cd14]2176            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
[2761484]2177                    connInfo, expid)
[69692a9]2178            # Now get access to the dynamic testbeds
2179            for k, t in topo.items():
2180                if not t.get_attribute('dynamic'):
2181                    continue
2182                tb = t.get_attribute('testbed')
2183                if tb: 
[617592b]2184                    self.get_access(tb, None, tbparams, master, 
[e02cd14]2185                            export_project, access_user, services)
[69692a9]2186                    tbparams[k] = tbparams[tb]
2187                    del tbparams[tb]
2188                    allocated[k] = 1
[109a32a]2189                    store_keys = t.get_attribute('store_keys')
2190                    # Give the testbed access to keys it exports or imports
2191                    if store_keys:
2192                        for sk in store_keys.split(" "):
2193                            self.auth.set_attribute(\
2194                                    tbparams[k]['allocID']['fedid'], sk)
[69692a9]2195                else:
2196                    raise service_error(service_error.internal, 
2197                            "Dynamic allocation from no testbed!?")
2198
[895a133]2199            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2200
2201            vtopo = topdl.topology_to_vtopo(top)
2202            vis = self.genviz(vtopo)
[db6b092]2203
[866c983]2204            # save federant information
2205            for k in allocated.keys():
[ecf679e]2206                tbparams[k]['federant'] = {
2207                        'name': [ { 'localname' : eid} ],
2208                        'allocID' : tbparams[k]['allocID'],
2209                        'master' : k == master,
2210                        'uri': tbparams[k]['uri'],
[866c983]2211                    }
[69692a9]2212                if tbparams[k].has_key('emulab'):
2213                        tbparams[k]['federant']['emulab'] = \
2214                                tbparams[k]['emulab']
[866c983]2215
[db6b092]2216            self.state_lock.acquire()
2217            self.state[eid]['vtopo'] = vtopo
2218            self.state[eid]['vis'] = vis
2219            self.state[expid]['federant'] = \
2220                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2221                        if tbparams[tb].has_key('federant') ]
[cc8d8e9]2222            if self.state_filename: 
2223                self.write_state()
[db6b092]2224            self.state_lock.release()
[866c983]2225        except service_error, e:
2226            # If something goes wrong in the parse (usually an access error)
2227            # clear the placeholder state.  From here on out the code delays
[db6b092]2228            # exceptions.  Failing at this point returns a fault to the remote
2229            # caller.
[cc8d8e9]2230
[866c983]2231            self.state_lock.acquire()
2232            del self.state[eid]
[bd3e314]2233            del self.state[expid]
2234            if self.state_filename: self.write_state()
[866c983]2235            self.state_lock.release()
2236            raise e
2237
2238
[db6b092]2239        # Start the background swapper and return the starting state.  From
2240        # here on out, the state will stick around a while.
[866c983]2241
[db6b092]2242        # Let users touch the state
[bd3e314]2243        self.auth.set_attribute(fid, expid)
2244        self.auth.set_attribute(expid, expid)
[db6b092]2245        # Override fedids can manipulate state as well
2246        for o in self.overrides:
2247            self.auth.set_attribute(o, expid)
2248
2249        # Create a logger that logs to the experiment's state object as well as
2250        # to the main log file.
2251        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[f07fa49]2252        alloc_collector = self.list_log(self.state[eid]['log'])
2253        h = logging.StreamHandler(alloc_collector)
[db6b092]2254        # XXX: there should be a global one of these rather than repeating the
2255        # code.
2256        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2257                    '%d %b %y %H:%M:%S'))
2258        alloc_log.addHandler(h)
2259       
[6c57fe9]2260        attrs = [ 
2261                {
2262                    'attribute': 'ssh_pubkey', 
2263                    'value': '%s/%s/config/%s' % \
[7183b48]2264                            (self.repo_url, expid, gw_pubkey_base)
[6c57fe9]2265                },
2266                {
2267                    'attribute': 'ssh_secretkey', 
2268                    'value': '%s/%s/config/%s' % \
[7183b48]2269                            (self.repo_url, expid, gw_secretkey_base)
[6c57fe9]2270                },
2271                {
2272                    'attribute': 'hosts', 
2273                    'value': '%s/%s/config/hosts' % \
[7183b48]2274                            (self.repo_url, expid)
[6c57fe9]2275                },
[ecca6eb]2276                {
2277                    'attribute': 'experiment_name',
2278                    'value': eid,
2279                },
[6c57fe9]2280            ]
2281
[617592b]2282        # transit and disconnected testbeds may not have a connInfo entry.
2283        # Fill in the blanks.
2284        for t in allocated.keys():
2285            if not connInfo.has_key(t):
2286                connInfo[t] = { }
2287
[db6b092]2288        # Start a thread to do the resource allocation
[e19b75c]2289        t  = Thread(target=self.allocate_resources,
[7183b48]2290                args=(allocated, master, eid, expid, tbparams, 
[e02cd14]2291                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2292                    services),
[db6b092]2293                name=eid)
2294        t.start()
2295
2296        rv = {
2297                'experimentID': [
2298                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2299                ],
2300                'experimentStatus': 'starting',
2301            }
2302
2303        return rv
[9479343]2304   
2305    def get_experiment_fedid(self, key):
2306        """
[db6b092]2307        find the fedid associated with the localname key in the state database.
[9479343]2308        """
2309
[db6b092]2310        rv = None
2311        self.state_lock.acquire()
2312        if self.state.has_key(key):
2313            if isinstance(self.state[key], dict):
2314                try:
2315                    kl = [ f['fedid'] for f in \
2316                            self.state[key]['experimentID']\
2317                                if f.has_key('fedid') ]
2318                except KeyError:
2319                    self.state_lock.release()
2320                    raise service_error(service_error.internal, 
2321                            "No fedid for experiment %s when getting "+\
2322                                    "fedid(!?)" % key)
2323                if len(kl) == 1:
2324                    rv = kl[0]
2325                else:
2326                    self.state_lock.release()
2327                    raise service_error(service_error.internal, 
2328                            "multiple fedids for experiment %s when " +\
2329                                    "getting fedid(!?)" % key)
2330            else:
2331                self.state_lock.release()
2332                raise service_error(service_error.internal, 
2333                        "Unexpected state for %s" % key)
2334        self.state_lock.release()
2335        return rv
[a97394b]2336
[4064742]2337    def check_experiment_access(self, fid, key):
[866c983]2338        """
2339        Confirm that the fid has access to the experiment.  Though a request
2340        may be made in terms of a local name, the access attribute is always
2341        the experiment's fedid.
2342        """
2343        if not isinstance(key, fedid):
[db6b092]2344            key = self.get_experiment_fedid(key)
[866c983]2345
2346        if self.auth.check_attribute(fid, key):
2347            return True
2348        else:
2349            raise service_error(service_error.access, "Access Denied")
[4064742]2350
2351
[db6b092]2352    def get_handler(self, path, fid):
[7183b48]2353        self.log.info("Get handler %s %s" % (path, fid))
[6c57fe9]2354        if self.auth.check_attribute(fid, path):
2355            return ("%s/%s" % (self.repodir, path), "application/binary")
2356        else:
2357            return (None, None)
[987aaa1]2358
2359    def get_vtopo(self, req, fid):
[866c983]2360        """
2361        Return the stored virtual topology for this experiment
2362        """
2363        rv = None
[db6b092]2364        state = None
[866c983]2365
2366        req = req.get('VtopoRequestBody', None)
2367        if not req:
2368            raise service_error(service_error.req,
2369                    "Bad request format (no VtopoRequestBody)")
2370        exp = req.get('experiment', None)
2371        if exp:
2372            if exp.has_key('fedid'):
2373                key = exp['fedid']
2374                keytype = "fedid"
2375            elif exp.has_key('localname'):
2376                key = exp['localname']
2377                keytype = "localname"
2378            else:
2379                raise service_error(service_error.req, "Unknown lookup type")
2380        else:
2381            raise service_error(service_error.req, "No request?")
2382
2383        self.check_experiment_access(fid, key)
2384
2385        self.state_lock.acquire()
2386        if self.state.has_key(key):
[db6b092]2387            if self.state[key].has_key('vtopo'):
2388                rv = { 'experiment' : {keytype: key },\
2389                        'vtopo': self.state[key]['vtopo'],\
2390                    }
2391            else:
2392                state = self.state[key]['experimentStatus']
[866c983]2393        self.state_lock.release()
2394
2395        if rv: return rv
[bd3e314]2396        else: 
[db6b092]2397            if state:
2398                raise service_error(service_error.partial, 
2399                        "Not ready: %s" % state)
2400            else:
2401                raise service_error(service_error.req, "No such experiment")
[987aaa1]2402
2403    def get_vis(self, req, fid):
[866c983]2404        """
2405        Return the stored visualization for this experiment
2406        """
2407        rv = None
[db6b092]2408        state = None
[866c983]2409
2410        req = req.get('VisRequestBody', None)
2411        if not req:
2412            raise service_error(service_error.req,
2413                    "Bad request format (no VisRequestBody)")
2414        exp = req.get('experiment', None)
2415        if exp:
2416            if exp.has_key('fedid'):
2417                key = exp['fedid']
2418                keytype = "fedid"
2419            elif exp.has_key('localname'):
2420                key = exp['localname']
2421                keytype = "localname"
2422            else:
2423                raise service_error(service_error.req, "Unknown lookup type")
2424        else:
2425            raise service_error(service_error.req, "No request?")
2426
2427        self.check_experiment_access(fid, key)
2428
2429        self.state_lock.acquire()
2430        if self.state.has_key(key):
[db6b092]2431            if self.state[key].has_key('vis'):
2432                rv =  { 'experiment' : {keytype: key },\
2433                        'vis': self.state[key]['vis'],\
2434                        }
2435            else:
2436                state = self.state[key]['experimentStatus']
[866c983]2437        self.state_lock.release()
2438
2439        if rv: return rv
[bd3e314]2440        else:
[db6b092]2441            if state:
2442                raise service_error(service_error.partial, 
2443                        "Not ready: %s" % state)
2444            else:
2445                raise service_error(service_error.req, "No such experiment")
[987aaa1]2446
[65f3f29]2447    def clean_info_response(self, rv):
[db6b092]2448        """
2449        Remove the information in the experiment's state object that is not in
2450        the info response.
2451        """
2452        # Remove the owner info (should always be there, but...)
2453        if rv.has_key('owner'): del rv['owner']
2454
2455        # Convert the log into the allocationLog parameter and remove the
2456        # log entry (with defensive programming)
2457        if rv.has_key('log'):
2458            rv['allocationLog'] = "".join(rv['log'])
2459            del rv['log']
2460        else:
2461            rv['allocationLog'] = ""
2462
2463        if rv['experimentStatus'] != 'active':
2464            if rv.has_key('federant'): del rv['federant']
2465        else:
[69692a9]2466            # remove the allocationID and uri info from each federant
[db6b092]2467            for f in rv.get('federant', []):
2468                if f.has_key('allocID'): del f['allocID']
[69692a9]2469                if f.has_key('uri'): del f['uri']
[db6b092]2470        return rv
[65f3f29]2471
[c52c48d]2472    def get_info(self, req, fid):
[866c983]2473        """
2474        Return all the stored info about this experiment
2475        """
2476        rv = None
2477
2478        req = req.get('InfoRequestBody', None)
2479        if not req:
2480            raise service_error(service_error.req,
[65f3f29]2481                    "Bad request format (no InfoRequestBody)")
[866c983]2482        exp = req.get('experiment', None)
2483        if exp:
2484            if exp.has_key('fedid'):
2485                key = exp['fedid']
2486                keytype = "fedid"
2487            elif exp.has_key('localname'):
2488                key = exp['localname']
2489                keytype = "localname"
2490            else:
2491                raise service_error(service_error.req, "Unknown lookup type")
2492        else:
2493            raise service_error(service_error.req, "No request?")
2494
2495        self.check_experiment_access(fid, key)
2496
2497        # The state may be massaged by the service function that called
2498        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2499        # state.
2500        self.state_lock.acquire()
2501        if self.state.has_key(key):
2502            rv = copy.deepcopy(self.state[key])
2503        self.state_lock.release()
2504
[db6b092]2505        if rv:
2506            return self.clean_info_response(rv)
[bd3e314]2507        else:
[db6b092]2508            raise service_error(service_error.req, "No such experiment")
[7a8d667]2509
[65f3f29]2510    def get_multi_info(self, req, fid):
2511        """
2512        Return all the stored info that this fedid can access
2513        """
[db6b092]2514        rv = { 'info': [ ] }
[65f3f29]2515
[db6b092]2516        self.state_lock.acquire()
2517        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2518            try:
2519                self.check_experiment_access(fid, key)
2520            except service_error, e:
2521                if e.code == service_error.access:
2522                    continue
2523                else:
2524                    self.state_lock.release()
2525                    raise e
[65f3f29]2526
[db6b092]2527            if self.state.has_key(key):
2528                e = copy.deepcopy(self.state[key])
2529                e = self.clean_info_response(e)
2530                rv['info'].append(e)
[65f3f29]2531        self.state_lock.release()
[db6b092]2532        return rv
[65f3f29]2533
[7a8d667]2534    def terminate_experiment(self, req, fid):
[866c983]2535        """
2536        Swap this experiment out on the federants and delete the shared
2537        information
2538        """
2539        tbparams = { }
2540        req = req.get('TerminateRequestBody', None)
2541        if not req:
2542            raise service_error(service_error.req,
2543                    "Bad request format (no TerminateRequestBody)")
[db6b092]2544        force = req.get('force', False)
[866c983]2545        exp = req.get('experiment', None)
2546        if exp:
2547            if exp.has_key('fedid'):
2548                key = exp['fedid']
2549                keytype = "fedid"
2550            elif exp.has_key('localname'):
2551                key = exp['localname']
2552                keytype = "localname"
2553            else:
2554                raise service_error(service_error.req, "Unknown lookup type")
2555        else:
2556            raise service_error(service_error.req, "No request?")
2557
2558        self.check_experiment_access(fid, key)
2559
[db6b092]2560        dealloc_list = [ ]
[46e4682]2561
2562
[5ae3857]2563        # Create a logger that logs to the dealloc_list as well as to the main
2564        # log file.
2565        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2566        h = logging.StreamHandler(self.list_log(dealloc_list))
2567        # XXX: there should be a global one of these rather than repeating the
2568        # code.
2569        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2570                    '%d %b %y %H:%M:%S'))
2571        dealloc_log.addHandler(h)
2572
2573        self.state_lock.acquire()
2574        fed_exp = self.state.get(key, None)
2575
2576        if fed_exp:
2577            # This branch of the conditional holds the lock to generate a
2578            # consistent temporary tbparams variable to deallocate experiments.
2579            # It releases the lock to do the deallocations and reacquires it to
2580            # remove the experiment state when the termination is complete.
2581
2582            # First make sure that the experiment creation is complete.
2583            status = fed_exp.get('experimentStatus', None)
2584
2585            if status:
2586                if status in ('starting', 'terminating'):
2587                    if not force:
2588                        self.state_lock.release()
2589                        raise service_error(service_error.partial, 
2590                                'Experiment still being created or destroyed')
2591                    else:
2592                        self.log.warning('Experiment in %s state ' % status + \
2593                                'being terminated by force.')
2594            else:
2595                # No status??? trouble
2596                self.state_lock.release()
2597                raise service_error(service_error.internal,
2598                        "Experiment has no status!?")
2599
2600            ids = []
2601            #  experimentID is a list of dicts that are self-describing
2602            #  identifiers.  This finds all the fedids and localnames - the
2603            #  keys of self.state - and puts them into ids.
2604            for id in fed_exp.get('experimentID', []):
2605                if id.has_key('fedid'): ids.append(id['fedid'])
2606                if id.has_key('localname'): ids.append(id['localname'])
2607
[63a35b7]2608            # Collect the allocation/segment ids into a dict keyed by the fedid
2609            # of the allocation (or a monotonically increasing integer) that
2610            # contains a tuple of uri, aid (which is a dict...)
2611            for i, fed in enumerate(fed_exp.get('federant', [])):
[5ae3857]2612                try:
[63a35b7]2613                    uri = fed['uri']
2614                    aid = fed['allocID']
2615                    k = fed['allocID'].get('fedid', i)
[5ae3857]2616                except KeyError, e:
2617                    continue
[63a35b7]2618                tbparams[k] = (uri, aid)
[5ae3857]2619            fed_exp['experimentStatus'] = 'terminating'
2620            if self.state_filename: self.write_state()
2621            self.state_lock.release()
2622
2623            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2624            # then completes, so we can't wait if nothing starts.  So, no
2625            # tbparams, no start.
2626            if len(tbparams) > 0:
2627                thread_pool = self.thread_pool(self.nthreads)
[63a35b7]2628                for k in tbparams.keys():
[5ae3857]2629                    # Create and start a thread to stop the segment
2630                    thread_pool.wait_for_slot()
[63a35b7]2631                    uri, aid = tbparams[k]
[5ae3857]2632                    t  = self.pooled_thread(\
[e19b75c]2633                            target=self.terminate_segment(log=dealloc_log,
[63a35b7]2634                                testbed=uri,
[5ae3857]2635                                cert_file=self.cert_file, 
2636                                cert_pwd=self.cert_pwd,
2637                                trusted_certs=self.trusted_certs,
2638                                caller=self.call_TerminateSegment),
[63a35b7]2639                            args=(uri, aid), name=k,
[5ae3857]2640                            pdata=thread_pool, trace_file=self.trace_file)
2641                    t.start()
2642                # Wait for completions
2643                thread_pool.wait_for_all_done()
2644
2645            # release the allocations (failed experiments have done this
2646            # already, and starting experiments may be in odd states, so we
2647            # ignore errors releasing those allocations
2648            try: 
[63a35b7]2649                for k in tbparams.keys():
[ecf679e]2650                    # This releases access by uri
[63a35b7]2651                    uri, aid = tbparams[k]
2652                    self.release_access(None, aid, uri=uri)
[5ae3857]2653            except service_error, e:
2654                if status != 'failed' and not force:
2655                    raise e
2656
2657            # Remove the terminated experiment
2658            self.state_lock.acquire()
2659            for id in ids:
2660                if self.state.has_key(id): del self.state[id]
2661
2662            if self.state_filename: self.write_state()
2663            self.state_lock.release()
2664
[2761484]2665            # Delete any synch points associated with this experiment.  All
2666            # synch points begin with the fedid of the experiment.
2667            fedid_keys = set(["fedid:%s" % f for f in ids \
2668                    if isinstance(f, fedid)])
2669            for k in self.synch_store.all_keys():
2670                try:
2671                    if len(k) > 45 and k[0:46] in fedid_keys:
2672                        self.synch_store.del_value(k)
[dadc4da]2673                except synch_store.BadDeletionError:
[2761484]2674                    pass
2675            self.write_store()
2676               
[5ae3857]2677            return { 
2678                    'experiment': exp , 
2679                    'deallocationLog': "".join(dealloc_list),
2680                    }
2681        else:
2682            # Don't forget to release the lock
2683            self.state_lock.release()
2684            raise service_error(service_error.req, "No saved state")
[2761484]2685
2686
2687    def GetValue(self, req, fid):
2688        """
2689        Get a value from the synchronized store
2690        """
2691        req = req.get('GetValueRequestBody', None)
2692        if not req:
2693            raise service_error(service_error.req,
2694                    "Bad request format (no GetValueRequestBody)")
2695       
2696        name = req['name']
2697        wait = req['wait']
2698        rv = { 'name': name }
2699
2700        if self.auth.check_attribute(fid, name):
[dadc4da]2701            try:
2702                v = self.synch_store.get_value(name, wait)
2703            except synch_store.RevokedKeyError:
2704                # No more synch on this key
2705                raise service_error(service_error.federant, 
2706                        "Synch key %s revoked" % name)
[2761484]2707            if v is not None:
2708                rv['value'] = v
[109a32a]2709            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2710            return rv
2711        else:
2712            raise service_error(service_error.access, "Access Denied")
2713       
2714
2715    def SetValue(self, req, fid):
2716        """
2717        Set a value in the synchronized store
2718        """
2719        req = req.get('SetValueRequestBody', None)
2720        if not req:
2721            raise service_error(service_error.req,
2722                    "Bad request format (no SetValueRequestBody)")
2723       
2724        name = req['name']
2725        v = req['value']
2726
2727        if self.auth.check_attribute(fid, name):
2728            try:
2729                self.synch_store.set_value(name, v)
2730                self.write_store()
[109a32a]2731                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2732            except synch_store.CollisionError:
2733                # Translate into a service_error
2734                raise service_error(service_error.req,
2735                        "Value already set: %s" %name)
[dadc4da]2736            except synch_store.RevokedKeyError:
2737                # No more synch on this key
2738                raise service_error(service_error.federant, 
2739                        "Synch key %s revoked" % name)
[2761484]2740            return { 'name': name, 'value': v }
2741        else:
2742            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.