source: fedd/federation/experiment_control.py @ 444790d

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 444790d was 289ff7e, checked in by Ted Faber <faber@…>, 15 years ago

Netmasks

  • Property mode set to 100644
File size: 93.6 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3441fe3]16import traceback
[c971895]17# For parsing visualization output and splitter output
18import xml.parsers.expat
[3441fe3]19
[6c57fe9]20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
[6679c122]22
[db6b092]23from urlparse import urlparse
24from urllib2 import urlopen
25
[ec4fb42]26from util import *
[51cc9df]27from fedid import fedid, generate_fedid
[9460b1e]28from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]29from service_error import service_error
[2761484]30from synch_store import synch_store
[6679c122]31
[db6b092]32import topdl
[f07fa49]33import list_log
[db6b092]34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
[11a08b0]37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
[ec4fb42]44class experiment_control_local:
[0ea11af]45    """
46    Control of experiments that this system can directly access.
47
48    Includes experiment creation, termination and information dissemination.
49    Thred safe.
50    """
[79b6596]51
52    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]53   
[1af38d6]54    class thread_pool:
[866c983]55        """
56        A class to keep track of a set of threads all invoked for the same
57        task.  Manages the mutual exclusion of the states.
58        """
59        def __init__(self, nthreads):
60            """
61            Start a pool.
62            """
63            self.changed = Condition()
64            self.started = 0
65            self.terminated = 0
66            self.nthreads = nthreads
67
68        def acquire(self):
69            """
70            Get the pool's lock.
71            """
72            self.changed.acquire()
73
74        def release(self):
75            """
76            Release the pool's lock.
77            """
78            self.changed.release()
79
80        def wait(self, timeout = None):
81            """
82            Wait for a pool thread to start or stop.
83            """
84            self.changed.wait(timeout)
85
86        def start(self):
87            """
88            Called by a pool thread to report starting.
89            """
90            self.changed.acquire()
91            self.started += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def terminate(self):
96            """
97            Called by a pool thread to report finishing.
98            """
99            self.changed.acquire()
100            self.terminated += 1
101            self.changed.notifyAll()
102            self.changed.release()
103
104        def clear(self):
105            """
106            Clear all pool data.
107            """
108            self.changed.acquire()
109            self.started = 0
110            self.terminated =0
111            self.changed.notifyAll()
112            self.changed.release()
113
114        def wait_for_slot(self):
115            """
116            Wait until we have a free slot to start another pooled thread
117            """
118            self.acquire()
119            while self.started - self.terminated >= self.nthreads:
120                self.wait()
121            self.release()
122
[32e7d93]123        def wait_for_all_done(self, timeout=None):
[866c983]124            """
[32e7d93]125            Wait until all active threads finish (and at least one has
126            started).  If a timeout is given, return after waiting that long
127            for termination.  If all threads are done (and one has started in
128            the since the last clear()) return True, otherwise False.
[866c983]129            """
[32e7d93]130            if timeout:
131                deadline = time.time() + timeout
[866c983]132            self.acquire()
133            while self.started == 0 or self.started > self.terminated:
[32e7d93]134                self.wait(timeout)
135                if timeout:
136                    if time.time() > deadline:
137                        break
138                    timeout = deadline - time.time()
[866c983]139            self.release()
[32e7d93]140            return not (self.started == 0 or self.started > self.terminated)
[8bc5754]141
[1af38d6]142    class pooled_thread(Thread):
[866c983]143        """
144        One of a set of threads dedicated to a specific task.  Uses the
145        thread_pool class above for coordination.
146        """
147        def __init__(self, group=None, target=None, name=None, args=(), 
148                kwargs={}, pdata=None, trace_file=None):
149            Thread.__init__(self, group, target, name, args, kwargs)
150            self.rv = None          # Return value of the ops in this thread
151            self.exception = None   # Exception that terminated this thread
152            self.target=target      # Target function to run on start()
153            self.args = args        # Args to pass to target
154            self.kwargs = kwargs    # Additional kw args
155            self.pdata = pdata      # thread_pool for this class
156            # Logger for this thread
157            self.log = logging.getLogger("fedd.experiment_control")
158       
159        def run(self):
160            """
161            Emulate Thread.run, except add pool data manipulation and error
162            logging.
163            """
164            if self.pdata:
165                self.pdata.start()
166
167            if self.target:
168                try:
169                    self.rv = self.target(*self.args, **self.kwargs)
170                except service_error, s:
171                    self.exception = s
172                    self.log.error("Thread exception: %s %s" % \
173                            (s.code_string(), s.desc))
174                except:
175                    self.exception = sys.exc_info()[1]
176                    self.log.error(("Unexpected thread exception: %s" +\
177                            "Trace %s") % (self.exception,\
178                                traceback.format_exc()))
179            if self.pdata:
180                self.pdata.terminate()
[6679c122]181
[f069052]182    call_RequestAccess = service_caller('RequestAccess')
183    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]184    call_StartSegment = service_caller('StartSegment')
[5ae3857]185    call_TerminateSegment = service_caller('TerminateSegment')
[f069052]186    call_Ns2Split = service_caller('Ns2Split')
[058f58e]187
[3f6bc5f]188    def __init__(self, config=None, auth=None):
[866c983]189        """
190        Intialize the various attributes, most from the config object
191        """
192
193        def parse_tarfile_list(tf):
194            """
195            Parse a tarfile list from the configuration.  This is a set of
196            paths and tarfiles separated by spaces.
197            """
198            rv = [ ]
199            if tf is not None:
200                tl = tf.split()
201                while len(tl) > 1:
202                    p, t = tl[0:2]
203                    del tl[0:2]
204                    rv.append((p, t))
205            return rv
206
207        self.thread_with_rv = experiment_control_local.pooled_thread
208        self.thread_pool = experiment_control_local.thread_pool
[f07fa49]209        self.list_log = list_log.list_log
[866c983]210
211        self.cert_file = config.get("experiment_control", "cert_file")
212        if self.cert_file:
213            self.cert_pwd = config.get("experiment_control", "cert_pwd")
214        else:
215            self.cert_file = config.get("globals", "cert_file")
216            self.cert_pwd = config.get("globals", "cert_pwd")
217
218        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
219                or config.get("globals", "trusted_certs")
220
[6c57fe9]221        self.repodir = config.get("experiment_control", "repodir")
[7183b48]222        self.repo_url = config.get("experiment_control", "repo_url", 
223                "https://users.isi.deterlab.net:23235");
[cc8d8e9]224
[866c983]225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]238        self.cleanup = not config.getboolean("experiment_control", 
239                "leave_tmpfiles")
[866c983]240        self.state_filename = config.get("experiment_control", 
241                "experiment_state")
[2761484]242        self.store_filename = config.get("experiment_control", 
243                "synch_store")
244        self.store_url = config.get("experiment_control", "store_url")
[866c983]245        self.splitter_url = config.get("experiment_control", "splitter_uri")
246        self.fedkit = parse_tarfile_list(\
247                config.get("experiment_control", "fedkit"))
248        self.gatewaykit = parse_tarfile_list(\
249                config.get("experiment_control", "gatewaykit"))
250        accessdb_file = config.get("experiment_control", "accessdb")
251
252        self.ssh_pubkey_file = config.get("experiment_control", 
253                "ssh_pubkey_file")
254        self.ssh_privkey_file = config.get("experiment_control",
255                "ssh_privkey_file")
256        # NB for internal master/slave ops, not experiment setup
257        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]258
[db6b092]259        self.overrides = set([])
260        ovr = config.get('experiment_control', 'overrides')
261        if ovr:
262            for o in ovr.split(","):
263                o = o.strip()
264                if o.startswith('fedid:'): o = o[len('fedid:'):]
265                self.overrides.add(fedid(hexstr=o))
[ca489e8]266
[866c983]267        self.state = { }
268        self.state_lock = Lock()
269        self.tclsh = "/usr/local/bin/otclsh"
270        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
271                config.get("experiment_control", "tcl_splitter",
272                        "/usr/testbed/lib/ns2ir/parse.tcl")
273        mapdb_file = config.get("experiment_control", "mapdb")
274        self.trace_file = sys.stderr
275
276        self.def_expstart = \
277                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
278                "/tmp/federate";
279        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
280                "FEDDIR/hosts";
281        self.def_gwstart = \
282                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
283                "/tmp/bridge.log";
284        self.def_mgwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
286                "/tmp/bridge.log";
287        self.def_gwimage = "FBSD61-TUNNEL2";
288        self.def_gwtype = "pc";
289        self.local_access = { }
290
291        if auth:
292            self.auth = auth
293        else:
294            self.log.error(\
295                    "[access]: No authorizer initialized, creating local one.")
296            auth = authorizer()
297
298
299        if self.ssh_pubkey_file:
300            try:
301                f = open(self.ssh_pubkey_file, 'r')
302                self.ssh_pubkey = f.read()
303                f.close()
304            except IOError:
305                raise service_error(service_error.internal,
306                        "Cannot read sshpubkey")
307        else:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311        if not self.ssh_privkey_file:
312            raise service_error(service_error.internal, 
313                    "No SSH public key file?")
314
315
316        if mapdb_file:
317            self.read_mapdb(mapdb_file)
318        else:
319            self.log.warn("[experiment_control] No testbed map, using defaults")
320            self.tbmap = { 
321                    'deter':'https://users.isi.deterlab.net:23235',
322                    'emulab':'https://users.isi.deterlab.net:23236',
323                    'ucb':'https://users.isi.deterlab.net:23237',
324                    }
325
326        if accessdb_file:
327                self.read_accessdb(accessdb_file)
328        else:
329            raise service_error(service_error.internal,
330                    "No accessdb specified in config")
331
332        # Grab saved state.  OK to do this w/o locking because it's read only
333        # and only one thread should be in existence that can see self.state at
334        # this point.
335        if self.state_filename:
336            self.read_state()
337
[2761484]338        if self.store_filename:
339            self.read_store()
340        else:
341            self.log.warning("No saved synch store")
342            self.synch_store = synch_store
343
[866c983]344        # Dispatch tables
345        self.soap_services = {\
[a3ad8bd]346                'New': soap_handler('New', self.new_experiment),
[e19b75c]347                'Create': soap_handler('Create', self.create_experiment),
[866c983]348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
[65f3f29]351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[866c983]352                'Terminate': soap_handler('Terminate', 
[e19b75c]353                    self.terminate_experiment),
[2761484]354                'GetValue': soap_handler('GetValue', self.GetValue),
355                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]356        }
357
358        self.xmlrpc_services = {\
[a3ad8bd]359                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]360                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]361                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
362                'Vis': xmlrpc_handler('Vis', self.get_vis),
363                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]364                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]365                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]366                    self.terminate_experiment),
[2761484]367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]369        }
[19cc408]370
[a97394b]371    # Call while holding self.state_lock
[eee2b2e]372    def write_state(self):
[866c983]373        """
374        Write a new copy of experiment state after copying the existing state
375        to a backup.
376
377        State format is a simple pickling of the state dictionary.
378        """
379        if os.access(self.state_filename, os.W_OK):
[40dd8c1]380            copy_file(self.state_filename, \
381                    "%s.bak" % self.state_filename)
[866c983]382        try:
383            f = open(self.state_filename, 'w')
384            pickle.dump(self.state, f)
385        except IOError, e:
386            self.log.error("Can't write file %s: %s" % \
387                    (self.state_filename, e))
388        except pickle.PicklingError, e:
389            self.log.error("Pickling problem: %s" % e)
390        except TypeError, e:
391            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]392
[2761484]393    @staticmethod
394    def get_alloc_ids(state):
395        """
396        Pull the fedids of the identifiers of each allocation from the
397        state.  Again, a dict dive that's best isolated.
398
399        Used by read_store and read state
400        """
401
402        return [ f['allocID']['fedid'] 
403                for f in state.get('federant',[]) \
404                    if f.has_key('allocID') and \
405                        f['allocID'].has_key('fedid')]
406
[a97394b]407    # Call while holding self.state_lock
[eee2b2e]408    def read_state(self):
[866c983]409        """
410        Read a new copy of experiment state.  Old state is overwritten.
411
412        State format is a simple pickling of the state dictionary.
413        """
[cc8d8e9]414       
415        def get_experiment_id(state):
416            """
417            Pull the fedid experimentID out of the saved state.  This is kind
418            of a gross walk through the dict.
419            """
420
421            if state.has_key('experimentID'):
422                for e in state['experimentID']:
423                    if e.has_key('fedid'):
424                        return e['fedid']
425                else:
426                    return None
427            else:
428                return None
429
[866c983]430        try:
431            f = open(self.state_filename, "r")
432            self.state = pickle.load(f)
433            self.log.debug("[read_state]: Read state from %s" % \
434                    self.state_filename)
435        except IOError, e:
436            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
437                    % (self.state_filename, e))
438        except pickle.UnpicklingError, e:
439            self.log.warning(("[read_state]: No saved state: " + \
440                    "Unpickling failed: %s") % e)
441       
[cc8d8e9]442        for s in self.state.values():
[866c983]443            try:
[cc8d8e9]444
445                eid = get_experiment_id(s)
446                if eid : 
447                    # Give the owner rights to the experiment
448                    self.auth.set_attribute(s['owner'], eid)
449                    # And holders of the eid as well
450                    self.auth.set_attribute(eid, eid)
[db6b092]451                    # allow overrides to control experiments as well
452                    for o in self.overrides:
453                        self.auth.set_attribute(o, eid)
[cc8d8e9]454                    # Set permissions to allow reading of the software repo, if
455                    # any, as well.
[2761484]456                    for a in self.get_alloc_ids(s):
[cc8d8e9]457                        self.auth.set_attribute(a, 'repo/%s' % eid)
458                else:
459                    raise KeyError("No experiment id")
[866c983]460            except KeyError, e:
461                self.log.warning("[read_state]: State ownership or identity " +\
462                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]463
464
465    def read_accessdb(self, accessdb_file):
[866c983]466        """
467        Read the mapping from fedids that can create experiments to their name
468        in the 3-level access namespace.  All will be asserted from this
469        testbed and can include the local username and porject that will be
470        asserted on their behalf by this fedd.  Each fedid is also added to the
471        authorization system with the "create" attribute.
472        """
473        self.accessdb = {}
474        # These are the regexps for parsing the db
475        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
476        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
477                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
478        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
479                "\s*->\s*(" + name_expr + ")\s*$")
480        lineno = 0
481
482        # Parse the mappings and store in self.authdb, a dict of
483        # fedid -> (proj, user)
484        try:
485            f = open(accessdb_file, "r")
486            for line in f:
487                lineno += 1
488                line = line.strip()
489                if len(line) == 0 or line.startswith('#'):
490                    continue
491                m = project_line.match(line)
492                if m:
493                    fid = fedid(hexstr=m.group(1))
494                    project, user = m.group(2,3)
495                    if not self.accessdb.has_key(fid):
496                        self.accessdb[fid] = []
497                    self.accessdb[fid].append((project, user))
498                    continue
499
500                m = user_line.match(line)
501                if m:
502                    fid = fedid(hexstr=m.group(1))
503                    project = None
504                    user = m.group(2)
505                    if not self.accessdb.has_key(fid):
506                        self.accessdb[fid] = []
507                    self.accessdb[fid].append((project, user))
508                    continue
509                self.log.warn("[experiment_control] Error parsing access " +\
510                        "db %s at line %d" %  (accessdb_file, lineno))
511        except IOError:
512            raise service_error(service_error.internal,
513                    "Error opening/reading %s as experiment " +\
514                            "control accessdb" %  accessdb_file)
515        f.close()
516
517        # Initialize the authorization attributes
518        for fid in self.accessdb.keys():
519            self.auth.set_attribute(fid, 'create')
[a3ad8bd]520            self.auth.set_attribute(fid, 'new')
[34bc05c]521
522    def read_mapdb(self, file):
[866c983]523        """
524        Read a simple colon separated list of mappings for the
525        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
526        """
527
528        self.tbmap = { }
529        lineno =0
530        try:
531            f = open(file, "r")
532            for line in f:
533                lineno += 1
534                line = line.strip()
535                if line.startswith('#') or len(line) == 0:
536                    continue
537                try:
538                    label, url = line.split(':', 1)
539                    self.tbmap[label] = url
540                except ValueError, e:
541                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
542                            "map db: %s %s" % (lineno, line, e))
543        except IOError, e:
544            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
545                    "open %s: %s" % (file, e))
546        f.close()
[2761484]547
548    def read_store(self):
549        try:
550            self.synch_store = synch_store()
551            self.synch_store.load(self.store_filename)
552            self.log.debug("[read_store]: Read store from %s" % \
553                    self.store_filename)
554        except IOError, e:
555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
556                    % (self.state_filename, e))
557            self.synch_store = synch_store()
558
559        # Set the initial permissions on data in the store.  XXX: This ad hoc
560        # authorization attribute initialization is getting out of hand.
561        for k in self.synch_store.all_keys():
562            try:
563                if k.startswith('fedid:'):
564                    fid = fedid(hexstr=k[6:46])
565                    if self.state.has_key(fid):
566                        for a in self.get_alloc_ids(self.state[fid]):
567                            self.auth.set_attribute(a, k)
568            except ValueError, e:
569                self.log.warn("Cannot deduce permissions for %s" % k)
570
571
572    def write_store(self):
573        """
574        Write a new copy of synch_store after writing current state
575        to a backup.  We use the internal synch_store pickle method to avoid
576        incinsistent data.
577
578        State format is a simple pickling of the store.
579        """
580        if os.access(self.store_filename, os.W_OK):
581            copy_file(self.store_filename, \
582                    "%s.bak" % self.store_filename)
583        try:
584            self.synch_store.save(self.store_filename)
585        except IOError, e:
586            self.log.error("Can't write file %s: %s" % \
587                    (self.store_filename, e))
588        except TypeError, e:
589            self.log.error("Pickling problem (TypeError): %s" % e)
590
[866c983]591       
[6679c122]592    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]593        """
594        Generate a set of keys for the gateways to use to talk.
595
596        Keys are of type type and are stored in the required dest file.
597        """
598        valid_types = ("rsa", "dsa")
599        t = type.lower();
600        if t not in valid_types: raise ValueError
601        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
602
603        try:
604            trace = open("/dev/null", "w")
605        except IOError:
606            raise service_error(service_error.internal,
607                    "Cannot open /dev/null??");
608
609        # May raise CalledProcessError
610        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]611        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]612        if rv != 0:
613            raise service_error(service_error.internal, 
614                    "Cannot generate nonce ssh keys.  %s return code %d" \
615                            % (self.ssh_keygen, rv))
[6679c122]616
[0d830de]617    def gentopo(self, str):
[866c983]618        """
619        Generate the topology dtat structure from the splitter's XML
620        representation of it.
621
622        The topology XML looks like:
623            <experiment>
624                <nodes>
625                    <node><vname></vname><ips>ip1:ip2</ips></node>
626                </nodes>
627                <lans>
628                    <lan>
629                        <vname></vname><vnode></vnode><ip></ip>
630                        <bandwidth></bandwidth><member>node:port</member>
631                    </lan>
632                </lans>
633        """
634        class topo_parse:
635            """
636            Parse the topology XML and create the dats structure.
637            """
638            def __init__(self):
639                # Typing of the subelements for data conversion
640                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
641                self.int_subelements = ( 'bandwidth',)
642                self.float_subelements = ( 'delay',)
643                # The final data structure
644                self.nodes = [ ]
645                self.lans =  [ ]
646                self.topo = { \
647                        'node': self.nodes,\
648                        'lan' : self.lans,\
649                    }
650                self.element = { }  # Current element being created
651                self.chars = ""     # Last text seen
652
653            def end_element(self, name):
654                # After each sub element the contents is added to the current
655                # element or to the appropriate list.
656                if name == 'node':
657                    self.nodes.append(self.element)
658                    self.element = { }
659                elif name == 'lan':
660                    self.lans.append(self.element)
661                    self.element = { }
662                elif name in self.str_subelements:
663                    self.element[name] = self.chars
664                    self.chars = ""
665                elif name in self.int_subelements:
666                    self.element[name] = int(self.chars)
667                    self.chars = ""
668                elif name in self.float_subelements:
669                    self.element[name] = float(self.chars)
670                    self.chars = ""
671
672            def found_chars(self, data):
673                self.chars += data.rstrip()
674
675
676        tp = topo_parse();
677        parser = xml.parsers.expat.ParserCreate()
678        parser.EndElementHandler = tp.end_element
679        parser.CharacterDataHandler = tp.found_chars
680
681        parser.Parse(str)
682
683        return tp.topo
684       
[0d830de]685
686    def genviz(self, topo):
[866c983]687        """
688        Generate the visualization the virtual topology
689        """
690
691        neato = "/usr/local/bin/neato"
692        # These are used to parse neato output and to create the visualization
693        # file.
[0ac1934]694        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]695        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
696                "%s</type></node>"
697
698        try:
699            # Node names
700            nodes = [ n['vname'] for n in topo['node'] ]
701            topo_lans = topo['lan']
[cc8d8e9]702        except KeyError, e:
703            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]704
705        lans = { }
706        links = { }
707
708        # Walk through the virtual topology, organizing the connections into
709        # 2-node connections (links) and more-than-2-node connections (lans).
710        # When a lan is created, it's added to the list of nodes (there's a
711        # node in the visualization for the lan).
712        for l in topo_lans:
713            if links.has_key(l['vname']):
714                if len(links[l['vname']]) < 2:
715                    links[l['vname']].append(l['vnode'])
716                else:
717                    nodes.append(l['vname'])
718                    lans[l['vname']] = links[l['vname']]
719                    del links[l['vname']]
720                    lans[l['vname']].append(l['vnode'])
721            elif lans.has_key(l['vname']):
722                lans[l['vname']].append(l['vnode'])
723            else:
724                links[l['vname']] = [ l['vnode'] ]
725
726
727        # Open up a temporary file for dot to turn into a visualization
728        try:
729            df, dotname = tempfile.mkstemp()
730            dotfile = os.fdopen(df, 'w')
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Failed to open file in genviz")
734
[db6b092]735        try:
736            dnull = open('/dev/null', 'w')
737        except IOError:
738            service_error(service_error.internal,
[886307f]739                    "Failed to open /dev/null in genviz")
740
[866c983]741        # Generate a dot/neato input file from the links, nodes and lans
742        try:
743            print >>dotfile, "graph G {"
744            for n in nodes:
745                print >>dotfile, '\t"%s"' % n
746            for l in links.keys():
747                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
748            for l in lans.keys():
749                for n in lans[l]:
750                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
751            print >>dotfile, "}"
752            dotfile.close()
753        except TypeError:
754            raise service_error(service_error.internal,
755                    "Single endpoint link in vtopo")
756        except IOError:
757            raise service_error(service_error.internal, "Cannot write dot file")
758
759        # Use dot to create a visualization
760        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
[886307f]761                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
[db6b092]762                close_fds=True)
763        dnull.close()
[866c983]764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
[d0ae12d]786
[99eb8cf]787    def get_access(self, tb, nodes, tbparam, master, export_project,
[e02cd14]788            access_user, services):
[866c983]789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792        uri = self.tbmap.get(tb, None)
793        if not uri:
[b78c9ea]794            raise service_error(service_error.server_config, 
[866c983]795                    "Unknown testbed: %s" % tb)
796
[8218a3b]797        # Tweak search order so that if there are entries in access_user that
798        # have a project matching the export project, we try them first
799        if export_project and export_project.has_key('localname'): 
800            pn = export_project['localname'] 
801               
802            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
803            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
804        else: 
805            access_sequence = access_user
806
807        for p, u in access_sequence: 
[866c983]808            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
809                    "to %s") %  ((p or "None"), u, uri))
810
811            if p:
812                # Request with user and project specified
813                req = {\
814                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]815                        'credential': [ "project: %s" % p, "user: %s"  % u],
[866c983]816                        'allocID' : { 'localname': 'test' },
817                    }
818            else:
819                # Request with only user specified
820                req = {\
821                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]822                        'credential': [ 'user: %s' % u ],
[866c983]823                        'user':  [ {'userID': { 'localname': u } } ],
824                        'allocID' : { 'localname': 'test' },
825                    }
826
827            if tb == master:
828                # NB, the export_project parameter is a dict that includes
829                # the type
830                req['exportProject'] = export_project
[e02cd14]831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
[866c983]837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
[e19b75c]869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
[db6b092]883
[4afcfc4]884        tbparam[tb] = { 
[69692a9]885                "allocID" : r['allocID'],
886                "uri": uri,
[4afcfc4]887                }
[e02cd14]888        if 'service' in r: 
889            services.extend(r['service'])
[4afcfc4]890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
[617592b]893        for a in r.get('fedAttr', []):
[4afcfc4]894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
[db6b092]901
[69692a9]902    def release_access(self, tb, aid, uri=None):
[e19b75c]903        """
904        Release access to testbed through fedd
905        """
[db6b092]906
[69692a9]907        if not uri:
908            uri = self.tbmap.get(tb, None)
[e19b75c]909        if not uri:
[69692a9]910            raise service_error(service_error.server_config, 
[e19b75c]911                    "Unknown testbed: %s" % tb)
[db6b092]912
[e19b75c]913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
[db6b092]921
[e19b75c]922        # better error coding
[db6b092]923
[e19b75c]924    def remote_splitter(self, uri, desc, master):
[db6b092]925
[e19b75c]926        req = {
927                'description' : { 'ns2description': desc },
928                'master': master,
929                'include_fedkit': bool(self.fedkit),
930                'include_gatewaykit': bool(self.gatewaykit)
[db6b092]931            }
932
[e19b75c]933        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
934                self.trusted_certs)
935
936        if r.has_key('Ns2SplitResponseBody'):
937            r = r['Ns2SplitResponseBody']
938            if r.has_key('output'):
939                return r['output'].splitlines()
940            else:
941                raise service_error(service_error.protocol, 
942                        "Bad splitter response (no output)")
943        else:
944            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]945
[e19b75c]946    class start_segment:
[fd556d1]947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]948                cert_pwd=None, trusted_certs=None, caller=None,
949                log_collector=None):
[cc8d8e9]950            self.log = log
951            self.debug = debug
952            self.cert_file = cert_file
953            self.cert_pwd = cert_pwd
954            self.trusted_certs = None
955            self.caller = caller
[fd556d1]956            self.testbed = testbed
[f07fa49]957            self.log_collector = log_collector
[69692a9]958            self.response = None
[cc8d8e9]959
[e02cd14]960        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
961                services=None):
[cc8d8e9]962            req = {
963                    'allocID': { 'fedid' : aid }, 
964                    'segmentdescription': { 
965                        'topdldescription': topo.to_dict(),
966                    },
[ecca6eb]967                    'master': master,
[cc8d8e9]968                }
[e02cd14]969
970            if connInfo:
971                req['connection'] = connInfo
972            # Add services to request.  The master exports, everyone else
973            # imports.
974            if services:
975                svcs = [ x.copy() for x in services]
976                for s in svcs:
977                    if master: s['visibility'] = 'export'
978                    else: s['visibility'] = 'import'
979                req['service'] = svcs
[6c57fe9]980            if attrs:
981                req['fedAttr'] = attrs
[cc8d8e9]982
[fd556d1]983            try:
[13e3dd2]984                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]985                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
986                        self.trusted_certs)
[f07fa49]987                if r.has_key('StartSegmentResponseBody'):
988                    lval = r['StartSegmentResponseBody'].get('allocationLog',
989                            None)
990                    if lval and self.log_collector:
991                        for line in  lval.splitlines(True):
992                            self.log_collector.write(line)
[69692a9]993                    self.response = r
[f07fa49]994                else:
995                    raise service_error(service_error.internal, 
996                            "Bad response!?: %s" %r)
[fd556d1]997                return True
998            except service_error, e:
999                self.log.error("Start segment failed on %s: %s" % \
1000                        (self.testbed, e))
1001                return False
[cc8d8e9]1002
1003
[5ae3857]1004
[e19b75c]1005    class terminate_segment:
[fd556d1]1006        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]1007                cert_pwd=None, trusted_certs=None, caller=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
[fd556d1]1014            self.testbed = testbed
[5ae3857]1015
1016        def __call__(self, uri, aid ):
1017            req = {
1018                    'allocID': aid , 
1019                }
[fd556d1]1020            try:
1021                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1022                        self.trusted_certs)
1023                return True
1024            except service_error, e:
1025                self.log.error("Terminate segment failed on %s: %s" % \
1026                        (self.testbed, e))
1027                return False
[db6b092]1028   
1029
[7183b48]1030    def allocate_resources(self, allocated, master, eid, expid, 
[f07fa49]1031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
[e02cd14]1032            attrs=None, connInfo={}, services=[]):
[69692a9]1033
[cc8d8e9]1034        started = { }           # Testbeds where a sub-experiment started
1035                                # successfully
1036
1037        # XXX
1038        fail_soft = False
1039
1040        log = alloc_log or self.log
1041
1042        thread_pool = self.thread_pool(self.nthreads)
1043        threads = [ ]
1044
[109a32a]1045        for tb in allocated.keys():
1046            # Create and start a thread to start the segment, and save it
1047            # to get the return value later
1048            thread_pool.wait_for_slot()
1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1050            if not uri:
1051                raise service_error(service_error.internal, 
1052                        "Unknown testbed %s !?" % tb)
1053
[cc8d8e9]1054            if tbparams[tb].has_key('allocID') and \
1055                    tbparams[tb]['allocID'].has_key('fedid'):
1056                aid = tbparams[tb]['allocID']['fedid']
1057            else:
1058                raise service_error(service_error.internal, 
1059                        "No alloc id for testbed %s !?" % tb)
1060
[109a32a]1061            t  = self.pooled_thread(\
1062                    target=self.start_segment(log=log, debug=self.debug,
1063                        testbed=tb, cert_file=self.cert_file,
1064                        cert_pwd=self.cert_pwd,
1065                        trusted_certs=self.trusted_certs,
1066                        caller=self.call_StartSegment,
1067                        log_collector=log_collector), 
1068                    args=(uri, aid, topo[tb], tb == master, 
1069                        attrs, connInfo[tb], services),
1070                    name=tb,
1071                    pdata=thread_pool, trace_file=self.trace_file)
[69692a9]1072            threads.append(t)
1073            t.start()
[cc8d8e9]1074
[109a32a]1075        # Wait until all finish (keep pinging the log, though)
1076        mins = 0
[dadc4da]1077        revoked = False
[109a32a]1078        while not thread_pool.wait_for_all_done(60.0):
1079            mins += 1
1080            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1081                    % mins)
[dadc4da]1082            if not revoked and \
[f52f5df]1083                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1084                # a testbed has failed.  Revoke this experiment's
1085                # synchronizarion values so that sub experiments will not
1086                # deadlock waiting for synchronization that will never happen
1087                self.log.info("A subexperiment has failed to swap in, " + \
1088                        "revoking synch keys")
1089                var_key = "fedid:%s" % expid
1090                for k in self.synch_store.all_keys():
1091                    if len(k) > 45 and k[0:46] == var_key:
1092                        self.synch_store.revoke_key(k)
1093                revoked = True
[69692a9]1094
[cc8d8e9]1095        failed = [ t.getName() for t in threads if not t.rv ]
1096        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1097
[cc8d8e9]1098        # If one failed clean up, unless fail_soft is set
[32e7d93]1099        if failed:
[cc8d8e9]1100            if not fail_soft:
1101                thread_pool.clear()
1102                for tb in succeeded:
1103                    # Create and start a thread to stop the segment
1104                    thread_pool.wait_for_slot()
[0fa1729]1105                    uri = tbparams[tb]['uri']
[cc8d8e9]1106                    t  = self.pooled_thread(\
[32e7d93]1107                            target=self.terminate_segment(log=log,
[fd556d1]1108                                testbed=tb,
[32e7d93]1109                                cert_file=self.cert_file, 
1110                                cert_pwd=self.cert_pwd,
1111                                trusted_certs=self.trusted_certs,
1112                                caller=self.call_TerminateSegment),
1113                            args=(uri, tbparams[tb]['federant']['allocID']),
1114                            name=tb,
[cc8d8e9]1115                            pdata=thread_pool, trace_file=self.trace_file)
1116                    t.start()
[f52f5df]1117                # Wait until all finish (if any are being stopped)
1118                if succeeded:
1119                    thread_pool.wait_for_all_done()
[cc8d8e9]1120
1121                # release the allocations
1122                for tb in tbparams.keys():
[69692a9]1123                    self.release_access(tb, tbparams[tb]['allocID'],
1124                            tbparams[tb].get('uri', None))
[cc8d8e9]1125                # Remove the placeholder
1126                self.state_lock.acquire()
1127                self.state[eid]['experimentStatus'] = 'failed'
1128                if self.state_filename: self.write_state()
1129                self.state_lock.release()
1130
1131                log.error("Swap in failed on %s" % ",".join(failed))
1132                return
1133        else:
1134            log.info("[start_segment]: Experiment %s active" % eid)
1135
1136
1137        # Walk up tmpdir, deleting as we go
[69692a9]1138        if self.cleanup:
1139            log.debug("[start_experiment]: removing %s" % tmpdir)
1140            for path, dirs, files in os.walk(tmpdir, topdown=False):
1141                for f in files:
1142                    os.remove(os.path.join(path, f))
1143                for d in dirs:
1144                    os.rmdir(os.path.join(path, d))
1145            os.rmdir(tmpdir)
1146        else:
1147            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1148
1149        # Insert the experiment into our state and update the disk copy
1150        self.state_lock.acquire()
1151        self.state[expid]['experimentStatus'] = 'active'
1152        self.state[eid] = self.state[expid]
1153        if self.state_filename: self.write_state()
1154        self.state_lock.release()
1155        return
1156
1157
[895a133]1158    def add_kit(self, e, kit):
1159        """
1160        Add a Software object created from the list of (install, location)
1161        tuples passed as kit  to the software attribute of an object e.  We
1162        do this enough to break out the code, but it's kind of a hack to
1163        avoid changing the old tuple rep.
1164        """
1165
1166        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1167
1168        if isinstance(e.software, list): e.software.extend(s)
1169        else: e.software = s
1170
1171
[a3ad8bd]1172    def create_experiment_state(self, fid, req, expid, expcert, 
1173            state='starting'):
[895a133]1174        """
1175        Create the initial entry in the experiment's state.  The expid and
1176        expcert are the experiment's fedid and certifacte that represents that
1177        ID, which are installed in the experiment state.  If the request
1178        includes a suggested local name that is used if possible.  If the local
1179        name is already taken by an experiment owned by this user that has
[a3ad8bd]1180        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1181        valid localname is found.  The generated local name is returned.
1182        """
1183
1184        if req.has_key('experimentID') and \
1185                req['experimentID'].has_key('localname'):
1186            overwrite = False
1187            eid = req['experimentID']['localname']
1188            # If there's an old failed experiment here with the same local name
1189            # and accessible by this user, we'll overwrite it, otherwise we'll
1190            # fall through and do the collision avoidance.
1191            old_expid = self.get_experiment_fedid(eid)
1192            if old_expid and self.check_experiment_access(fid, old_expid):
1193                self.state_lock.acquire()
1194                status = self.state[eid].get('experimentStatus', None)
1195                if status and status == 'failed':
1196                    # remove the old access attribute
1197                    self.auth.unset_attribute(fid, old_expid)
1198                    overwrite = True
1199                    del self.state[eid]
1200                    del self.state[old_expid]
1201                self.state_lock.release()
1202            self.state_lock.acquire()
1203            while (self.state.has_key(eid) and not overwrite):
1204                eid += random.choice(string.ascii_letters)
1205            # Initial state
1206            self.state[eid] = {
1207                    'experimentID' : \
1208                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1209                    'experimentStatus': state,
[895a133]1210                    'experimentAccess': { 'X509' : expcert },
1211                    'owner': fid,
1212                    'log' : [],
1213                }
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217        else:
1218            eid = self.exp_stem
1219            for i in range(0,5):
1220                eid += random.choice(string.ascii_letters)
1221            self.state_lock.acquire()
1222            while (self.state.has_key(eid)):
1223                eid = self.exp_stem
1224                for i in range(0,5):
1225                    eid += random.choice(string.ascii_letters)
1226            # Initial state
1227            self.state[eid] = {
1228                    'experimentID' : \
1229                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1230                    'experimentStatus': state,
[895a133]1231                    'experimentAccess': { 'X509' : expcert },
1232                    'owner': fid,
1233                    'log' : [],
1234                }
1235            self.state[expid] = self.state[eid]
1236            if self.state_filename: self.write_state()
1237            self.state_lock.release()
1238
1239        return eid
1240
1241
1242    def allocate_ips_to_topo(self, top):
1243        """
[69692a9]1244        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1245        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1246        created and returned as a list of hostfiles entries.  We also return
1247        the allocator, because we may need to allocate IPs to portals
1248        (specifically DRAGON portals).
[895a133]1249        """
1250        subs = sorted(top.substrates, 
1251                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1252                reverse=True)
1253        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1254        ifs = { }
1255        hosts = [ ]
1256
1257        for idx, s in enumerate(subs):
[289ff7e]1258            net_size = len(s.interfaces)+2
1259
1260            a = ips.allocate(net_size)
[895a133]1261            if a :
1262                base, num = a
[289ff7e]1263                if num < net_size: 
[895a133]1264                    raise service_error(service_error.internal,
1265                            "Allocator returned wrong number of IPs??")
1266            else:
1267                raise service_error(service_error.req, 
1268                        "Cannot allocate IP addresses")
[289ff7e]1269            mask = 2
1270            while 2 **mask < net_size:
1271                mask += 1
1272
1273            netmask = ((2**32-1) ^ (mask**2 -1))
1274            print "%d %x" % (mask , netmask)
[895a133]1275
1276            base += 1
1277            for i in s.interfaces:
1278                i.attribute.append(
1279                        topdl.Attribute('ip4_address', 
1280                            "%s" % ip_addr(base)))
[289ff7e]1281                i.attribute.append(
1282                        topdl.Attribute('ip4_netmask', 
1283                            "%s" % ip_addr(int(netmask))))
1284
[895a133]1285                hname = i.element.name[0]
1286                if ifs.has_key(hname):
1287                    hosts.append("%s\t%s-%s %s-%d" % \
1288                            (ip_addr(base), hname, s.name, hname,
1289                                ifs[hname]))
1290                else:
1291                    ifs[hname] = 0
1292                    hosts.append("%s\t%s-%s %s-%d %s" % \
1293                            (ip_addr(base), hname, s.name, hname,
1294                                ifs[hname], hname))
1295
1296                ifs[hname] += 1
1297                base += 1
[69692a9]1298        return hosts, ips
[895a133]1299
[99eb8cf]1300    def get_access_to_testbeds(self, testbeds, access_user, 
[e02cd14]1301            export_project, master, allocated, tbparams, services):
[895a133]1302        """
1303        Request access to the various testbeds required for this instantiation
1304        (passed in as testbeds).  User, access_user, expoert_project and master
1305        are used to construct the correct requests.  Per-testbed parameters are
1306        returned in tbparams.
1307        """
1308        for tb in testbeds:
[99eb8cf]1309            self.get_access(tb, None, tbparams, master,
[e02cd14]1310                    export_project, access_user, services)
[895a133]1311            allocated[tb] = 1
1312
1313    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1314        """
[e02cd14]1315        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1316        """
1317        for tb in testbeds:
1318            topo[tb] = top.clone()
1319            to_delete = [ ]
[e02cd14]1320            # XXX: copy in for loop to simplify
[895a133]1321            for e in topo[tb].elements:
1322                etb = e.get_attribute('testbed')
1323                if etb and etb != tb:
1324                    for i in e.interface:
1325                        for s in i.subs:
1326                            try:
1327                                s.interfaces.remove(i)
1328                            except ValueError:
1329                                raise service_error(service_error.internal,
1330                                        "Can't remove interface??")
1331                    to_delete.append(e)
1332            for e in to_delete:
1333                topo[tb].elements.remove(e)
1334            topo[tb].make_indices()
1335
1336            for e in [ e for e in topo[tb].elements \
1337                    if isinstance(e,topdl.Computer)]:
1338                if self.fedkit: self.add_kit(e, self.fedkit)
1339
[13e3dd2]1340    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
[2761484]1341            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
1342            expid=None):
[e02cd14]1343        """
1344        Return a new internet portal node and a dict with the connectionInfo to
1345        be attached.
1346        """
[d87778f]1347        dproject = tbparams[dt].get('project', 'project')
1348        ddomain = tbparams[dt].get('domain', ".example.com")
[13e3dd2]1349        mdomain = tbparams[master].get('domain', '.example.com')
[e02cd14]1350        mproject = tbparams[master].get('project', 'project')
[13e3dd2]1351        muser = tbparams[master].get('user', 'root')
1352        smbshare = tbparams[master].get('smbshare', 'USERS')
[e02cd14]1353
[13e3dd2]1354        if st == master or dt == master:
1355            active = ("%s" % (st == master))
1356        else:
[e02cd14]1357            active = ("%s" % (st > dt))
[13e3dd2]1358
[69692a9]1359        ifaces = [ ]
1360        for sub, attrs in iface_desc:
1361            inf = topdl.Interface(
[5b74b63]1362                    name="inf%03d" % len(ifaces),
[69692a9]1363                    substrate=sub,
1364                    attribute=[
1365                        topdl.Attribute(
1366                            attribute=n,
1367                            value = v)
1368                        for n, v in attrs
[13e3dd2]1369                        ]
[69692a9]1370                    )
1371            ifaces.append(inf)
[617592b]1372        if conn_type == "ssh":
[2761484]1373            try:
1374                aid = tbparams[st]['allocID']['fedid']
1375            except:
[109a32a]1376                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
1377                        % st)
[2761484]1378                aid = None
[617592b]1379            info = {
1380                    "type" : conn_type, 
1381                    "portal": myname,
1382                    'fedAttr': [ 
1383                            { 'attribute': 'masterdomain', 'value': mdomain},
1384                            { 'attribute': 'masterexperiment', 'value': 
1385                                "%s/%s" % (mproject, eid)},
1386                            { 'attribute': 'active', 'value': active},
1387                            # Move to SMB service description
1388                            { 'attribute': 'masteruser', 'value': muser},
1389                            { 'attribute': 'smbshare', 'value': smbshare},
1390                        ],
[2761484]1391                    'parameter': [
1392                        {
1393                            'name': 'peer',
1394                            'key': 'fedid:%s/%s' % (expid, myname),
1395                            'store': self.store_url,
1396                            'type': 'output',
1397                        },
1398                        {
1399                            'name': 'peer',
1400                            'key': 'fedid:%s/%s' % (expid, desthost),
1401                            'store': self.store_url,
1402                            'type': 'input',
1403                        },
1404                        ]
[617592b]1405                    }
[2761484]1406            # Give this allocation the rights to access the key of the
1407            # peers
1408            if aid:
1409                for h in (myname, desthost):
1410                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
1411            else:
1412                self.log.error("No aid for %s in new_portal_node" % st)
[617592b]1413        else:
1414            info = None
[5b74b63]1415
[e02cd14]1416        return (topdl.Computer(
[13e3dd2]1417                name=myname,
1418                attribute=[ 
1419                    topdl.Attribute(attribute=n,value=v)
1420                        for n, v in (\
1421                            ('portal', 'true'),
1422                            ('portal_type', portal_type), 
[641bb66]1423                        )
[13e3dd2]1424                    ],
1425                interface=ifaces,
[e02cd14]1426                ), info)
[13e3dd2]1427
[2761484]1428    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
[13e3dd2]1429        ddomain = tbparams[dt].get('domain', ".example.com")
1430        dproject = tbparams[dt].get('project', 'project')
1431        tsubstrate = \
1432                topdl.Substrate(name='%s-%s' % (st, dt),
1433                        attribute= [
1434                            topdl.Attribute(
1435                                attribute='portal',
1436                                value='true')
1437                            ]
1438                        )
1439        segment_element = topdl.Segment(
1440                id= tbparams[dt]['allocID'],
1441                type='emulab',
1442                uri = self.tbmap.get(dt, None),
1443                interface=[ 
1444                    topdl.Interface(
1445                        substrate=tsubstrate.name),
1446                    ],
1447                attribute = [
1448                    topdl.Attribute(attribute=n, value=v)
1449                        for n, v in (\
1450                            ('domain', ddomain),
1451                            ('experiment', "%s/%s" % \
1452                                    (dproject, eid)),)
1453                    ],
1454                )
1455
1456        return (tsubstrate, segment_element)
1457
[2761484]1458    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
[69692a9]1459        if sub.capacity is None:
1460            raise service_error(service_error.internal,
1461                    "Cannot DRAGON split substrate w/o capacity")
1462        segs = [ ]
1463        substr = topdl.Substrate(name="dragon%d" % idx, 
1464                capacity=sub.capacity.clone(),
1465                attribute=[ topdl.Attribute(attribute=n, value=v)
1466                    for n, v, in (\
1467                            ('vlan', 'unassigned%d' % idx),)])
[2761484]1468        name = "dragon%d" % idx
[109a32a]1469        store_key = 'fedid:%s/vlan%d' % (expid, idx)
[69692a9]1470        for tb in tbs.keys():
1471            seg = topdl.Segment(
1472                    id = tbparams[tb]['allocID'],
1473                    type='emulab',
1474                    uri = self.tbmap.get(tb, None),
1475                    interface=[ 
1476                        topdl.Interface(
1477                            substrate=substr.name),
1478                        ],
1479                    attribute=[ topdl.Attribute(
[ecf679e]1480                        attribute='dragon_endpoint', 
1481                        value=tbparams[tb]['dragon']),
[69692a9]1482                        ]
1483                    )
1484            if tbparams[tb].has_key('vlans'):
1485                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1486            segs.append(seg)
1487
[109a32a]1488            # Give this allocation the rights to access the key of the
1489            # vlan_id
1490            try:
1491                aid = tbparams[tb]['allocID']['fedid']
1492                self.auth.set_attribute(aid, store_key)
1493            except:
1494                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
1495                        % tb)
[2761484]1496
1497        connInfo[name] = [ { 
[109a32a]1498            'type': 'transit',
[2761484]1499            'parameter': [ { 
1500                'name': 'vlan_id',
[109a32a]1501                'key': store_key,
[2761484]1502                'store': self.store_url,
1503                'type': 'output'
1504                } ]
1505            } ]
1506
1507        topo[name] = \
[69692a9]1508                topdl.Topology(substrates=[substr], elements=segs,
1509                        attribute=[
1510                            topdl.Attribute(attribute="transit", value='true'),
1511                            topdl.Attribute(attribute="dynamic", value='true'),
[109a32a]1512                            topdl.Attribute(attribute="testbed", 
1513                                value='dragon'),
1514                            topdl.Attribute(attribute="store_keys", 
1515                                value=store_key),
[69692a9]1516                            ]
1517                        )
1518
[5b74b63]1519    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
[109a32a]1520            connInfo, expid=None):
[69692a9]1521        """
1522        Add attribiutes to the various elements indicating that they are to be
[5b74b63]1523        dragon connected and create a dragon segment in topo to be
[69692a9]1524        instantiated.
1525        """
1526
1527        def get_substrate_from_topo(name, t):
1528            for s in t.substrates:
1529                if s.name == name: return s
1530            else: return None
1531
[109a32a]1532
[617592b]1533        mdomain = tbparams[master].get('domain', '.example.com')
1534        mproject = tbparams[master].get('project', 'project')
[5b74b63]1535        # dn is the number of previously created dragon nets.  This routine
1536        # creates a net numbered by dn
[69692a9]1537        dn = len([x for x in topo.keys() if x.startswith('dragon')])
[5b74b63]1538        # Count the number of interfaces on this substrate in each testbed from
1539        # the global topology
[69692a9]1540        count = { }
[617592b]1541        node = { }
[5b74b63]1542        for e in [ i.element for i in sub.interfaces ]:
[69692a9]1543            tb = e.get_attribute('testbed')
1544            count[tb] = count.get(tb, 0) + 1
[617592b]1545            node[tb] = i.get_attribute('ip4_address')
1546
[69692a9]1547
[5b74b63]1548        # Set the attributes in the copies that will allow setup of dragon
1549        # connections.
[69692a9]1550        for tb in tbs.keys():
1551            s = get_substrate_from_topo(sub.name, topo[tb])
1552            if s:
[5b74b63]1553                if not connInfo.has_key(tb):
1554                    connInfo[tb] = [ ]
[617592b]1555
[2761484]1556                try:
1557                    aid = tbparams[tb]['allocID']['fedid']
1558                except:
[109a32a]1559                    self.log.debug("[creat_dragon_substrate] " + 
1560                            "Can't get alloc id for %s?" %tb)
[2761484]1561                    aid = None
1562
[617592b]1563                # This may need another look, but only a service gateway will
1564                # look at the active parameter, and these are only inserted to
1565                # connect to the master.
[238db1e]1566                active = "%s" % ( tb == master)
[5b74b63]1567                info = {
1568                        'type': 'transit',
1569                        'member': [ {
1570                            'element': i.element.name[0], 
1571                            'interface': i.name
[617592b]1572                            } for i in s.interfaces \
1573                                    if isinstance(i.element, topdl.Computer) ],
1574                        'fedAttr': [ 
1575                            { 'attribute': 'masterdomain', 'value': mdomain},
1576                            { 'attribute': 'masterexperiment', 'value': 
1577                                "%s/%s" % (mproject, eid)},
1578                            { 'attribute': 'active', 'value': active},
1579                            ],
[2761484]1580                        'parameter': [ {
1581                            'name': 'vlan_id',
1582                            'key': 'fedid:%s/vlan%d' % (expid, dn),
1583                            'store': self.store_url,
1584                            'type': 'input',
1585                            } ]
[5b74b63]1586                        }
[109a32a]1587                if tbs.has_key(tb):
1588                    info['peer'] = tbs[tb]
[5b74b63]1589                connInfo[tb].append(info)
[2761484]1590
1591                # Give this allocation the rights to access the key of the
1592                # vlan_id
1593                if aid:
1594                    self.auth.set_attribute(aid, 
1595                            'fedid:%s/vlan%d' % (expid, dn))
[69692a9]1596            else:
1597                raise service_error(service_error.internal,
1598                        "No substrate %s in testbed %s" % (sub.name, tb))
1599
[109a32a]1600        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
[69692a9]1601
1602    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
[2761484]1603            segment_substrate, portals, connInfo, expid):
[69692a9]1604        # More than one testbed is on this substrate.  Insert
1605        # some portals into the subtopologies.  st == source testbed,
1606        # dt == destination testbed.
1607        for st in tbs.keys():
1608            if not segment_substrate.has_key(st):
1609                segment_substrate[st] = { }
1610            if not portals.has_key(st): 
1611                portals[st] = { }
[e02cd14]1612            if not connInfo.has_key(st):
1613                connInfo[st] = [ ]
[69692a9]1614            for dt in [ t for t in tbs.keys() if t != st]:
1615                sproject = tbparams[st].get('project', 'project')
1616                dproject = tbparams[dt].get('project', 'project')
1617                mproject = tbparams[master].get('project', 'project')
1618                sdomain = tbparams[st].get('domain', ".example.com")
1619                ddomain = tbparams[dt].get('domain', ".example.com")
1620                mdomain = tbparams[master].get('domain', '.example.com')
1621                muser = tbparams[master].get('user', 'root')
1622                smbshare = tbparams[master].get('smbshare', 'USERS')
1623                aid = tbparams[dt]['allocID']['fedid']
1624                if st == master or dt == master:
1625                    active = ("%s" % (st == master))
1626                else:
1627                    active = ("%s" %(st > dt))
1628                if not segment_substrate[st].has_key(dt):
1629                    # Put a substrate and a segment for the connected
1630                    # testbed in there.
1631                    tsubstrate, segment_element = \
[2761484]1632                            self.new_portal_substrate(st, dt, eid, tbparams,
1633                                    expid)
[69692a9]1634                    segment_substrate[st][dt] = tsubstrate
1635                    topo[st].substrates.append(tsubstrate)
1636                    topo[st].elements.append(segment_element)
1637
1638                new_portal = False
1639                if portals[st].has_key(dt):
1640                    # There's a portal set up to go to this destination.
[e02cd14]1641                    # See if there's room to multiplex this connection on
[69692a9]1642                    # it.  If so, add an interface to the portal; if not,
1643                    # set up to add a portal below.
1644                    # [This little festival of braces is just a pop of the
1645                    # last element in the list of portals between st and
1646                    # dt.]
1647                    portal = portals[st][dt][-1]
1648                    mux = len([ i for i in portal.interface \
1649                            if not i.get_attribute('portal')])
1650                    if mux == self.muxmax:
1651                        new_portal = True
1652                        portal_type = "experiment"
1653                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
[2761484]1654                        desthost = "%stunnel%d" % (st.lower(), 
1655                                len(portals[st][dt]))
[69692a9]1656                    else:
1657                        new_i = topdl.Interface(
[6e44258]1658                                substrate=sub.name,
[69692a9]1659                                attribute=[ 
1660                                    topdl.Attribute(
1661                                        attribute='ip4_address', 
1662                                        value=tbs[dt]
1663                                    )
1664                                ])
1665                        portal.interface.append(new_i)
1666                else:
1667                    # First connection to this testbed, make an empty list
1668                    # and set up to add the new portal below
1669                    new_portal = True
1670                    portals[st][dt] = [ ]
1671                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
[2761484]1672                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
[69692a9]1673
1674                    if dt == master or st == master: portal_type = "both"
1675                    else: portal_type = "experiment"
1676
1677                if new_portal:
1678                    infs = (
1679                            (segment_substrate[st][dt].name, 
1680                                (('portal', 'true'),)),
1681                            (sub.name, 
1682                                (('ip4_address', tbs[dt]),))
1683                        )
[e02cd14]1684                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
[69692a9]1685                            master, eid, myname, desthost, portal_type,
[2761484]1686                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
[69692a9]1687                    if self.fedkit:
1688                        self.add_kit(portal, self.fedkit)
1689                    if self.gatewaykit: 
1690                        self.add_kit(portal, self.gatewaykit)
1691
1692                    topo[st].elements.append(portal)
1693                    portals[st][dt].append(portal)
[e02cd14]1694                    connInfo[st].append(info)
[69692a9]1695
[2761484]1696    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
[69692a9]1697        # Add to the master testbed
1698        tsubstrate, segment_element = \
[2761484]1699                self.new_portal_substrate(st, dt, eid, tbparams, expid)
[69692a9]1700        myname = "%stunnel" % dt
1701        desthost = "%stunnel" % st
1702
[e02cd14]1703        portal, info = self.new_portal_node(st, dt, tbparams, master,
[69692a9]1704                eid, myname, desthost, "control", 
[2761484]1705                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
1706                conn_attrs=[], expid=expid)
[69692a9]1707        if self.fedkit:
1708            self.add_kit(portal, self.fedkit)
1709        if self.gatewaykit: 
1710            self.add_kit(portal, self.gatewaykit)
1711
1712        topo[st].substrates.append(tsubstrate)
1713        topo[st].elements.append(segment_element)
1714        topo[st].elements.append(portal)
[e02cd14]1715        if not connInfo.has_key(st):
1716            connInfo[st] = [ ]
1717        connInfo[st].append(info)
[69692a9]1718
[6e44258]1719    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
[2761484]1720            substrate, tbparams, expid):
[69692a9]1721        # Add to the master testbed
1722        myname = "%stunnel" % dt
1723        desthost = "%s" % ip_addr(dip)
1724
[5b74b63]1725        portal, info = self.new_portal_node(st, dt, tbparams, master,
[69692a9]1726                eid, myname, desthost, "control", 
1727                ((substrate.name,(
1728                    ('portal','true'),
[5b74b63]1729                    ('ip4_address', "%s" % ip_addr(myip)),)),),
[2761484]1730                conn_type="transit", conn_attrs=[], expid=expid)
[69692a9]1731        if self.fedkit:
1732            self.add_kit(portal, self.fedkit)
1733        if self.gatewaykit: 
1734            self.add_kit(portal, self.gatewaykit)
1735
[617592b]1736        return portal
[69692a9]1737
[e02cd14]1738    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
[2761484]1739            connInfo, expid):
[895a133]1740        """
1741        For each substrate in the main topology, find those that
1742        have nodes on more than one testbed.  Insert portal nodes
1743        into the copies of those substrates on the sub topologies.
1744        """
[13e3dd2]1745        segment_substrate = { }
1746        portals = { }
[895a133]1747        for s in top.substrates:
1748            # tbs will contain an ip address on this subsrate that is in
1749            # each testbed.
1750            tbs = { }
1751            for i in s.interfaces:
1752                e = i.element
1753                tb = e.get_attribute('testbed')
1754                if tb and not tbs.has_key(tb):
1755                    for i in e.interface:
1756                        if s in i.subs:
1757                            tbs[tb]= i.get_attribute('ip4_address')
1758            if len(tbs) < 2:
1759                continue
1760
[69692a9]1761            # DRAGON will not create multi-site vlans yet
1762            if len(tbs) == 2 and \
1763                    all([tbparams[x].has_key('dragon') for x in tbs]):
1764                self.create_dragon_substrate(s, topo, tbs, tbparams, 
[2761484]1765                        master, eid, connInfo, expid)
[69692a9]1766            else:
1767                self.insert_internet_portals(s, topo, tbs, tbparams, master,
[2761484]1768                        eid, segment_substrate, portals, connInfo, expid)
[13e3dd2]1769
1770        # Make sure that all the slaves have a control portal back to the
1771        # master.
1772        for tb in [ t for t in tbparams.keys() if t != master ]:
1773            if len([e for e in topo[tb].elements \
1774                    if isinstance(e, topdl.Computer) and \
1775                    e.get_attribute('portal') and \
1776                    e.get_attribute('portal_type') == 'both']) == 0:
1777
[69692a9]1778                if tbparams[master].has_key('dragon') \
1779                        and tbparams[tb].has_key('dragon'):
1780
1781                    idx = len([x for x in topo.keys() \
1782                            if x.startswith('dragon')])
1783                    dip, leng = ip_allocator.allocate(4)
1784                    dip += 1
[6e44258]1785                    mip = dip+1
[69692a9]1786                    csub = topdl.Substrate(
1787                            name="dragon-control-%s" % tb,
1788                            capacity=topdl.Capacity(100000.0, 'max'),
1789                            attribute=[
1790                                topdl.Attribute(
1791                                    attribute='portal',
1792                                    value='true'
1793                                    )
1794                                ]
1795                            )
1796                    seg = topdl.Segment(
1797                            id= tbparams[master]['allocID'],
1798                            type='emulab',
1799                            uri = self.tbmap.get(master, None),
1800                            interface=[ 
1801                                topdl.Interface(
1802                                    substrate=csub.name),
1803                                ],
1804                            attribute = [
1805                                topdl.Attribute(attribute=n, value=v)
1806                                    for n, v in (\
1807                                        ('domain', 
1808                                            tbparams[master].get('domain',
1809                                                ".example.com")),
1810                                        ('experiment', "%s/%s" % \
1811                                                (tbparams[master].get(
1812                                                    'project', 
1813                                                    'project'), 
1814                                                    eid)),)
1815                                ],
1816                            )
[617592b]1817                    portal = self.new_dragon_portal(tb, master,
[2761484]1818                            master, eid, dip, mip, idx, csub, tbparams, expid)
[69692a9]1819                    topo[tb].substrates.append(csub)
[5b74b63]1820                    topo[tb].elements.append(portal)
[69692a9]1821                    topo[tb].elements.append(seg)
1822
1823                    mcsub = csub.clone()
1824                    seg = topdl.Segment(
1825                            id= tbparams[tb]['allocID'],
1826                            type='emulab',
1827                            uri = self.tbmap.get(tb, None),
1828                            interface=[ 
1829                                topdl.Interface(
1830                                    substrate=csub.name),
1831                                ],
1832                            attribute = [
1833                                topdl.Attribute(attribute=n, value=v)
1834                                    for n, v in (\
1835                                        ('domain', 
1836                                            tbparams[tb].get('domain',
1837                                                ".example.com")),
1838                                        ('experiment', "%s/%s" % \
1839                                                (tbparams[tb].get('project', 
1840                                                    'project'), 
1841                                                    eid)),)
1842                                ],
1843                            )
[617592b]1844                    portal = self.new_dragon_portal(master, tb, master,
[2761484]1845                            eid, mip, dip, idx, mcsub, tbparams, expid)
[69692a9]1846                    topo[master].substrates.append(mcsub)
[5b74b63]1847                    topo[master].elements.append(portal)
[69692a9]1848                    topo[master].elements.append(seg)
[617592b]1849                    for t in (master, tb):
1850                        topo[t].incorporate_elements()
[69692a9]1851
1852                    self.create_dragon_substrate(csub, topo, 
[109a32a]1853                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
1854                            tbparams, master, eid, connInfo,
1855                            expid)
[69692a9]1856                else:
1857                    self.add_control_portal(master, tb, master, eid, topo, 
[2761484]1858                            tbparams, connInfo, expid)
[69692a9]1859                    self.add_control_portal(tb, master, master, eid, topo, 
[2761484]1860                            tbparams, connInfo, expid)
[13e3dd2]1861
1862        # Connect the portal nodes into the topologies and clear out
[895a133]1863        # substrates that are not in the topologies
1864        for tb in tbparams.keys():
1865            topo[tb].incorporate_elements()
1866            topo[tb].substrates = \
1867                    [s for s in topo[tb].substrates \
1868                        if len(s.interfaces) >0]
1869
1870    def wrangle_software(self, expid, top, topo, tbparams):
1871        """
1872        Copy software out to the repository directory, allocate permissions and
1873        rewrite the segment topologies to look for the software in local
1874        places.
1875        """
1876
1877        # Copy the rpms and tarfiles to a distribution directory from
1878        # which the federants can retrieve them
1879        linkpath = "%s/software" %  expid
1880        softdir ="%s/%s" % ( self.repodir, linkpath)
1881        softmap = { }
1882        # These are in a list of tuples format (each kit).  This comprehension
1883        # unwraps them into a single list of tuples that initilaizes the set of
1884        # tuples.
1885        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1886                for p, t in l ])
1887        pkgs.update([x.location for e in top.elements \
1888                for x in e.software])
1889        try:
1890            os.makedirs(softdir)
1891        except IOError, e:
1892            raise service_error(
1893                    "Cannot create software directory: %s" % e)
1894        # The actual copying.  Everything's converted into a url for copying.
1895        for pkg in pkgs:
1896            loc = pkg
1897
1898            scheme, host, path = urlparse(loc)[0:3]
1899            dest = os.path.basename(path)
1900            if not scheme:
1901                if not loc.startswith('/'):
1902                    loc = "/%s" % loc
1903                loc = "file://%s" %loc
1904            try:
1905                u = urlopen(loc)
1906            except Exception, e:
1907                raise service_error(service_error.req, 
1908                        "Cannot open %s: %s" % (loc, e))
1909            try:
1910                f = open("%s/%s" % (softdir, dest) , "w")
1911                self.log.debug("Writing %s/%s" % (softdir,dest) )
1912                data = u.read(4096)
1913                while data:
1914                    f.write(data)
1915                    data = u.read(4096)
1916                f.close()
1917                u.close()
1918            except Exception, e:
1919                raise service_error(service_error.internal,
1920                        "Could not copy %s: %s" % (loc, e))
1921            path = re.sub("/tmp", "", linkpath)
1922            # XXX
1923            softmap[pkg] = \
[7183b48]1924                    "%s/%s/%s" %\
1925                    ( self.repo_url, path, dest)
[895a133]1926
1927            # Allow the individual segments to access the software.
1928            for tb in tbparams.keys():
1929                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1930                        "/%s/%s" % ( path, dest))
1931
1932        # Convert the software locations in the segments into the local
1933        # copies on this host
1934        for soft in [ s for tb in topo.values() \
1935                for e in tb.elements \
1936                    if getattr(e, 'software', False) \
1937                        for s in e.software ]:
1938            if softmap.has_key(soft.location):
1939                soft.location = softmap[soft.location]
1940
1941
[a3ad8bd]1942    def new_experiment(self, req, fid):
1943        """
1944        The external interface to empty initial experiment creation called from
1945        the dispatcher.
1946
1947        Creates a working directory, splits the incoming description using the
1948        splitter script and parses out the avrious subsections using the
1949        lcasses above.  Once each sub-experiment is created, use pooled threads
1950        to instantiate them and start it all up.
1951        """
1952        if not self.auth.check_attribute(fid, 'new'):
1953            raise service_error(service_error.access, "New access denied")
1954
1955        try:
1956            tmpdir = tempfile.mkdtemp(prefix="split-")
1957        except IOError:
1958            raise service_error(service_error.internal, "Cannot create tmp dir")
1959
1960        try:
1961            access_user = self.accessdb[fid]
1962        except KeyError:
1963            raise service_error(service_error.internal,
1964                    "Access map and authorizer out of sync in " + \
[7183b48]1965                            "new_experiment for fedid %s"  % fid)
[a3ad8bd]1966
1967        pid = "dummy"
1968        gid = "dummy"
1969
1970        req = req.get('NewRequestBody', None)
1971        if not req:
1972            raise service_error(service_error.req,
1973                    "Bad request format (no NewRequestBody)")
1974
1975        # Generate an ID for the experiment (slice) and a certificate that the
1976        # allocator can use to prove they own it.  We'll ship it back through
1977        # the encrypted connection.
1978        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1979
1980        #now we're done with the tmpdir, and it should be empty
1981        if self.cleanup:
1982            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1983            os.rmdir(tmpdir)
1984        else:
1985            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1986
1987        eid = self.create_experiment_state(fid, req, expid, expcert, 
1988                state='empty')
1989
1990        # Let users touch the state
1991        self.auth.set_attribute(fid, expid)
1992        self.auth.set_attribute(expid, expid)
1993        # Override fedids can manipulate state as well
1994        for o in self.overrides:
1995            self.auth.set_attribute(o, expid)
1996
1997        rv = {
1998                'experimentID': [
1999                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2000                ],
2001                'experimentStatus': 'empty',
2002                'experimentAccess': { 'X509' : expcert }
2003            }
2004
2005        return rv
2006
2007
[e19b75c]2008    def create_experiment(self, req, fid):
[db6b092]2009        """
2010        The external interface to experiment creation called from the
2011        dispatcher.
2012
2013        Creates a working directory, splits the incoming description using the
2014        splitter script and parses out the avrious subsections using the
2015        lcasses above.  Once each sub-experiment is created, use pooled threads
2016        to instantiate them and start it all up.
2017        """
[7183b48]2018
2019        req = req.get('CreateRequestBody', None)
2020        if not req:
2021            raise service_error(service_error.req,
2022                    "Bad request format (no CreateRequestBody)")
2023
2024        # Get the experiment access
2025        exp = req.get('experimentID', None)
2026        if exp:
2027            if exp.has_key('fedid'):
2028                key = exp['fedid']
2029                expid = key
2030                eid = None
2031            elif exp.has_key('localname'):
2032                key = exp['localname']
2033                eid = key
2034                expid = None
2035            else:
2036                raise service_error(service_error.req, "Unknown lookup type")
2037        else:
2038            raise service_error(service_error.req, "No request?")
2039
2040        self.check_experiment_access(fid, key)
[db6b092]2041
2042        try:
2043            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2044            os.mkdir(tmpdir+"/keys")
[db6b092]2045        except IOError:
2046            raise service_error(service_error.internal, "Cannot create tmp dir")
2047
2048        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2049        gw_secretkey_base = "fed.%s" % self.ssh_type
2050        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2051        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2052        tclfile = tmpdir + "/experiment.tcl"
2053        tbparams = { }
2054        try:
2055            access_user = self.accessdb[fid]
2056        except KeyError:
2057            raise service_error(service_error.internal,
2058                    "Access map and authorizer out of sync in " + \
2059                            "create_experiment for fedid %s"  % fid)
2060
2061        pid = "dummy"
2062        gid = "dummy"
2063
2064        # The tcl parser needs to read a file so put the content into that file
2065        descr=req.get('experimentdescription', None)
2066        if descr:
2067            file_content=descr.get('ns2description', None)
2068            if file_content:
2069                try:
2070                    f = open(tclfile, 'w')
2071                    f.write(file_content)
2072                    f.close()
2073                except IOError:
2074                    raise service_error(service_error.internal,
2075                            "Cannot write temp experiment description")
2076            else:
2077                raise service_error(service_error.req, 
2078                        "Only ns2descriptions supported")
2079        else:
2080            raise service_error(service_error.req, "No experiment description")
2081
[7183b48]2082        self.state_lock.acquire()
2083        if self.state.has_key(key):
[4afcfc4]2084            self.state[key]['experimentStatus'] = "starting"
[7183b48]2085            for e in self.state[key].get('experimentID',[]):
2086                if not expid and e.has_key('fedid'):
2087                    expid = e['fedid']
2088                elif not eid and e.has_key('localname'):
2089                    eid = e['localname']
2090        self.state_lock.release()
2091
2092        if not (eid and expid):
2093            raise service_error(service_error.internal, 
2094                    "Cannot find local experiment info!?")
[db6b092]2095
2096        try: 
2097            # This catches exceptions to clear the placeholder if necessary
2098            try:
2099                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2100            except ValueError:
2101                raise service_error(service_error.server_config, 
2102                        "Bad key type (%s)" % self.ssh_type)
2103
2104            master = req.get('master', None)
2105            if not master:
2106                raise service_error(service_error.req,
2107                        "No master testbed label")
2108            export_project = req.get('exportProject', None)
2109            if not export_project:
2110                raise service_error(service_error.req, "No export project")
[895a133]2111           
2112            # Translate to topdl
[db6b092]2113            if self.splitter_url:
[895a133]2114                # XXX: need remote topdl translator
[db6b092]2115                self.log.debug("Calling remote splitter at %s" % \
2116                        self.splitter_url)
2117                split_data = self.remote_splitter(self.splitter_url,
2118                        file_content, master)
2119            else:
2120                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2121                    str(self.muxmax), '-m', master]
2122
2123                if self.fedkit:
2124                    tclcmd.append('-k')
2125
2126                if self.gatewaykit:
2127                    tclcmd.append('-K')
2128
2129                tclcmd.extend([pid, gid, eid, tclfile])
2130
2131                self.log.debug("running local splitter %s", " ".join(tclcmd))
2132                # This is just fantastic.  As a side effect the parser copies
2133                # tb_compat.tcl into the current directory, so that directory
2134                # must be writable by the fedd user.  Doing this in the
2135                # temporary subdir ensures this is the case.
[70caa72]2136                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
[db6b092]2137                        cwd=tmpdir)
[866c983]2138                split_data = tclparser.stdout
2139
[cc8d8e9]2140            top = topdl.topology_from_xml(file=split_data, top="experiment")
[895a133]2141
[69692a9]2142            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[895a133]2143             # Find the testbeds to look up
2144            testbeds = set([ a.value for e in top.elements \
2145                    for a in e.attribute \
2146                    if a.attribute == 'testbed'] )
2147
2148            allocated = { }         # Testbeds we can access
2149            topo ={ }               # Sub topologies
[e02cd14]2150            connInfo = { }          # Connection information
2151            services = [ ]
[99eb8cf]2152            self.get_access_to_testbeds(testbeds, access_user, 
[e02cd14]2153                    export_project, master, allocated, tbparams, services)
[895a133]2154            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2155
2156            # Copy configuration files into the remote file store
[6c57fe9]2157            # The config urlpath
2158            configpath = "/%s/config" % expid
2159            # The config file system location
2160            configdir ="%s%s" % ( self.repodir, configpath)
2161            try:
2162                os.makedirs(configdir)
2163            except IOError, e:
2164                raise service_error(
2165                        "Cannot create config directory: %s" % e)
2166            try:
2167                f = open("%s/hosts" % configdir, "w")
2168                f.write('\n'.join(hosts))
2169                f.close()
2170            except IOError, e:
2171                raise service_error(service_error.internal, 
2172                        "Cannot write hosts file: %s" % e)
2173            try:
[40dd8c1]2174                copy_file("%s" % gw_pubkey, "%s/%s" % \
[6c57fe9]2175                        (configdir, gw_pubkey_base))
[40dd8c1]2176                copy_file("%s" % gw_secretkey, "%s/%s" % \
[6c57fe9]2177                        (configdir, gw_secretkey_base))
2178            except IOError, e:
2179                raise service_error(service_error.internal, 
2180                        "Cannot copy keyfiles: %s" % e)
[cc8d8e9]2181
[6c57fe9]2182            # Allow the individual testbeds to access the configuration files.
2183            for tb in tbparams.keys():
2184                asignee = tbparams[tb]['allocID']['fedid']
2185                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2186                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
[cc8d8e9]2187
[e02cd14]2188            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
[2761484]2189                    connInfo, expid)
[69692a9]2190            # Now get access to the dynamic testbeds
2191            for k, t in topo.items():
2192                if not t.get_attribute('dynamic'):
2193                    continue
2194                tb = t.get_attribute('testbed')
2195                if tb: 
[617592b]2196                    self.get_access(tb, None, tbparams, master, 
[e02cd14]2197                            export_project, access_user, services)
[69692a9]2198                    tbparams[k] = tbparams[tb]
2199                    del tbparams[tb]
2200                    allocated[k] = 1
[109a32a]2201                    store_keys = t.get_attribute('store_keys')
2202                    # Give the testbed access to keys it exports or imports
2203                    if store_keys:
2204                        for sk in store_keys.split(" "):
2205                            self.auth.set_attribute(\
2206                                    tbparams[k]['allocID']['fedid'], sk)
[69692a9]2207                else:
2208                    raise service_error(service_error.internal, 
2209                            "Dynamic allocation from no testbed!?")
2210
[895a133]2211            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2212
2213            vtopo = topdl.topology_to_vtopo(top)
2214            vis = self.genviz(vtopo)
[db6b092]2215
[866c983]2216            # save federant information
2217            for k in allocated.keys():
[ecf679e]2218                tbparams[k]['federant'] = {
2219                        'name': [ { 'localname' : eid} ],
2220                        'allocID' : tbparams[k]['allocID'],
2221                        'master' : k == master,
2222                        'uri': tbparams[k]['uri'],
[866c983]2223                    }
[69692a9]2224                if tbparams[k].has_key('emulab'):
2225                        tbparams[k]['federant']['emulab'] = \
2226                                tbparams[k]['emulab']
[866c983]2227
[db6b092]2228            self.state_lock.acquire()
2229            self.state[eid]['vtopo'] = vtopo
2230            self.state[eid]['vis'] = vis
2231            self.state[expid]['federant'] = \
2232                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2233                        if tbparams[tb].has_key('federant') ]
[cc8d8e9]2234            if self.state_filename: 
2235                self.write_state()
[db6b092]2236            self.state_lock.release()
[866c983]2237        except service_error, e:
2238            # If something goes wrong in the parse (usually an access error)
2239            # clear the placeholder state.  From here on out the code delays
[db6b092]2240            # exceptions.  Failing at this point returns a fault to the remote
2241            # caller.
[cc8d8e9]2242
[866c983]2243            self.state_lock.acquire()
2244            del self.state[eid]
[bd3e314]2245            del self.state[expid]
2246            if self.state_filename: self.write_state()
[866c983]2247            self.state_lock.release()
2248            raise e
2249
2250
[db6b092]2251        # Start the background swapper and return the starting state.  From
2252        # here on out, the state will stick around a while.
[866c983]2253
[db6b092]2254        # Let users touch the state
[bd3e314]2255        self.auth.set_attribute(fid, expid)
2256        self.auth.set_attribute(expid, expid)
[db6b092]2257        # Override fedids can manipulate state as well
2258        for o in self.overrides:
2259            self.auth.set_attribute(o, expid)
2260
2261        # Create a logger that logs to the experiment's state object as well as
2262        # to the main log file.
2263        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[f07fa49]2264        alloc_collector = self.list_log(self.state[eid]['log'])
2265        h = logging.StreamHandler(alloc_collector)
[db6b092]2266        # XXX: there should be a global one of these rather than repeating the
2267        # code.
2268        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2269                    '%d %b %y %H:%M:%S'))
2270        alloc_log.addHandler(h)
2271       
[6c57fe9]2272        attrs = [ 
2273                {
2274                    'attribute': 'ssh_pubkey', 
2275                    'value': '%s/%s/config/%s' % \
[7183b48]2276                            (self.repo_url, expid, gw_pubkey_base)
[6c57fe9]2277                },
2278                {
2279                    'attribute': 'ssh_secretkey', 
2280                    'value': '%s/%s/config/%s' % \
[7183b48]2281                            (self.repo_url, expid, gw_secretkey_base)
[6c57fe9]2282                },
2283                {
2284                    'attribute': 'hosts', 
2285                    'value': '%s/%s/config/hosts' % \
[7183b48]2286                            (self.repo_url, expid)
[6c57fe9]2287                },
[ecca6eb]2288                {
2289                    'attribute': 'experiment_name',
2290                    'value': eid,
2291                },
[6c57fe9]2292            ]
2293
[617592b]2294        # transit and disconnected testbeds may not have a connInfo entry.
2295        # Fill in the blanks.
2296        for t in allocated.keys():
2297            if not connInfo.has_key(t):
2298                connInfo[t] = { }
2299
[db6b092]2300        # Start a thread to do the resource allocation
[e19b75c]2301        t  = Thread(target=self.allocate_resources,
[7183b48]2302                args=(allocated, master, eid, expid, tbparams, 
[e02cd14]2303                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2304                    services),
[db6b092]2305                name=eid)
2306        t.start()
2307
2308        rv = {
2309                'experimentID': [
2310                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2311                ],
2312                'experimentStatus': 'starting',
2313            }
2314
2315        return rv
[9479343]2316   
2317    def get_experiment_fedid(self, key):
2318        """
[db6b092]2319        find the fedid associated with the localname key in the state database.
[9479343]2320        """
2321
[db6b092]2322        rv = None
2323        self.state_lock.acquire()
2324        if self.state.has_key(key):
2325            if isinstance(self.state[key], dict):
2326                try:
2327                    kl = [ f['fedid'] for f in \
2328                            self.state[key]['experimentID']\
2329                                if f.has_key('fedid') ]
2330                except KeyError:
2331                    self.state_lock.release()
2332                    raise service_error(service_error.internal, 
2333                            "No fedid for experiment %s when getting "+\
2334                                    "fedid(!?)" % key)
2335                if len(kl) == 1:
2336                    rv = kl[0]
2337                else:
2338                    self.state_lock.release()
2339                    raise service_error(service_error.internal, 
2340                            "multiple fedids for experiment %s when " +\
2341                                    "getting fedid(!?)" % key)
2342            else:
2343                self.state_lock.release()
2344                raise service_error(service_error.internal, 
2345                        "Unexpected state for %s" % key)
2346        self.state_lock.release()
2347        return rv
[a97394b]2348
[4064742]2349    def check_experiment_access(self, fid, key):
[866c983]2350        """
2351        Confirm that the fid has access to the experiment.  Though a request
2352        may be made in terms of a local name, the access attribute is always
2353        the experiment's fedid.
2354        """
2355        if not isinstance(key, fedid):
[db6b092]2356            key = self.get_experiment_fedid(key)
[866c983]2357
2358        if self.auth.check_attribute(fid, key):
2359            return True
2360        else:
2361            raise service_error(service_error.access, "Access Denied")
[4064742]2362
2363
[db6b092]2364    def get_handler(self, path, fid):
[7183b48]2365        self.log.info("Get handler %s %s" % (path, fid))
[6c57fe9]2366        if self.auth.check_attribute(fid, path):
2367            return ("%s/%s" % (self.repodir, path), "application/binary")
2368        else:
2369            return (None, None)
[987aaa1]2370
2371    def get_vtopo(self, req, fid):
[866c983]2372        """
2373        Return the stored virtual topology for this experiment
2374        """
2375        rv = None
[db6b092]2376        state = None
[866c983]2377
2378        req = req.get('VtopoRequestBody', None)
2379        if not req:
2380            raise service_error(service_error.req,
2381                    "Bad request format (no VtopoRequestBody)")
2382        exp = req.get('experiment', None)
2383        if exp:
2384            if exp.has_key('fedid'):
2385                key = exp['fedid']
2386                keytype = "fedid"
2387            elif exp.has_key('localname'):
2388                key = exp['localname']
2389                keytype = "localname"
2390            else:
2391                raise service_error(service_error.req, "Unknown lookup type")
2392        else:
2393            raise service_error(service_error.req, "No request?")
2394
2395        self.check_experiment_access(fid, key)
2396
2397        self.state_lock.acquire()
2398        if self.state.has_key(key):
[db6b092]2399            if self.state[key].has_key('vtopo'):
2400                rv = { 'experiment' : {keytype: key },\
2401                        'vtopo': self.state[key]['vtopo'],\
2402                    }
2403            else:
2404                state = self.state[key]['experimentStatus']
[866c983]2405        self.state_lock.release()
2406
2407        if rv: return rv
[bd3e314]2408        else: 
[db6b092]2409            if state:
2410                raise service_error(service_error.partial, 
2411                        "Not ready: %s" % state)
2412            else:
2413                raise service_error(service_error.req, "No such experiment")
[987aaa1]2414
2415    def get_vis(self, req, fid):
[866c983]2416        """
2417        Return the stored visualization for this experiment
2418        """
2419        rv = None
[db6b092]2420        state = None
[866c983]2421
2422        req = req.get('VisRequestBody', None)
2423        if not req:
2424            raise service_error(service_error.req,
2425                    "Bad request format (no VisRequestBody)")
2426        exp = req.get('experiment', None)
2427        if exp:
2428            if exp.has_key('fedid'):
2429                key = exp['fedid']
2430                keytype = "fedid"
2431            elif exp.has_key('localname'):
2432                key = exp['localname']
2433                keytype = "localname"
2434            else:
2435                raise service_error(service_error.req, "Unknown lookup type")
2436        else:
2437            raise service_error(service_error.req, "No request?")
2438
2439        self.check_experiment_access(fid, key)
2440
2441        self.state_lock.acquire()
2442        if self.state.has_key(key):
[db6b092]2443            if self.state[key].has_key('vis'):
2444                rv =  { 'experiment' : {keytype: key },\
2445                        'vis': self.state[key]['vis'],\
2446                        }
2447            else:
2448                state = self.state[key]['experimentStatus']
[866c983]2449        self.state_lock.release()
2450
2451        if rv: return rv
[bd3e314]2452        else:
[db6b092]2453            if state:
2454                raise service_error(service_error.partial, 
2455                        "Not ready: %s" % state)
2456            else:
2457                raise service_error(service_error.req, "No such experiment")
[987aaa1]2458
[65f3f29]2459    def clean_info_response(self, rv):
[db6b092]2460        """
2461        Remove the information in the experiment's state object that is not in
2462        the info response.
2463        """
2464        # Remove the owner info (should always be there, but...)
2465        if rv.has_key('owner'): del rv['owner']
2466
2467        # Convert the log into the allocationLog parameter and remove the
2468        # log entry (with defensive programming)
2469        if rv.has_key('log'):
2470            rv['allocationLog'] = "".join(rv['log'])
2471            del rv['log']
2472        else:
2473            rv['allocationLog'] = ""
2474
2475        if rv['experimentStatus'] != 'active':
2476            if rv.has_key('federant'): del rv['federant']
2477        else:
[69692a9]2478            # remove the allocationID and uri info from each federant
[db6b092]2479            for f in rv.get('federant', []):
2480                if f.has_key('allocID'): del f['allocID']
[69692a9]2481                if f.has_key('uri'): del f['uri']
[db6b092]2482        return rv
[65f3f29]2483
[c52c48d]2484    def get_info(self, req, fid):
[866c983]2485        """
2486        Return all the stored info about this experiment
2487        """
2488        rv = None
2489
2490        req = req.get('InfoRequestBody', None)
2491        if not req:
2492            raise service_error(service_error.req,
[65f3f29]2493                    "Bad request format (no InfoRequestBody)")
[866c983]2494        exp = req.get('experiment', None)
2495        if exp:
2496            if exp.has_key('fedid'):
2497                key = exp['fedid']
2498                keytype = "fedid"
2499            elif exp.has_key('localname'):
2500                key = exp['localname']
2501                keytype = "localname"
2502            else:
2503                raise service_error(service_error.req, "Unknown lookup type")
2504        else:
2505            raise service_error(service_error.req, "No request?")
2506
2507        self.check_experiment_access(fid, key)
2508
2509        # The state may be massaged by the service function that called
2510        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2511        # state.
2512        self.state_lock.acquire()
2513        if self.state.has_key(key):
2514            rv = copy.deepcopy(self.state[key])
2515        self.state_lock.release()
2516
[db6b092]2517        if rv:
2518            return self.clean_info_response(rv)
[bd3e314]2519        else:
[db6b092]2520            raise service_error(service_error.req, "No such experiment")
[7a8d667]2521
[65f3f29]2522    def get_multi_info(self, req, fid):
2523        """
2524        Return all the stored info that this fedid can access
2525        """
[db6b092]2526        rv = { 'info': [ ] }
[65f3f29]2527
[db6b092]2528        self.state_lock.acquire()
2529        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2530            try:
2531                self.check_experiment_access(fid, key)
2532            except service_error, e:
2533                if e.code == service_error.access:
2534                    continue
2535                else:
2536                    self.state_lock.release()
2537                    raise e
[65f3f29]2538
[db6b092]2539            if self.state.has_key(key):
2540                e = copy.deepcopy(self.state[key])
2541                e = self.clean_info_response(e)
2542                rv['info'].append(e)
[65f3f29]2543        self.state_lock.release()
[db6b092]2544        return rv
[65f3f29]2545
[7a8d667]2546    def terminate_experiment(self, req, fid):
[866c983]2547        """
2548        Swap this experiment out on the federants and delete the shared
2549        information
2550        """
2551        tbparams = { }
2552        req = req.get('TerminateRequestBody', None)
2553        if not req:
2554            raise service_error(service_error.req,
2555                    "Bad request format (no TerminateRequestBody)")
[db6b092]2556        force = req.get('force', False)
[866c983]2557        exp = req.get('experiment', None)
2558        if exp:
2559            if exp.has_key('fedid'):
2560                key = exp['fedid']
2561                keytype = "fedid"
2562            elif exp.has_key('localname'):
2563                key = exp['localname']
2564                keytype = "localname"
2565            else:
2566                raise service_error(service_error.req, "Unknown lookup type")
2567        else:
2568            raise service_error(service_error.req, "No request?")
2569
2570        self.check_experiment_access(fid, key)
2571
[db6b092]2572        dealloc_list = [ ]
[46e4682]2573
2574
[5ae3857]2575        # Create a logger that logs to the dealloc_list as well as to the main
2576        # log file.
2577        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2578        h = logging.StreamHandler(self.list_log(dealloc_list))
2579        # XXX: there should be a global one of these rather than repeating the
2580        # code.
2581        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2582                    '%d %b %y %H:%M:%S'))
2583        dealloc_log.addHandler(h)
2584
2585        self.state_lock.acquire()
2586        fed_exp = self.state.get(key, None)
2587
2588        if fed_exp:
2589            # This branch of the conditional holds the lock to generate a
2590            # consistent temporary tbparams variable to deallocate experiments.
2591            # It releases the lock to do the deallocations and reacquires it to
2592            # remove the experiment state when the termination is complete.
2593
2594            # First make sure that the experiment creation is complete.
2595            status = fed_exp.get('experimentStatus', None)
2596
2597            if status:
2598                if status in ('starting', 'terminating'):
2599                    if not force:
2600                        self.state_lock.release()
2601                        raise service_error(service_error.partial, 
2602                                'Experiment still being created or destroyed')
2603                    else:
2604                        self.log.warning('Experiment in %s state ' % status + \
2605                                'being terminated by force.')
2606            else:
2607                # No status??? trouble
2608                self.state_lock.release()
2609                raise service_error(service_error.internal,
2610                        "Experiment has no status!?")
2611
2612            ids = []
2613            #  experimentID is a list of dicts that are self-describing
2614            #  identifiers.  This finds all the fedids and localnames - the
2615            #  keys of self.state - and puts them into ids.
2616            for id in fed_exp.get('experimentID', []):
2617                if id.has_key('fedid'): ids.append(id['fedid'])
2618                if id.has_key('localname'): ids.append(id['localname'])
2619
[63a35b7]2620            # Collect the allocation/segment ids into a dict keyed by the fedid
2621            # of the allocation (or a monotonically increasing integer) that
2622            # contains a tuple of uri, aid (which is a dict...)
2623            for i, fed in enumerate(fed_exp.get('federant', [])):
[5ae3857]2624                try:
[63a35b7]2625                    uri = fed['uri']
2626                    aid = fed['allocID']
2627                    k = fed['allocID'].get('fedid', i)
[5ae3857]2628                except KeyError, e:
2629                    continue
[63a35b7]2630                tbparams[k] = (uri, aid)
[5ae3857]2631            fed_exp['experimentStatus'] = 'terminating'
2632            if self.state_filename: self.write_state()
2633            self.state_lock.release()
2634
2635            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2636            # then completes, so we can't wait if nothing starts.  So, no
2637            # tbparams, no start.
2638            if len(tbparams) > 0:
2639                thread_pool = self.thread_pool(self.nthreads)
[63a35b7]2640                for k in tbparams.keys():
[5ae3857]2641                    # Create and start a thread to stop the segment
2642                    thread_pool.wait_for_slot()
[63a35b7]2643                    uri, aid = tbparams[k]
[5ae3857]2644                    t  = self.pooled_thread(\
[e19b75c]2645                            target=self.terminate_segment(log=dealloc_log,
[63a35b7]2646                                testbed=uri,
[5ae3857]2647                                cert_file=self.cert_file, 
2648                                cert_pwd=self.cert_pwd,
2649                                trusted_certs=self.trusted_certs,
2650                                caller=self.call_TerminateSegment),
[63a35b7]2651                            args=(uri, aid), name=k,
[5ae3857]2652                            pdata=thread_pool, trace_file=self.trace_file)
2653                    t.start()
2654                # Wait for completions
2655                thread_pool.wait_for_all_done()
2656
2657            # release the allocations (failed experiments have done this
2658            # already, and starting experiments may be in odd states, so we
2659            # ignore errors releasing those allocations
2660            try: 
[63a35b7]2661                for k in tbparams.keys():
[ecf679e]2662                    # This releases access by uri
[63a35b7]2663                    uri, aid = tbparams[k]
2664                    self.release_access(None, aid, uri=uri)
[5ae3857]2665            except service_error, e:
2666                if status != 'failed' and not force:
2667                    raise e
2668
2669            # Remove the terminated experiment
2670            self.state_lock.acquire()
2671            for id in ids:
2672                if self.state.has_key(id): del self.state[id]
2673
2674            if self.state_filename: self.write_state()
2675            self.state_lock.release()
2676
[2761484]2677            # Delete any synch points associated with this experiment.  All
2678            # synch points begin with the fedid of the experiment.
2679            fedid_keys = set(["fedid:%s" % f for f in ids \
2680                    if isinstance(f, fedid)])
2681            for k in self.synch_store.all_keys():
2682                try:
2683                    if len(k) > 45 and k[0:46] in fedid_keys:
2684                        self.synch_store.del_value(k)
[dadc4da]2685                except synch_store.BadDeletionError:
[2761484]2686                    pass
2687            self.write_store()
2688               
[5ae3857]2689            return { 
2690                    'experiment': exp , 
2691                    'deallocationLog': "".join(dealloc_list),
2692                    }
2693        else:
2694            # Don't forget to release the lock
2695            self.state_lock.release()
2696            raise service_error(service_error.req, "No saved state")
[2761484]2697
2698
2699    def GetValue(self, req, fid):
2700        """
2701        Get a value from the synchronized store
2702        """
2703        req = req.get('GetValueRequestBody', None)
2704        if not req:
2705            raise service_error(service_error.req,
2706                    "Bad request format (no GetValueRequestBody)")
2707       
2708        name = req['name']
2709        wait = req['wait']
2710        rv = { 'name': name }
2711
2712        if self.auth.check_attribute(fid, name):
[dadc4da]2713            try:
2714                v = self.synch_store.get_value(name, wait)
2715            except synch_store.RevokedKeyError:
2716                # No more synch on this key
2717                raise service_error(service_error.federant, 
2718                        "Synch key %s revoked" % name)
[2761484]2719            if v is not None:
2720                rv['value'] = v
[109a32a]2721            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2722            return rv
2723        else:
2724            raise service_error(service_error.access, "Access Denied")
2725       
2726
2727    def SetValue(self, req, fid):
2728        """
2729        Set a value in the synchronized store
2730        """
2731        req = req.get('SetValueRequestBody', None)
2732        if not req:
2733            raise service_error(service_error.req,
2734                    "Bad request format (no SetValueRequestBody)")
2735       
2736        name = req['name']
2737        v = req['value']
2738
2739        if self.auth.check_attribute(fid, name):
2740            try:
2741                self.synch_store.set_value(name, v)
2742                self.write_store()
[109a32a]2743                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2744            except synch_store.CollisionError:
2745                # Translate into a service_error
2746                raise service_error(service_error.req,
2747                        "Value already set: %s" %name)
[dadc4da]2748            except synch_store.RevokedKeyError:
2749                # No more synch on this key
2750                raise service_error(service_error.federant, 
2751                        "Synch key %s revoked" % name)
[2761484]2752            return { 'name': name, 'value': v }
2753        else:
2754            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.