source: fedd/federation/experiment_control.py @ 97edf0d

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 97edf0d was 35a4c01, checked in by Ted Faber <faber@…>, 15 years ago

Access controllers manage the fedkit now. Also upped the parallelism for
swapins.

  • Property mode set to 100644
File size: 93.4 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3441fe3]16import traceback
[c971895]17# For parsing visualization output and splitter output
18import xml.parsers.expat
[3441fe3]19
[6c57fe9]20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
[6679c122]22
[db6b092]23from urlparse import urlparse
24from urllib2 import urlopen
25
[ec4fb42]26from util import *
[51cc9df]27from fedid import fedid, generate_fedid
[9460b1e]28from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]29from service_error import service_error
[2761484]30from synch_store import synch_store
[6679c122]31
[db6b092]32import topdl
[f07fa49]33import list_log
[db6b092]34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
[11a08b0]37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
[ec4fb42]44class experiment_control_local:
[0ea11af]45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
[79b6596]51
52    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]53   
[1af38d6]54    class thread_pool:
[866c983]55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
[32e7d93]123        def wait_for_all_done(self, timeout=None):
[866c983]124            """
[32e7d93]125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
[866c983]129            """
[32e7d93]130            if timeout:
131                deadline = time.time() + timeout
[866c983]132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
[32e7d93]134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
[866c983]139            self.release()
[32e7d93]140            return not (self.started == 0 or self.started > self.terminated)
[8bc5754]141
[1af38d6]142    class pooled_thread(Thread):
[866c983]143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
[6679c122]181
[f069052]182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]184    call_StartSegment = service_caller('StartSegment')
[5ae3857]185    call_TerminateSegment = service_caller('TerminateSegment')
[f069052]186    call_Ns2Split = service_caller('Ns2Split')
[058f58e]187
[3f6bc5f]188    def __init__(self, config=None, auth=None):
[866c983]189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
[f07fa49]209        self.list_log = list_log.list_log
[866c983]210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
[6c57fe9]221        self.repodir = config.get("experiment_control", "repodir")
[7183b48]222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
[cc8d8e9]224
[866c983]225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
[35a4c01]229        self.nthreads = 10
[866c983]230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
[866c983]240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
[2761484]242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
[866c983]245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]258
[db6b092]259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
[ca489e8]266
[866c983]267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
[2761484]338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
[866c983]344        # Dispatch tables
345        self.soap_services = {\
[a3ad8bd]346                'New': soap_handler('New', self.new_experiment),
[e19b75c]347                'Create': soap_handler('Create', self.create_experiment),
[866c983]348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
[65f3f29]351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[866c983]352                'Terminate': soap_handler('Terminate', 
[e19b75c]353                    self.terminate_experiment),
[2761484]354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]356        }
357
358        self.xmlrpc_services = {\
[a3ad8bd]359                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]360                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]365                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]366                    self.terminate_experiment),
[2761484]367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]369        }
[19cc408]370
[a97394b]371    # Call while holding self.state_lock
[eee2b2e]372    def write_state(self):
[866c983]373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
[40dd8c1]380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
[866c983]382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]392
[2761484]393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
[a97394b]407    # Call while holding self.state_lock
[eee2b2e]408    def read_state(self):
[866c983]409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
[cc8d8e9]414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
[866c983]430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
[cc8d8e9]442        for s in self.state.values():
[866c983]443            try:
[cc8d8e9]444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
[db6b092]451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
[cc8d8e9]454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
[2761484]456                    for a in self.get_alloc_ids(s):
[cc8d8e9]457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
[866c983]460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]463
464
465    def read_accessdb(self, accessdb_file):
[866c983]466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
[a3ad8bd]520            self.auth.set_attribute(fid, 'new')
[34bc05c]521
522    def read_mapdb(self, file):
[866c983]523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
[2761484]547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
[866c983]591       
[6679c122]592    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
[6679c122]616
[0d830de]617    def gentopo(self, str):
[866c983]618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
[0d830de]685
686    def genviz(self, topo):
[866c983]687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
[0ac1934]694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
[cc8d8e9]702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
[db6b092]735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
[886307f]739                    "Failed to open /dev/null in genviz")
740
[866c983]741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
[886307f]761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
[db6b092]762                close_fds=True)
763        dnull.close()
[866c983]764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
[d0ae12d]786
[99eb8cf]787    def get_access(self, tb, nodes, tbparam, master, export_project,
[e02cd14]788            access_user, services):
[866c983]789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
[b78c9ea]794            raise service_error(service_error.server_config, 
[866c983]795                    "Unknown testbed: %s" % tb)
796
[8218a3b]797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
[866c983]808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]815                        'credential': [ "project: %s" % p, "user: %s"  % u],
[866c983]816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]822                        'credential': [ 'user: %s' % u ],
[866c983]823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
[e02cd14]831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
[866c983]837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
[e19b75c]869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
[db6b092]883
[4afcfc4]884        tbparam[tb] = { 
[69692a9]885                "allocID" : r['allocID'],
886                "uri": uri,
[4afcfc4]887                }
[e02cd14]888        if 'service' in r: 
889            services.extend(r['service'])
[4afcfc4]890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
[617592b]893        for a in r.get('fedAttr', []):
[4afcfc4]894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
[db6b092]901
[69692a9]902    def release_access(self, tb, aid, uri=None):
[e19b75c]903        """
904        Release access to testbed through fedd
905        """
[db6b092]906
[69692a9]907        if not uri:
908            uri = self.tbmap.get(tb, None)
[e19b75c]909        if not uri:
[69692a9]910            raise service_error(service_error.server_config, 
[e19b75c]911                    "Unknown testbed: %s" % tb)
[db6b092]912
[e19b75c]913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
[db6b092]921
[e19b75c]922        # better error coding
[db6b092]923
[e19b75c]924    def remote_splitter(self, uri, desc, master):
[db6b092]925
[e19b75c]926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
[db6b092]931            }
932
[e19b75c]933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]945
[e19b75c]946    class start_segment:
[fd556d1]947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
[cc8d8e9]950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
[fd556d1]956            self.testbed = testbed
[f07fa49]957            self.log_collector = log_collector
[69692a9]958            self.response = None
[cc8d8e9]959
[e02cd14]960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
[cc8d8e9]962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
[ecca6eb]967                    'master': master,
[cc8d8e9]968                }
[e02cd14]969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
[6c57fe9]980            if attrs:
981                req['fedAttr'] = attrs
[cc8d8e9]982
[fd556d1]983            try:
[13e3dd2]984                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
[f07fa49]987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
[69692a9]993                    self.response = r
[f07fa49]994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
[fd556d1]997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
[cc8d8e9]1002
1003
[5ae3857]1004
[e19b75c]1005    class terminate_segment:
[fd556d1]1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
[fd556d1]1014            self.testbed = testbed
[5ae3857]1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
[fd556d1]1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
[db6b092]1028   
1029
[7183b48]1030    def allocate_resources(self, allocated, master, eid, expid, 
[f07fa49]1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
[e02cd14]1032            attrs=None, connInfo={}, services=[]):
[69692a9]1033
[cc8d8e9]1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
[109a32a]1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
[cc8d8e9]1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
[109a32a]1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
[69692a9]1072            threads.append(t)
1073            t.start()
[cc8d8e9]1074
[109a32a]1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
[dadc4da]1077        revoked = False
[109a32a]1078        while not thread_pool.wait_for_all_done(60.0):
1079            mins += 1
1080            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1081                    % mins)
[dadc4da]1082            if not revoked and \
[f52f5df]1083                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1084                # a testbed has failed.  Revoke this experiment's
1085                # synchronizarion values so that sub experiments will not
1086                # deadlock waiting for synchronization that will never happen
1087                self.log.info("A subexperiment has failed to swap in, " + \
1088                        "revoking synch keys")
1089                var_key = "fedid:%s" % expid
1090                for k in self.synch_store.all_keys():
1091                    if len(k) > 45 and k[0:46] == var_key:
1092                        self.synch_store.revoke_key(k)
1093                revoked = True
[69692a9]1094
[cc8d8e9]1095        failed = [ t.getName() for t in threads if not t.rv ]
1096        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1097
[cc8d8e9]1098        # If one failed clean up, unless fail_soft is set
[32e7d93]1099        if failed:
[cc8d8e9]1100            if not fail_soft:
1101                thread_pool.clear()
1102                for tb in succeeded:
1103                    # Create and start a thread to stop the segment
1104                    thread_pool.wait_for_slot()
[0fa1729]1105                    uri = tbparams[tb]['uri']
[cc8d8e9]1106                    t  = self.pooled_thread(\
[32e7d93]1107                            target=self.terminate_segment(log=log,
[fd556d1]1108                                testbed=tb,
[32e7d93]1109                                cert_file=self.cert_file, 
1110                                cert_pwd=self.cert_pwd,
1111                                trusted_certs=self.trusted_certs,
1112                                caller=self.call_TerminateSegment),
1113                            args=(uri, tbparams[tb]['federant']['allocID']),
1114                            name=tb,
[cc8d8e9]1115                            pdata=thread_pool, trace_file=self.trace_file)
1116                    t.start()
[f52f5df]1117                # Wait until all finish (if any are being stopped)
1118                if succeeded:
1119                    thread_pool.wait_for_all_done()
[cc8d8e9]1120
1121                # release the allocations
1122                for tb in tbparams.keys():
[69692a9]1123                    self.release_access(tb, tbparams[tb]['allocID'],
1124                            tbparams[tb].get('uri', None))
[cc8d8e9]1125                # Remove the placeholder
1126                self.state_lock.acquire()
1127                self.state[eid]['experimentStatus'] = 'failed'
1128                if self.state_filename: self.write_state()
1129                self.state_lock.release()
1130
1131                log.error("Swap in failed on %s" % ",".join(failed))
1132                return
1133        else:
1134            log.info("[start_segment]: Experiment %s active" % eid)
1135
1136
1137        # Walk up tmpdir, deleting as we go
[69692a9]1138        if self.cleanup:
1139            log.debug("[start_experiment]: removing %s" % tmpdir)
1140            for path, dirs, files in os.walk(tmpdir, topdown=False):
1141                for f in files:
1142                    os.remove(os.path.join(path, f))
1143                for d in dirs:
1144                    os.rmdir(os.path.join(path, d))
1145            os.rmdir(tmpdir)
1146        else:
1147            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1148
1149        # Insert the experiment into our state and update the disk copy
1150        self.state_lock.acquire()
1151        self.state[expid]['experimentStatus'] = 'active'
1152        self.state[eid] = self.state[expid]
1153        if self.state_filename: self.write_state()
1154        self.state_lock.release()
1155        return
1156
1157
[895a133]1158    def add_kit(self, e, kit):
1159        """
1160        Add a Software object created from the list of (install, location)
1161        tuples passed as kit  to the software attribute of an object e.  We
1162        do this enough to break out the code, but it's kind of a hack to
1163        avoid changing the old tuple rep.
1164        """
1165
1166        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1167
1168        if isinstance(e.software, list): e.software.extend(s)
1169        else: e.software = s
1170
1171
[a3ad8bd]1172    def create_experiment_state(self, fid, req, expid, expcert, 
1173            state='starting'):
[895a133]1174        """
1175        Create the initial entry in the experiment's state.  The expid and
1176        expcert are the experiment's fedid and certifacte that represents that
1177        ID, which are installed in the experiment state.  If the request
1178        includes a suggested local name that is used if possible.  If the local
1179        name is already taken by an experiment owned by this user that has
[a3ad8bd]1180        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1181        valid localname is found.  The generated local name is returned.
1182        """
1183
1184        if req.has_key('experimentID') and \
1185                req['experimentID'].has_key('localname'):
1186            overwrite = False
1187            eid = req['experimentID']['localname']
1188            # If there's an old failed experiment here with the same local name
1189            # and accessible by this user, we'll overwrite it, otherwise we'll
1190            # fall through and do the collision avoidance.
1191            old_expid = self.get_experiment_fedid(eid)
1192            if old_expid and self.check_experiment_access(fid, old_expid):
1193                self.state_lock.acquire()
1194                status = self.state[eid].get('experimentStatus', None)
1195                if status and status == 'failed':
1196                    # remove the old access attribute
1197                    self.auth.unset_attribute(fid, old_expid)
1198                    overwrite = True
1199                    del self.state[eid]
1200                    del self.state[old_expid]
1201                self.state_lock.release()
1202            self.state_lock.acquire()
1203            while (self.state.has_key(eid) and not overwrite):
1204                eid += random.choice(string.ascii_letters)
1205            # Initial state
1206            self.state[eid] = {
1207                    'experimentID' : \
1208                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1209                    'experimentStatus': state,
[895a133]1210                    'experimentAccess': { 'X509' : expcert },
1211                    'owner': fid,
1212                    'log' : [],
1213                }
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217        else:
1218            eid = self.exp_stem
1219            for i in range(0,5):
1220                eid += random.choice(string.ascii_letters)
1221            self.state_lock.acquire()
1222            while (self.state.has_key(eid)):
1223                eid = self.exp_stem
1224                for i in range(0,5):
1225                    eid += random.choice(string.ascii_letters)
1226            # Initial state
1227            self.state[eid] = {
1228                    'experimentID' : \
1229                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1230                    'experimentStatus': state,
[895a133]1231                    'experimentAccess': { 'X509' : expcert },
1232                    'owner': fid,
1233                    'log' : [],
1234                }
1235            self.state[expid] = self.state[eid]
1236            if self.state_filename: self.write_state()
1237            self.state_lock.release()
1238
1239        return eid
1240
1241
1242    def allocate_ips_to_topo(self, top):
1243        """
[69692a9]1244        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1245        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1246        created and returned as a list of hostfiles entries.  We also return
1247        the allocator, because we may need to allocate IPs to portals
1248        (specifically DRAGON portals).
[895a133]1249        """
1250        subs = sorted(top.substrates, 
1251                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1252                reverse=True)
1253        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1254        ifs = { }
1255        hosts = [ ]
1256
1257        for idx, s in enumerate(subs):
[289ff7e]1258            net_size = len(s.interfaces)+2
1259
1260            a = ips.allocate(net_size)
[895a133]1261            if a :
1262                base, num = a
[289ff7e]1263                if num < net_size: 
[895a133]1264                    raise service_error(service_error.internal,
1265                            "Allocator returned wrong number of IPs??")
1266            else:
1267                raise service_error(service_error.req, 
1268                        "Cannot allocate IP addresses")
[062b991]1269            mask = ips.min_alloc
1270            while mask < net_size:
1271                mask *= 2
[289ff7e]1272
[062b991]1273            netmask = ((2**32-1) ^ (mask-1))
[895a133]1274
1275            base += 1
1276            for i in s.interfaces:
1277                i.attribute.append(
1278                        topdl.Attribute('ip4_address', 
1279                            "%s" % ip_addr(base)))
[289ff7e]1280                i.attribute.append(
1281                        topdl.Attribute('ip4_netmask', 
1282                            "%s" % ip_addr(int(netmask))))
1283
[895a133]1284                hname = i.element.name[0]
1285                if ifs.has_key(hname):
1286                    hosts.append("%s\t%s-%s %s-%d" % \
1287                            (ip_addr(base), hname, s.name, hname,
1288                                ifs[hname]))
1289                else:
1290                    ifs[hname] = 0
1291                    hosts.append("%s\t%s-%s %s-%d %s" % \
1292                            (ip_addr(base), hname, s.name, hname,
1293                                ifs[hname], hname))
1294
1295                ifs[hname] += 1
1296                base += 1
[69692a9]1297        return hosts, ips
[895a133]1298
[99eb8cf]1299    def get_access_to_testbeds(self, testbeds, access_user, 
[e02cd14]1300            export_project, master, allocated, tbparams, services):
[895a133]1301        """
1302        Request access to the various testbeds required for this instantiation
1303        (passed in as testbeds).  User, access_user, expoert_project and master
1304        are used to construct the correct requests.  Per-testbed parameters are
1305        returned in tbparams.
1306        """
1307        for tb in testbeds:
[99eb8cf]1308            self.get_access(tb, None, tbparams, master,
[e02cd14]1309                    export_project, access_user, services)
[895a133]1310            allocated[tb] = 1
1311
1312    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1313        """
[e02cd14]1314        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1315        """
1316        for tb in testbeds:
1317            topo[tb] = top.clone()
1318            to_delete = [ ]
[e02cd14]1319            # XXX: copy in for loop to simplify
[895a133]1320            for e in topo[tb].elements:
1321                etb = e.get_attribute('testbed')
1322                if etb and etb != tb:
1323                    for i in e.interface:
1324                        for s in i.subs:
1325                            try:
1326                                s.interfaces.remove(i)
1327                            except ValueError:
1328                                raise service_error(service_error.internal,
1329                                        "Can't remove interface??")
1330                    to_delete.append(e)
1331            for e in to_delete:
1332                topo[tb].elements.remove(e)
1333            topo[tb].make_indices()
1334
[13e3dd2]1335    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
[2761484]1336            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1337            expid=None):
[e02cd14]1338        """
1339        Return a new internet portal node and a dict with the connectionInfo to
1340        be attached.
1341        """
[d87778f]1342        dproject = tbparams[dt].get('project', 'project')
1343        ddomain = tbparams[dt].get('domain', ".example.com")
[13e3dd2]1344        mdomain = tbparams[master].get('domain', '.example.com')
[e02cd14]1345        mproject = tbparams[master].get('project', 'project')
[13e3dd2]1346        muser = tbparams[master].get('user', 'root')
1347        smbshare = tbparams[master].get('smbshare', 'USERS')
[e02cd14]1348
[13e3dd2]1349        if st == master or dt == master:
1350            active = ("%s" % (st == master))
1351        else:
[e02cd14]1352            active = ("%s" % (st > dt))
[13e3dd2]1353
[69692a9]1354        ifaces = [ ]
1355        for sub, attrs in iface_desc:
1356            inf = topdl.Interface(
[5b74b63]1357                    name="inf%03d" % len(ifaces),
[69692a9]1358                    substrate=sub,
1359                    attribute=[
1360                        topdl.Attribute(
1361                            attribute=n,
1362                            value = v)
1363                        for n, v in attrs
[13e3dd2]1364                        ]
[69692a9]1365                    )
1366            ifaces.append(inf)
[617592b]1367        if conn_type == "ssh":
[2761484]1368            try:
1369                aid = tbparams[st]['allocID']['fedid']
1370            except:
[109a32a]1371                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1372                        % st)
[2761484]1373                aid = None
[617592b]1374            info = {
1375                    "type" : conn_type, 
1376                    "portal": myname,
1377                    'fedAttr': [ 
1378                            { 'attribute': 'masterdomain', 'value': mdomain},
1379                            { 'attribute': 'masterexperiment', 'value': 
1380                                "%s/%s" % (mproject, eid)},
1381                            { 'attribute': 'active', 'value': active},
1382                            # Move to SMB service description
1383                            { 'attribute': 'masteruser', 'value': muser},
1384                            { 'attribute': 'smbshare', 'value': smbshare},
1385                        ],
[2761484]1386                    'parameter': [
1387                        {
1388                            'name': 'peer',
1389                            'key': 'fedid:%s/%s' % (expid, myname),
1390                            'store': self.store_url,
1391                            'type': 'output',
1392                        },
[593e901]1393                        {
1394                            'name': 'ssh_port',
1395                            'key': 'fedid:%s/%s-port' % (expid, myname),
1396                            'store': self.store_url,
1397                            'type': 'output',
1398                        },
[2761484]1399                        {
1400                            'name': 'peer',
1401                            'key': 'fedid:%s/%s' % (expid, desthost),
1402                            'store': self.store_url,
1403                            'type': 'input',
1404                        },
[593e901]1405                        {
1406                            'name': 'ssh_port',
1407                            'key': 'fedid:%s/%s-port' % (expid, desthost),
1408                            'store': self.store_url,
1409                            'type': 'input',
1410                        },
[2761484]1411                        ]
[617592b]1412                    }
[2761484]1413            # Give this allocation the rights to access the key of the
1414            # peers
1415            if aid:
1416                for h in (myname, desthost):
1417                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
[593e901]1418                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
1419                            (expid, h))
[2761484]1420            else:
1421                self.log.error("No aid for %s in new_portal_node" % st)
[617592b]1422        else:
1423            info = None
[5b74b63]1424
[e02cd14]1425        return (topdl.Computer(
[13e3dd2]1426                name=myname,
1427                attribute=[ 
1428                    topdl.Attribute(attribute=n,value=v)
1429                        for n, v in (\
1430                            ('portal', 'true'),
1431                            ('portal_type', portal_type), 
[641bb66]1432                        )
[13e3dd2]1433                    ],
1434                interface=ifaces,
[e02cd14]1435                ), info)
[13e3dd2]1436
[2761484]1437    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
[13e3dd2]1438        ddomain = tbparams[dt].get('domain', ".example.com")
1439        dproject = tbparams[dt].get('project', 'project')
1440        tsubstrate = \
1441                topdl.Substrate(name='%s-%s' % (st, dt),
1442                        attribute= [
1443                            topdl.Attribute(
1444                                attribute='portal',
1445                                value='true')
1446                            ]
1447                        )
1448        segment_element = topdl.Segment(
1449                id= tbparams[dt]['allocID'],
1450                type='emulab',
1451                uri = self.tbmap.get(dt, None),
1452                interface=[ 
1453                    topdl.Interface(
1454                        substrate=tsubstrate.name),
1455                    ],
1456                attribute = [
1457                    topdl.Attribute(attribute=n, value=v)
1458                        for n, v in (\
1459                            ('domain', ddomain),
1460                            ('experiment', "%s/%s" % \
1461                                    (dproject, eid)),)
1462                    ],
1463                )
1464
1465        return (tsubstrate, segment_element)
1466
[2761484]1467    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
[69692a9]1468        if sub.capacity is None:
1469            raise service_error(service_error.internal,
1470                    "Cannot DRAGON split substrate w/o capacity")
1471        segs = [ ]
1472        substr = topdl.Substrate(name="dragon%d" % idx, 
1473                capacity=sub.capacity.clone(),
1474                attribute=[ topdl.Attribute(attribute=n, value=v)
1475                    for n, v, in (\
1476                            ('vlan', 'unassigned%d' % idx),)])
[2761484]1477        name = "dragon%d" % idx
[109a32a]1478        store_key = 'fedid:%s/vlan%d' % (expid, idx)
[69692a9]1479        for tb in tbs.keys():
1480            seg = topdl.Segment(
1481                    id = tbparams[tb]['allocID'],
1482                    type='emulab',
1483                    uri = self.tbmap.get(tb, None),
1484                    interface=[ 
1485                        topdl.Interface(
1486                            substrate=substr.name),
1487                        ],
1488                    attribute=[ topdl.Attribute(
[ecf679e]1489                        attribute='dragon_endpoint', 
1490                        value=tbparams[tb]['dragon']),
[69692a9]1491                        ]
1492                    )
1493            if tbparams[tb].has_key('vlans'):
1494                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1495            segs.append(seg)
1496
[109a32a]1497            # Give this allocation the rights to access the key of the
1498            # vlan_id
1499            try:
1500                aid = tbparams[tb]['allocID']['fedid']
1501                self.auth.set_attribute(aid, store_key)
1502            except:
1503                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1504                        % tb)
[2761484]1505
1506        connInfo[name] = [ { 
[109a32a]1507            'type': 'transit',
[2761484]1508            'parameter': [ { 
1509                'name': 'vlan_id',
[109a32a]1510                'key': store_key,
[2761484]1511                'store': self.store_url,
1512                'type': 'output'
1513                } ]
1514            } ]
1515
1516        topo[name] = \
[69692a9]1517                topdl.Topology(substrates=[substr], elements=segs,
1518                        attribute=[
1519                            topdl.Attribute(attribute="transit", value='true'),
1520                            topdl.Attribute(attribute="dynamic", value='true'),
[109a32a]1521                            topdl.Attribute(attribute="testbed", 
1522                                value='dragon'),
1523                            topdl.Attribute(attribute="store_keys", 
1524                                value=store_key),
[69692a9]1525                            ]
1526                        )
1527
[5b74b63]1528    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
[109a32a]1529            connInfo, expid=None):
[69692a9]1530        """
1531        Add attribiutes to the various elements indicating that they are to be
[5b74b63]1532        dragon connected and create a dragon segment in topo to be
[69692a9]1533        instantiated.
1534        """
1535
1536        def get_substrate_from_topo(name, t):
1537            for s in t.substrates:
1538                if s.name == name: return s
1539            else: return None
1540
[109a32a]1541
[617592b]1542        mdomain = tbparams[master].get('domain', '.example.com')
1543        mproject = tbparams[master].get('project', 'project')
[5b74b63]1544        # dn is the number of previously created dragon nets.  This routine
1545        # creates a net numbered by dn
[69692a9]1546        dn = len([x for x in topo.keys() if x.startswith('dragon')])
[5b74b63]1547        # Count the number of interfaces on this substrate in each testbed from
1548        # the global topology
[69692a9]1549        count = { }
[617592b]1550        node = { }
[5b74b63]1551        for e in [ i.element for i in sub.interfaces ]:
[69692a9]1552            tb = e.get_attribute('testbed')
1553            count[tb] = count.get(tb, 0) + 1
[617592b]1554            node[tb] = i.get_attribute('ip4_address')
1555
[69692a9]1556
[5b74b63]1557        # Set the attributes in the copies that will allow setup of dragon
1558        # connections.
[69692a9]1559        for tb in tbs.keys():
1560            s = get_substrate_from_topo(sub.name, topo[tb])
1561            if s:
[5b74b63]1562                if not connInfo.has_key(tb):
1563                    connInfo[tb] = [ ]
[617592b]1564
[2761484]1565                try:
1566                    aid = tbparams[tb]['allocID']['fedid']
1567                except:
[109a32a]1568                    self.log.debug("[creat_dragon_substrate] " + 
1569                            "Can't get alloc id for %s?" %tb)
[2761484]1570                    aid = None
1571
[617592b]1572                # This may need another look, but only a service gateway will
1573                # look at the active parameter, and these are only inserted to
1574                # connect to the master.
[238db1e]1575                active = "%s" % ( tb == master)
[5b74b63]1576                info = {
1577                        'type': 'transit',
1578                        'member': [ {
1579                            'element': i.element.name[0], 
1580                            'interface': i.name
[617592b]1581                            } for i in s.interfaces \
1582                                    if isinstance(i.element, topdl.Computer) ],
1583                        'fedAttr': [ 
1584                            { 'attribute': 'masterdomain', 'value': mdomain},
1585                            { 'attribute': 'masterexperiment', 'value': 
1586                                "%s/%s" % (mproject, eid)},
1587                            { 'attribute': 'active', 'value': active},
1588                            ],
[2761484]1589                        'parameter': [ {
1590                            'name': 'vlan_id',
1591                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1592                            'store': self.store_url,
1593                            'type': 'input',
1594                            } ]
[5b74b63]1595                        }
[109a32a]1596                if tbs.has_key(tb):
1597                    info['peer'] = tbs[tb]
[5b74b63]1598                connInfo[tb].append(info)
[2761484]1599
1600                # Give this allocation the rights to access the key of the
1601                # vlan_id
1602                if aid:
1603                    self.auth.set_attribute(aid, 
1604                            'fedid:%s/vlan%d' % (expid, dn))
[69692a9]1605            else:
1606                raise service_error(service_error.internal,
1607                        "No substrate %s in testbed %s" % (sub.name, tb))
1608
[109a32a]1609        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
[69692a9]1610
1611    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
[2761484]1612            segment_substrate, portals, connInfo, expid):
[69692a9]1613        # More than one testbed is on this substrate.  Insert
1614        # some portals into the subtopologies.  st == source testbed,
1615        # dt == destination testbed.
1616        for st in tbs.keys():
1617            if not segment_substrate.has_key(st):
1618                segment_substrate[st] = { }
1619            if not portals.has_key(st): 
1620                portals[st] = { }
[e02cd14]1621            if not connInfo.has_key(st):
1622                connInfo[st] = [ ]
[69692a9]1623            for dt in [ t for t in tbs.keys() if t != st]:
1624                sproject = tbparams[st].get('project', 'project')
1625                dproject = tbparams[dt].get('project', 'project')
1626                mproject = tbparams[master].get('project', 'project')
1627                sdomain = tbparams[st].get('domain', ".example.com")
1628                ddomain = tbparams[dt].get('domain', ".example.com")
1629                mdomain = tbparams[master].get('domain', '.example.com')
1630                muser = tbparams[master].get('user', 'root')
1631                smbshare = tbparams[master].get('smbshare', 'USERS')
1632                aid = tbparams[dt]['allocID']['fedid']
1633                if st == master or dt == master:
1634                    active = ("%s" % (st == master))
1635                else:
1636                    active = ("%s" %(st > dt))
1637                if not segment_substrate[st].has_key(dt):
1638                    # Put a substrate and a segment for the connected
1639                    # testbed in there.
1640                    tsubstrate, segment_element = \
[2761484]1641                            self.new_portal_substrate(st, dt, eid, tbparams,
1642                                    expid)
[69692a9]1643                    segment_substrate[st][dt] = tsubstrate
1644                    topo[st].substrates.append(tsubstrate)
1645                    topo[st].elements.append(segment_element)
1646
1647                new_portal = False
1648                if portals[st].has_key(dt):
1649                    # There's a portal set up to go to this destination.
[e02cd14]1650                    # See if there's room to multiplex this connection on
[69692a9]1651                    # it.  If so, add an interface to the portal; if not,
1652                    # set up to add a portal below.
1653                    # [This little festival of braces is just a pop of the
1654                    # last element in the list of portals between st and
1655                    # dt.]
1656                    portal = portals[st][dt][-1]
1657                    mux = len([ i for i in portal.interface \
1658                            if not i.get_attribute('portal')])
1659                    if mux == self.muxmax:
1660                        new_portal = True
1661                        portal_type = "experiment"
1662                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
[2761484]1663                        desthost = "%stunnel%d" % (st.lower(), 
1664                                len(portals[st][dt]))
[69692a9]1665                    else:
1666                        new_i = topdl.Interface(
[6e44258]1667                                substrate=sub.name,
[69692a9]1668                                attribute=[ 
1669                                    topdl.Attribute(
1670                                        attribute='ip4_address', 
1671                                        value=tbs[dt]
1672                                    )
1673                                ])
1674                        portal.interface.append(new_i)
1675                else:
1676                    # First connection to this testbed, make an empty list
1677                    # and set up to add the new portal below
1678                    new_portal = True
1679                    portals[st][dt] = [ ]
1680                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
[2761484]1681                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
[69692a9]1682
1683                    if dt == master or st == master: portal_type = "both"
1684                    else: portal_type = "experiment"
1685
1686                if new_portal:
1687                    infs = (
1688                            (segment_substrate[st][dt].name, 
1689                                (('portal', 'true'),)),
1690                            (sub.name, 
1691                                (('ip4_address', tbs[dt]),))
1692                        )
[e02cd14]1693                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
[69692a9]1694                            master, eid, myname, desthost, portal_type,
[2761484]1695                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
[9b3627e]1696                    #if self.fedkit:
1697                        #self.add_kit(portal, self.fedkit)
1698                    #if self.gatewaykit:
1699                        #self.add_kit(portal, self.gatewaykit)
[69692a9]1700
1701                    topo[st].elements.append(portal)
1702                    portals[st][dt].append(portal)
[e02cd14]1703                    connInfo[st].append(info)
[69692a9]1704
[2761484]1705    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
[69692a9]1706        # Add to the master testbed
1707        tsubstrate, segment_element = \
[2761484]1708                self.new_portal_substrate(st, dt, eid, tbparams, expid)
[69692a9]1709        myname = "%stunnel" % dt
1710        desthost = "%stunnel" % st
1711
[e02cd14]1712        portal, info = self.new_portal_node(st, dt, tbparams, master,
[69692a9]1713                eid, myname, desthost, "control", 
[2761484]1714                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1715                conn_attrs=[], expid=expid)
[69692a9]1716
1717        topo[st].substrates.append(tsubstrate)
1718        topo[st].elements.append(segment_element)
1719        topo[st].elements.append(portal)
[e02cd14]1720        if not connInfo.has_key(st):
1721            connInfo[st] = [ ]
1722        connInfo[st].append(info)
[69692a9]1723
[6e44258]1724    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
[2761484]1725            substrate, tbparams, expid):
[69692a9]1726        # Add to the master testbed
1727        myname = "%stunnel" % dt
1728        desthost = "%s" % ip_addr(dip)
1729
[5b74b63]1730        portal, info = self.new_portal_node(st, dt, tbparams, master,
[69692a9]1731                eid, myname, desthost, "control", 
1732                ((substrate.name,(
1733                    ('portal','true'),
[5b74b63]1734                    ('ip4_address', "%s" % ip_addr(myip)),)),),
[2761484]1735                conn_type="transit", conn_attrs=[], expid=expid)
[69692a9]1736
[617592b]1737        return portal
[69692a9]1738
[e02cd14]1739    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
[2761484]1740            connInfo, expid):
[895a133]1741        """
1742        For each substrate in the main topology, find those that
1743        have nodes on more than one testbed.  Insert portal nodes
1744        into the copies of those substrates on the sub topologies.
1745        """
[13e3dd2]1746        segment_substrate = { }
1747        portals = { }
[895a133]1748        for s in top.substrates:
1749            # tbs will contain an ip address on this subsrate that is in
1750            # each testbed.
1751            tbs = { }
1752            for i in s.interfaces:
1753                e = i.element
1754                tb = e.get_attribute('testbed')
1755                if tb and not tbs.has_key(tb):
1756                    for i in e.interface:
1757                        if s in i.subs:
1758                            tbs[tb]= i.get_attribute('ip4_address')
1759            if len(tbs) < 2:
1760                continue
1761
[69692a9]1762            # DRAGON will not create multi-site vlans yet
1763            if len(tbs) == 2 and \
1764                    all([tbparams[x].has_key('dragon') for x in tbs]):
1765                self.create_dragon_substrate(s, topo, tbs, tbparams, 
[2761484]1766                        master, eid, connInfo, expid)
[69692a9]1767            else:
1768                self.insert_internet_portals(s, topo, tbs, tbparams, master,
[2761484]1769                        eid, segment_substrate, portals, connInfo, expid)
[13e3dd2]1770
1771        # Make sure that all the slaves have a control portal back to the
1772        # master.
1773        for tb in [ t for t in tbparams.keys() if t != master ]:
1774            if len([e for e in topo[tb].elements \
1775                    if isinstance(e, topdl.Computer) and \
1776                    e.get_attribute('portal') and \
1777                    e.get_attribute('portal_type') == 'both']) == 0:
1778
[69692a9]1779                if tbparams[master].has_key('dragon') \
1780                        and tbparams[tb].has_key('dragon'):
1781
1782                    idx = len([x for x in topo.keys() \
1783                            if x.startswith('dragon')])
1784                    dip, leng = ip_allocator.allocate(4)
1785                    dip += 1
[6e44258]1786                    mip = dip+1
[69692a9]1787                    csub = topdl.Substrate(
1788                            name="dragon-control-%s" % tb,
1789                            capacity=topdl.Capacity(100000.0, 'max'),
1790                            attribute=[
1791                                topdl.Attribute(
1792                                    attribute='portal',
1793                                    value='true'
1794                                    )
1795                                ]
1796                            )
1797                    seg = topdl.Segment(
1798                            id= tbparams[master]['allocID'],
1799                            type='emulab',
1800                            uri = self.tbmap.get(master, None),
1801                            interface=[ 
1802                                topdl.Interface(
1803                                    substrate=csub.name),
1804                                ],
1805                            attribute = [
1806                                topdl.Attribute(attribute=n, value=v)
1807                                    for n, v in (\
1808                                        ('domain', 
1809                                            tbparams[master].get('domain',
1810                                                ".example.com")),
1811                                        ('experiment', "%s/%s" % \
1812                                                (tbparams[master].get(
1813                                                    'project', 
1814                                                    'project'), 
1815                                                    eid)),)
1816                                ],
1817                            )
[617592b]1818                    portal = self.new_dragon_portal(tb, master,
[2761484]1819                            master, eid, dip, mip, idx, csub, tbparams, expid)
[69692a9]1820                    topo[tb].substrates.append(csub)
[5b74b63]1821                    topo[tb].elements.append(portal)
[69692a9]1822                    topo[tb].elements.append(seg)
1823
1824                    mcsub = csub.clone()
1825                    seg = topdl.Segment(
1826                            id= tbparams[tb]['allocID'],
1827                            type='emulab',
1828                            uri = self.tbmap.get(tb, None),
1829                            interface=[ 
1830                                topdl.Interface(
1831                                    substrate=csub.name),
1832                                ],
1833                            attribute = [
1834                                topdl.Attribute(attribute=n, value=v)
1835                                    for n, v in (\
1836                                        ('domain', 
1837                                            tbparams[tb].get('domain',
1838                                                ".example.com")),
1839                                        ('experiment', "%s/%s" % \
1840                                                (tbparams[tb].get('project', 
1841                                                    'project'), 
1842                                                    eid)),)
1843                                ],
1844                            )
[617592b]1845                    portal = self.new_dragon_portal(master, tb, master,
[2761484]1846                            eid, mip, dip, idx, mcsub, tbparams, expid)
[69692a9]1847                    topo[master].substrates.append(mcsub)
[5b74b63]1848                    topo[master].elements.append(portal)
[69692a9]1849                    topo[master].elements.append(seg)
[617592b]1850                    for t in (master, tb):
1851                        topo[t].incorporate_elements()
[69692a9]1852
1853                    self.create_dragon_substrate(csub, topo, 
[109a32a]1854                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1855                            tbparams, master, eid, connInfo,
1856                            expid)
[69692a9]1857                else:
1858                    self.add_control_portal(master, tb, master, eid, topo, 
[2761484]1859                            tbparams, connInfo, expid)
[69692a9]1860                    self.add_control_portal(tb, master, master, eid, topo, 
[2761484]1861                            tbparams, connInfo, expid)
[13e3dd2]1862
1863        # Connect the portal nodes into the topologies and clear out
[895a133]1864        # substrates that are not in the topologies
1865        for tb in tbparams.keys():
1866            topo[tb].incorporate_elements()
1867            topo[tb].substrates = \
1868                    [s for s in topo[tb].substrates \
1869                        if len(s.interfaces) >0]
1870
1871    def wrangle_software(self, expid, top, topo, tbparams):
1872        """
1873        Copy software out to the repository directory, allocate permissions and
1874        rewrite the segment topologies to look for the software in local
1875        places.
1876        """
1877
1878        # Copy the rpms and tarfiles to a distribution directory from
1879        # which the federants can retrieve them
1880        linkpath = "%s/software" %  expid
1881        softdir ="%s/%s" % ( self.repodir, linkpath)
1882        softmap = { }
1883        # These are in a list of tuples format (each kit).  This comprehension
1884        # unwraps them into a single list of tuples that initilaizes the set of
1885        # tuples.
1886        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1887                for p, t in l ])
1888        pkgs.update([x.location for e in top.elements \
1889                for x in e.software])
1890        try:
1891            os.makedirs(softdir)
1892        except IOError, e:
1893            raise service_error(
1894                    "Cannot create software directory: %s" % e)
1895        # The actual copying.  Everything's converted into a url for copying.
1896        for pkg in pkgs:
1897            loc = pkg
1898
1899            scheme, host, path = urlparse(loc)[0:3]
1900            dest = os.path.basename(path)
1901            if not scheme:
1902                if not loc.startswith('/'):
1903                    loc = "/%s" % loc
1904                loc = "file://%s" %loc
1905            try:
1906                u = urlopen(loc)
1907            except Exception, e:
1908                raise service_error(service_error.req, 
1909                        "Cannot open %s: %s" % (loc, e))
1910            try:
1911                f = open("%s/%s" % (softdir, dest) , "w")
1912                self.log.debug("Writing %s/%s" % (softdir,dest) )
1913                data = u.read(4096)
1914                while data:
1915                    f.write(data)
1916                    data = u.read(4096)
1917                f.close()
1918                u.close()
1919            except Exception, e:
1920                raise service_error(service_error.internal,
1921                        "Could not copy %s: %s" % (loc, e))
1922            path = re.sub("/tmp", "", linkpath)
1923            # XXX
1924            softmap[pkg] = \
[7183b48]1925                    "%s/%s/%s" %\
1926                    ( self.repo_url, path, dest)
[895a133]1927
1928            # Allow the individual segments to access the software.
1929            for tb in tbparams.keys():
1930                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1931                        "/%s/%s" % ( path, dest))
1932
1933        # Convert the software locations in the segments into the local
1934        # copies on this host
1935        for soft in [ s for tb in topo.values() \
1936                for e in tb.elements \
1937                    if getattr(e, 'software', False) \
1938                        for s in e.software ]:
1939            if softmap.has_key(soft.location):
1940                soft.location = softmap[soft.location]
1941
1942
[a3ad8bd]1943    def new_experiment(self, req, fid):
1944        """
1945        The external interface to empty initial experiment creation called from
1946        the dispatcher.
1947
1948        Creates a working directory, splits the incoming description using the
1949        splitter script and parses out the avrious subsections using the
1950        lcasses above.  Once each sub-experiment is created, use pooled threads
1951        to instantiate them and start it all up.
1952        """
1953        if not self.auth.check_attribute(fid, 'new'):
1954            raise service_error(service_error.access, "New access denied")
1955
1956        try:
1957            tmpdir = tempfile.mkdtemp(prefix="split-")
1958        except IOError:
1959            raise service_error(service_error.internal, "Cannot create tmp dir")
1960
1961        try:
1962            access_user = self.accessdb[fid]
1963        except KeyError:
1964            raise service_error(service_error.internal,
1965                    "Access map and authorizer out of sync in " + \
[7183b48]1966                            "new_experiment for fedid %s"  % fid)
[a3ad8bd]1967
1968        pid = "dummy"
1969        gid = "dummy"
1970
1971        req = req.get('NewRequestBody', None)
1972        if not req:
1973            raise service_error(service_error.req,
1974                    "Bad request format (no NewRequestBody)")
1975
1976        # Generate an ID for the experiment (slice) and a certificate that the
1977        # allocator can use to prove they own it.  We'll ship it back through
1978        # the encrypted connection.
1979        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1980
1981        #now we're done with the tmpdir, and it should be empty
1982        if self.cleanup:
1983            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1984            os.rmdir(tmpdir)
1985        else:
1986            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1987
1988        eid = self.create_experiment_state(fid, req, expid, expcert, 
1989                state='empty')
1990
1991        # Let users touch the state
1992        self.auth.set_attribute(fid, expid)
1993        self.auth.set_attribute(expid, expid)
1994        # Override fedids can manipulate state as well
1995        for o in self.overrides:
1996            self.auth.set_attribute(o, expid)
1997
1998        rv = {
1999                'experimentID': [
2000                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2001                ],
2002                'experimentStatus': 'empty',
2003                'experimentAccess': { 'X509' : expcert }
2004            }
2005
2006        return rv
2007
2008
[e19b75c]2009    def create_experiment(self, req, fid):
[db6b092]2010        """
2011        The external interface to experiment creation called from the
2012        dispatcher.
2013
2014        Creates a working directory, splits the incoming description using the
2015        splitter script and parses out the avrious subsections using the
2016        lcasses above.  Once each sub-experiment is created, use pooled threads
2017        to instantiate them and start it all up.
2018        """
[7183b48]2019
2020        req = req.get('CreateRequestBody', None)
2021        if not req:
2022            raise service_error(service_error.req,
2023                    "Bad request format (no CreateRequestBody)")
2024
2025        # Get the experiment access
2026        exp = req.get('experimentID', None)
2027        if exp:
2028            if exp.has_key('fedid'):
2029                key = exp['fedid']
2030                expid = key
2031                eid = None
2032            elif exp.has_key('localname'):
2033                key = exp['localname']
2034                eid = key
2035                expid = None
2036            else:
2037                raise service_error(service_error.req, "Unknown lookup type")
2038        else:
2039            raise service_error(service_error.req, "No request?")
2040
2041        self.check_experiment_access(fid, key)
[db6b092]2042
2043        try:
2044            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2045            os.mkdir(tmpdir+"/keys")
[db6b092]2046        except IOError:
2047            raise service_error(service_error.internal, "Cannot create tmp dir")
2048
2049        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2050        gw_secretkey_base = "fed.%s" % self.ssh_type
2051        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2052        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2053        tclfile = tmpdir + "/experiment.tcl"
2054        tbparams = { }
2055        try:
2056            access_user = self.accessdb[fid]
2057        except KeyError:
2058            raise service_error(service_error.internal,
2059                    "Access map and authorizer out of sync in " + \
2060                            "create_experiment for fedid %s"  % fid)
2061
2062        pid = "dummy"
2063        gid = "dummy"
2064
2065        # The tcl parser needs to read a file so put the content into that file
2066        descr=req.get('experimentdescription', None)
2067        if descr:
2068            file_content=descr.get('ns2description', None)
2069            if file_content:
2070                try:
2071                    f = open(tclfile, 'w')
2072                    f.write(file_content)
2073                    f.close()
2074                except IOError:
2075                    raise service_error(service_error.internal,
2076                            "Cannot write temp experiment description")
2077            else:
2078                raise service_error(service_error.req, 
2079                        "Only ns2descriptions supported")
2080        else:
2081            raise service_error(service_error.req, "No experiment description")
2082
[7183b48]2083        self.state_lock.acquire()
2084        if self.state.has_key(key):
[4afcfc4]2085            self.state[key]['experimentStatus'] = "starting"
[7183b48]2086            for e in self.state[key].get('experimentID',[]):
2087                if not expid and e.has_key('fedid'):
2088                    expid = e['fedid']
2089                elif not eid and e.has_key('localname'):
2090                    eid = e['localname']
2091        self.state_lock.release()
2092
2093        if not (eid and expid):
2094            raise service_error(service_error.internal, 
2095                    "Cannot find local experiment info!?")
[db6b092]2096
2097        try: 
2098            # This catches exceptions to clear the placeholder if necessary
2099            try:
2100                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2101            except ValueError:
2102                raise service_error(service_error.server_config, 
2103                        "Bad key type (%s)" % self.ssh_type)
2104
2105            master = req.get('master', None)
2106            if not master:
2107                raise service_error(service_error.req,
2108                        "No master testbed label")
2109            export_project = req.get('exportProject', None)
2110            if not export_project:
2111                raise service_error(service_error.req, "No export project")
[895a133]2112           
2113            # Translate to topdl
[db6b092]2114            if self.splitter_url:
[895a133]2115                # XXX: need remote topdl translator
[db6b092]2116                self.log.debug("Calling remote splitter at %s" % \
2117                        self.splitter_url)
2118                split_data = self.remote_splitter(self.splitter_url,
2119                        file_content, master)
2120            else:
2121                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2122                    str(self.muxmax), '-m', master]
2123
2124                tclcmd.extend([pid, gid, eid, tclfile])
2125
2126                self.log.debug("running local splitter %s", " ".join(tclcmd))
2127                # This is just fantastic.  As a side effect the parser copies
2128                # tb_compat.tcl into the current directory, so that directory
2129                # must be writable by the fedd user.  Doing this in the
2130                # temporary subdir ensures this is the case.
[70caa72]2131                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
[db6b092]2132                        cwd=tmpdir)
[866c983]2133                split_data = tclparser.stdout
2134
[cc8d8e9]2135            top = topdl.topology_from_xml(file=split_data, top="experiment")
[895a133]2136
[69692a9]2137            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[895a133]2138             # Find the testbeds to look up
2139            testbeds = set([ a.value for e in top.elements \
2140                    for a in e.attribute \
2141                    if a.attribute == 'testbed'] )
2142
2143            allocated = { }         # Testbeds we can access
2144            topo ={ }               # Sub topologies
[e02cd14]2145            connInfo = { }          # Connection information
2146            services = [ ]
[99eb8cf]2147            self.get_access_to_testbeds(testbeds, access_user, 
[e02cd14]2148                    export_project, master, allocated, tbparams, services)
[895a133]2149            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2150
2151            # Copy configuration files into the remote file store
[6c57fe9]2152            # The config urlpath
2153            configpath = "/%s/config" % expid
2154            # The config file system location
2155            configdir ="%s%s" % ( self.repodir, configpath)
2156            try:
2157                os.makedirs(configdir)
2158            except IOError, e:
2159                raise service_error(
2160                        "Cannot create config directory: %s" % e)
2161            try:
2162                f = open("%s/hosts" % configdir, "w")
2163                f.write('\n'.join(hosts))
2164                f.close()
2165            except IOError, e:
2166                raise service_error(service_error.internal, 
2167                        "Cannot write hosts file: %s" % e)
2168            try:
[40dd8c1]2169                copy_file("%s" % gw_pubkey, "%s/%s" % \
[6c57fe9]2170                        (configdir, gw_pubkey_base))
[40dd8c1]2171                copy_file("%s" % gw_secretkey, "%s/%s" % \
[6c57fe9]2172                        (configdir, gw_secretkey_base))
2173            except IOError, e:
2174                raise service_error(service_error.internal, 
2175                        "Cannot copy keyfiles: %s" % e)
[cc8d8e9]2176
[6c57fe9]2177            # Allow the individual testbeds to access the configuration files.
2178            for tb in tbparams.keys():
2179                asignee = tbparams[tb]['allocID']['fedid']
2180                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2181                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
[cc8d8e9]2182
[e02cd14]2183            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
[2761484]2184                    connInfo, expid)
[69692a9]2185            # Now get access to the dynamic testbeds
2186            for k, t in topo.items():
2187                if not t.get_attribute('dynamic'):
2188                    continue
2189                tb = t.get_attribute('testbed')
2190                if tb: 
[617592b]2191                    self.get_access(tb, None, tbparams, master, 
[e02cd14]2192                            export_project, access_user, services)
[69692a9]2193                    tbparams[k] = tbparams[tb]
2194                    del tbparams[tb]
2195                    allocated[k] = 1
[109a32a]2196                    store_keys = t.get_attribute('store_keys')
2197                    # Give the testbed access to keys it exports or imports
2198                    if store_keys:
2199                        for sk in store_keys.split(" "):
2200                            self.auth.set_attribute(\
2201                                    tbparams[k]['allocID']['fedid'], sk)
[69692a9]2202                else:
2203                    raise service_error(service_error.internal, 
2204                            "Dynamic allocation from no testbed!?")
2205
[895a133]2206            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2207
2208            vtopo = topdl.topology_to_vtopo(top)
2209            vis = self.genviz(vtopo)
[db6b092]2210
[866c983]2211            # save federant information
2212            for k in allocated.keys():
[ecf679e]2213                tbparams[k]['federant'] = {
2214                        'name': [ { 'localname' : eid} ],
2215                        'allocID' : tbparams[k]['allocID'],
2216                        'master' : k == master,
2217                        'uri': tbparams[k]['uri'],
[866c983]2218                    }
[69692a9]2219                if tbparams[k].has_key('emulab'):
2220                        tbparams[k]['federant']['emulab'] = \
2221                                tbparams[k]['emulab']
[866c983]2222
[db6b092]2223            self.state_lock.acquire()
2224            self.state[eid]['vtopo'] = vtopo
2225            self.state[eid]['vis'] = vis
2226            self.state[expid]['federant'] = \
2227                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2228                        if tbparams[tb].has_key('federant') ]
[cc8d8e9]2229            if self.state_filename: 
2230                self.write_state()
[db6b092]2231            self.state_lock.release()
[866c983]2232        except service_error, e:
2233            # If something goes wrong in the parse (usually an access error)
2234            # clear the placeholder state.  From here on out the code delays
[db6b092]2235            # exceptions.  Failing at this point returns a fault to the remote
2236            # caller.
[cc8d8e9]2237
[866c983]2238            self.state_lock.acquire()
2239            del self.state[eid]
[bd3e314]2240            del self.state[expid]
2241            if self.state_filename: self.write_state()
[866c983]2242            self.state_lock.release()
2243            raise e
2244
2245
[db6b092]2246        # Start the background swapper and return the starting state.  From
2247        # here on out, the state will stick around a while.
[866c983]2248
[db6b092]2249        # Let users touch the state
[bd3e314]2250        self.auth.set_attribute(fid, expid)
2251        self.auth.set_attribute(expid, expid)
[db6b092]2252        # Override fedids can manipulate state as well
2253        for o in self.overrides:
2254            self.auth.set_attribute(o, expid)
2255
2256        # Create a logger that logs to the experiment's state object as well as
2257        # to the main log file.
2258        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[f07fa49]2259        alloc_collector = self.list_log(self.state[eid]['log'])
2260        h = logging.StreamHandler(alloc_collector)
[db6b092]2261        # XXX: there should be a global one of these rather than repeating the
2262        # code.
2263        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2264                    '%d %b %y %H:%M:%S'))
2265        alloc_log.addHandler(h)
2266       
[6c57fe9]2267        attrs = [ 
2268                {
2269                    'attribute': 'ssh_pubkey', 
2270                    'value': '%s/%s/config/%s' % \
[7183b48]2271                            (self.repo_url, expid, gw_pubkey_base)
[6c57fe9]2272                },
2273                {
2274                    'attribute': 'ssh_secretkey', 
2275                    'value': '%s/%s/config/%s' % \
[7183b48]2276                            (self.repo_url, expid, gw_secretkey_base)
[6c57fe9]2277                },
2278                {
2279                    'attribute': 'hosts', 
2280                    'value': '%s/%s/config/hosts' % \
[7183b48]2281                            (self.repo_url, expid)
[6c57fe9]2282                },
[ecca6eb]2283                {
2284                    'attribute': 'experiment_name',
2285                    'value': eid,
2286                },
[6c57fe9]2287            ]
2288
[617592b]2289        # transit and disconnected testbeds may not have a connInfo entry.
2290        # Fill in the blanks.
2291        for t in allocated.keys():
2292            if not connInfo.has_key(t):
2293                connInfo[t] = { }
2294
[db6b092]2295        # Start a thread to do the resource allocation
[e19b75c]2296        t  = Thread(target=self.allocate_resources,
[7183b48]2297                args=(allocated, master, eid, expid, tbparams, 
[e02cd14]2298                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2299                    services),
[db6b092]2300                name=eid)
2301        t.start()
2302
2303        rv = {
2304                'experimentID': [
2305                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2306                ],
2307                'experimentStatus': 'starting',
2308            }
2309
2310        return rv
[9479343]2311   
2312    def get_experiment_fedid(self, key):
2313        """
[db6b092]2314        find the fedid associated with the localname key in the state database.
[9479343]2315        """
2316
[db6b092]2317        rv = None
2318        self.state_lock.acquire()
2319        if self.state.has_key(key):
2320            if isinstance(self.state[key], dict):
2321                try:
2322                    kl = [ f['fedid'] for f in \
2323                            self.state[key]['experimentID']\
2324                                if f.has_key('fedid') ]
2325                except KeyError:
2326                    self.state_lock.release()
2327                    raise service_error(service_error.internal, 
2328                            "No fedid for experiment %s when getting "+\
2329                                    "fedid(!?)" % key)
2330                if len(kl) == 1:
2331                    rv = kl[0]
2332                else:
2333                    self.state_lock.release()
2334                    raise service_error(service_error.internal, 
2335                            "multiple fedids for experiment %s when " +\
2336                                    "getting fedid(!?)" % key)
2337            else:
2338                self.state_lock.release()
2339                raise service_error(service_error.internal, 
2340                        "Unexpected state for %s" % key)
2341        self.state_lock.release()
2342        return rv
[a97394b]2343
[4064742]2344    def check_experiment_access(self, fid, key):
[866c983]2345        """
2346        Confirm that the fid has access to the experiment.  Though a request
2347        may be made in terms of a local name, the access attribute is always
2348        the experiment's fedid.
2349        """
2350        if not isinstance(key, fedid):
[db6b092]2351            key = self.get_experiment_fedid(key)
[866c983]2352
2353        if self.auth.check_attribute(fid, key):
2354            return True
2355        else:
2356            raise service_error(service_error.access, "Access Denied")
[4064742]2357
2358
[db6b092]2359    def get_handler(self, path, fid):
[7183b48]2360        self.log.info("Get handler %s %s" % (path, fid))
[6c57fe9]2361        if self.auth.check_attribute(fid, path):
2362            return ("%s/%s" % (self.repodir, path), "application/binary")
2363        else:
2364            return (None, None)
[987aaa1]2365
2366    def get_vtopo(self, req, fid):
[866c983]2367        """
2368        Return the stored virtual topology for this experiment
2369        """
2370        rv = None
[db6b092]2371        state = None
[866c983]2372
2373        req = req.get('VtopoRequestBody', None)
2374        if not req:
2375            raise service_error(service_error.req,
2376                    "Bad request format (no VtopoRequestBody)")
2377        exp = req.get('experiment', None)
2378        if exp:
2379            if exp.has_key('fedid'):
2380                key = exp['fedid']
2381                keytype = "fedid"
2382            elif exp.has_key('localname'):
2383                key = exp['localname']
2384                keytype = "localname"
2385            else:
2386                raise service_error(service_error.req, "Unknown lookup type")
2387        else:
2388            raise service_error(service_error.req, "No request?")
2389
2390        self.check_experiment_access(fid, key)
2391
2392        self.state_lock.acquire()
2393        if self.state.has_key(key):
[db6b092]2394            if self.state[key].has_key('vtopo'):
2395                rv = { 'experiment' : {keytype: key },\
2396                        'vtopo': self.state[key]['vtopo'],\
2397                    }
2398            else:
2399                state = self.state[key]['experimentStatus']
[866c983]2400        self.state_lock.release()
2401
2402        if rv: return rv
[bd3e314]2403        else: 
[db6b092]2404            if state:
2405                raise service_error(service_error.partial, 
2406                        "Not ready: %s" % state)
2407            else:
2408                raise service_error(service_error.req, "No such experiment")
[987aaa1]2409
2410    def get_vis(self, req, fid):
[866c983]2411        """
2412        Return the stored visualization for this experiment
2413        """
2414        rv = None
[db6b092]2415        state = None
[866c983]2416
2417        req = req.get('VisRequestBody', None)
2418        if not req:
2419            raise service_error(service_error.req,
2420                    "Bad request format (no VisRequestBody)")
2421        exp = req.get('experiment', None)
2422        if exp:
2423            if exp.has_key('fedid'):
2424                key = exp['fedid']
2425                keytype = "fedid"
2426            elif exp.has_key('localname'):
2427                key = exp['localname']
2428                keytype = "localname"
2429            else:
2430                raise service_error(service_error.req, "Unknown lookup type")
2431        else:
2432            raise service_error(service_error.req, "No request?")
2433
2434        self.check_experiment_access(fid, key)
2435
2436        self.state_lock.acquire()
2437        if self.state.has_key(key):
[db6b092]2438            if self.state[key].has_key('vis'):
2439                rv =  { 'experiment' : {keytype: key },\
2440                        'vis': self.state[key]['vis'],\
2441                        }
2442            else:
2443                state = self.state[key]['experimentStatus']
[866c983]2444        self.state_lock.release()
2445
2446        if rv: return rv
[bd3e314]2447        else:
[db6b092]2448            if state:
2449                raise service_error(service_error.partial, 
2450                        "Not ready: %s" % state)
2451            else:
2452                raise service_error(service_error.req, "No such experiment")
[987aaa1]2453
[65f3f29]2454    def clean_info_response(self, rv):
[db6b092]2455        """
2456        Remove the information in the experiment's state object that is not in
2457        the info response.
2458        """
2459        # Remove the owner info (should always be there, but...)
2460        if rv.has_key('owner'): del rv['owner']
2461
2462        # Convert the log into the allocationLog parameter and remove the
2463        # log entry (with defensive programming)
2464        if rv.has_key('log'):
2465            rv['allocationLog'] = "".join(rv['log'])
2466            del rv['log']
2467        else:
2468            rv['allocationLog'] = ""
2469
2470        if rv['experimentStatus'] != 'active':
2471            if rv.has_key('federant'): del rv['federant']
2472        else:
[69692a9]2473            # remove the allocationID and uri info from each federant
[db6b092]2474            for f in rv.get('federant', []):
2475                if f.has_key('allocID'): del f['allocID']
[69692a9]2476                if f.has_key('uri'): del f['uri']
[db6b092]2477        return rv
[65f3f29]2478
[c52c48d]2479    def get_info(self, req, fid):
[866c983]2480        """
2481        Return all the stored info about this experiment
2482        """
2483        rv = None
2484
2485        req = req.get('InfoRequestBody', None)
2486        if not req:
2487            raise service_error(service_error.req,
[65f3f29]2488                    "Bad request format (no InfoRequestBody)")
[866c983]2489        exp = req.get('experiment', None)
2490        if exp:
2491            if exp.has_key('fedid'):
2492                key = exp['fedid']
2493                keytype = "fedid"
2494            elif exp.has_key('localname'):
2495                key = exp['localname']
2496                keytype = "localname"
2497            else:
2498                raise service_error(service_error.req, "Unknown lookup type")
2499        else:
2500            raise service_error(service_error.req, "No request?")
2501
2502        self.check_experiment_access(fid, key)
2503
2504        # The state may be massaged by the service function that called
2505        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2506        # state.
2507        self.state_lock.acquire()
2508        if self.state.has_key(key):
2509            rv = copy.deepcopy(self.state[key])
2510        self.state_lock.release()
2511
[db6b092]2512        if rv:
2513            return self.clean_info_response(rv)
[bd3e314]2514        else:
[db6b092]2515            raise service_error(service_error.req, "No such experiment")
[7a8d667]2516
[65f3f29]2517    def get_multi_info(self, req, fid):
2518        """
2519        Return all the stored info that this fedid can access
2520        """
[db6b092]2521        rv = { 'info': [ ] }
[65f3f29]2522
[db6b092]2523        self.state_lock.acquire()
2524        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2525            try:
2526                self.check_experiment_access(fid, key)
2527            except service_error, e:
2528                if e.code == service_error.access:
2529                    continue
2530                else:
2531                    self.state_lock.release()
2532                    raise e
[65f3f29]2533
[db6b092]2534            if self.state.has_key(key):
2535                e = copy.deepcopy(self.state[key])
2536                e = self.clean_info_response(e)
2537                rv['info'].append(e)
[65f3f29]2538        self.state_lock.release()
[db6b092]2539        return rv
[65f3f29]2540
[7a8d667]2541    def terminate_experiment(self, req, fid):
[866c983]2542        """
2543        Swap this experiment out on the federants and delete the shared
2544        information
2545        """
2546        tbparams = { }
2547        req = req.get('TerminateRequestBody', None)
2548        if not req:
2549            raise service_error(service_error.req,
2550                    "Bad request format (no TerminateRequestBody)")
[db6b092]2551        force = req.get('force', False)
[866c983]2552        exp = req.get('experiment', None)
2553        if exp:
2554            if exp.has_key('fedid'):
2555                key = exp['fedid']
2556                keytype = "fedid"
2557            elif exp.has_key('localname'):
2558                key = exp['localname']
2559                keytype = "localname"
2560            else:
2561                raise service_error(service_error.req, "Unknown lookup type")
2562        else:
2563            raise service_error(service_error.req, "No request?")
2564
2565        self.check_experiment_access(fid, key)
2566
[db6b092]2567        dealloc_list = [ ]
[46e4682]2568
2569
[5ae3857]2570        # Create a logger that logs to the dealloc_list as well as to the main
2571        # log file.
2572        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2573        h = logging.StreamHandler(self.list_log(dealloc_list))
2574        # XXX: there should be a global one of these rather than repeating the
2575        # code.
2576        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2577                    '%d %b %y %H:%M:%S'))
2578        dealloc_log.addHandler(h)
2579
2580        self.state_lock.acquire()
2581        fed_exp = self.state.get(key, None)
2582
2583        if fed_exp:
2584            # This branch of the conditional holds the lock to generate a
2585            # consistent temporary tbparams variable to deallocate experiments.
2586            # It releases the lock to do the deallocations and reacquires it to
2587            # remove the experiment state when the termination is complete.
2588
2589            # First make sure that the experiment creation is complete.
2590            status = fed_exp.get('experimentStatus', None)
2591
2592            if status:
2593                if status in ('starting', 'terminating'):
2594                    if not force:
2595                        self.state_lock.release()
2596                        raise service_error(service_error.partial, 
2597                                'Experiment still being created or destroyed')
2598                    else:
2599                        self.log.warning('Experiment in %s state ' % status + \
2600                                'being terminated by force.')
2601            else:
2602                # No status??? trouble
2603                self.state_lock.release()
2604                raise service_error(service_error.internal,
2605                        "Experiment has no status!?")
2606
2607            ids = []
2608            #  experimentID is a list of dicts that are self-describing
2609            #  identifiers.  This finds all the fedids and localnames - the
2610            #  keys of self.state - and puts them into ids.
2611            for id in fed_exp.get('experimentID', []):
2612                if id.has_key('fedid'): ids.append(id['fedid'])
2613                if id.has_key('localname'): ids.append(id['localname'])
2614
[63a35b7]2615            # Collect the allocation/segment ids into a dict keyed by the fedid
2616            # of the allocation (or a monotonically increasing integer) that
2617            # contains a tuple of uri, aid (which is a dict...)
2618            for i, fed in enumerate(fed_exp.get('federant', [])):
[5ae3857]2619                try:
[63a35b7]2620                    uri = fed['uri']
2621                    aid = fed['allocID']
2622                    k = fed['allocID'].get('fedid', i)
[5ae3857]2623                except KeyError, e:
2624                    continue
[63a35b7]2625                tbparams[k] = (uri, aid)
[5ae3857]2626            fed_exp['experimentStatus'] = 'terminating'
2627            if self.state_filename: self.write_state()
2628            self.state_lock.release()
2629
2630            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2631            # then completes, so we can't wait if nothing starts.  So, no
2632            # tbparams, no start.
2633            if len(tbparams) > 0:
2634                thread_pool = self.thread_pool(self.nthreads)
[63a35b7]2635                for k in tbparams.keys():
[5ae3857]2636                    # Create and start a thread to stop the segment
2637                    thread_pool.wait_for_slot()
[63a35b7]2638                    uri, aid = tbparams[k]
[5ae3857]2639                    t  = self.pooled_thread(\
[e19b75c]2640                            target=self.terminate_segment(log=dealloc_log,
[63a35b7]2641                                testbed=uri,
[5ae3857]2642                                cert_file=self.cert_file, 
2643                                cert_pwd=self.cert_pwd,
2644                                trusted_certs=self.trusted_certs,
2645                                caller=self.call_TerminateSegment),
[63a35b7]2646                            args=(uri, aid), name=k,
[5ae3857]2647                            pdata=thread_pool, trace_file=self.trace_file)
2648                    t.start()
2649                # Wait for completions
2650                thread_pool.wait_for_all_done()
2651
2652            # release the allocations (failed experiments have done this
2653            # already, and starting experiments may be in odd states, so we
2654            # ignore errors releasing those allocations
2655            try: 
[63a35b7]2656                for k in tbparams.keys():
[ecf679e]2657                    # This releases access by uri
[63a35b7]2658                    uri, aid = tbparams[k]
2659                    self.release_access(None, aid, uri=uri)
[5ae3857]2660            except service_error, e:
2661                if status != 'failed' and not force:
2662                    raise e
2663
2664            # Remove the terminated experiment
2665            self.state_lock.acquire()
2666            for id in ids:
2667                if self.state.has_key(id): del self.state[id]
2668
2669            if self.state_filename: self.write_state()
2670            self.state_lock.release()
2671
[2761484]2672            # Delete any synch points associated with this experiment.  All
2673            # synch points begin with the fedid of the experiment.
2674            fedid_keys = set(["fedid:%s" % f for f in ids \
2675                    if isinstance(f, fedid)])
2676            for k in self.synch_store.all_keys():
2677                try:
2678                    if len(k) > 45 and k[0:46] in fedid_keys:
2679                        self.synch_store.del_value(k)
[dadc4da]2680                except synch_store.BadDeletionError:
[2761484]2681                    pass
2682            self.write_store()
2683               
[5ae3857]2684            return { 
2685                    'experiment': exp , 
2686                    'deallocationLog': "".join(dealloc_list),
2687                    }
2688        else:
2689            # Don't forget to release the lock
2690            self.state_lock.release()
2691            raise service_error(service_error.req, "No saved state")
[2761484]2692
2693
2694    def GetValue(self, req, fid):
2695        """
2696        Get a value from the synchronized store
2697        """
2698        req = req.get('GetValueRequestBody', None)
2699        if not req:
2700            raise service_error(service_error.req,
2701                    "Bad request format (no GetValueRequestBody)")
2702       
2703        name = req['name']
2704        wait = req['wait']
2705        rv = { 'name': name }
2706
2707        if self.auth.check_attribute(fid, name):
[dadc4da]2708            try:
2709                v = self.synch_store.get_value(name, wait)
2710            except synch_store.RevokedKeyError:
2711                # No more synch on this key
2712                raise service_error(service_error.federant, 
2713                        "Synch key %s revoked" % name)
[2761484]2714            if v is not None:
2715                rv['value'] = v
[109a32a]2716            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2717            return rv
2718        else:
2719            raise service_error(service_error.access, "Access Denied")
2720       
2721
2722    def SetValue(self, req, fid):
2723        """
2724        Set a value in the synchronized store
2725        """
2726        req = req.get('SetValueRequestBody', None)
2727        if not req:
2728            raise service_error(service_error.req,
2729                    "Bad request format (no SetValueRequestBody)")
2730       
2731        name = req['name']
2732        v = req['value']
2733
2734        if self.auth.check_attribute(fid, name):
2735            try:
2736                self.synch_store.set_value(name, v)
2737                self.write_store()
[109a32a]2738                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2739            except synch_store.CollisionError:
2740                # Translate into a service_error
2741                raise service_error(service_error.req,
2742                        "Value already set: %s" %name)
[dadc4da]2743            except synch_store.RevokedKeyError:
2744                # No more synch on this key
2745                raise service_error(service_error.federant, 
2746                        "Synch key %s revoked" % name)
[2761484]2747            return { 'name': name, 'value': v }
2748        else:
2749            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.