source: fedd/federation/experiment_control.py @ eda00e1

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since eda00e1 was eeab22e, checked in by Ted Faber <faber@…>, 15 years ago

remove references to emulab data structure

  • Property mode set to 100644
File size: 79.4 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3441fe3]16import traceback
[c971895]17# For parsing visualization output and splitter output
18import xml.parsers.expat
[3441fe3]19
[6c57fe9]20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
[6679c122]22
[db6b092]23from urlparse import urlparse
24from urllib2 import urlopen
25
[ec4fb42]26from util import *
[51cc9df]27from fedid import fedid, generate_fedid
[9460b1e]28from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]29from service_error import service_error
[2761484]30from synch_store import synch_store
[73e7f5c]31from experiment_partition import experiment_partition
[6679c122]32
[db6b092]33import topdl
[f07fa49]34import list_log
[db6b092]35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
[11a08b0]38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
[43197eb]45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
[ec4fb42]55class experiment_control_local:
[0ea11af]56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
[79b6596]62
63    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]64   
[1af38d6]65    class thread_pool:
[866c983]66        """
67        A class to keep track of a set of threads all invoked for the same
68        task.  Manages the mutual exclusion of the states.
69        """
70        def __init__(self, nthreads):
71            """
72            Start a pool.
73            """
74            self.changed = Condition()
75            self.started = 0
76            self.terminated = 0
77            self.nthreads = nthreads
78
79        def acquire(self):
80            """
81            Get the pool's lock.
82            """
83            self.changed.acquire()
84
85        def release(self):
86            """
87            Release the pool's lock.
88            """
89            self.changed.release()
90
91        def wait(self, timeout = None):
92            """
93            Wait for a pool thread to start or stop.
94            """
95            self.changed.wait(timeout)
96
97        def start(self):
98            """
99            Called by a pool thread to report starting.
100            """
101            self.changed.acquire()
102            self.started += 1
103            self.changed.notifyAll()
104            self.changed.release()
105
106        def terminate(self):
107            """
108            Called by a pool thread to report finishing.
109            """
110            self.changed.acquire()
111            self.terminated += 1
112            self.changed.notifyAll()
113            self.changed.release()
114
115        def clear(self):
116            """
117            Clear all pool data.
118            """
119            self.changed.acquire()
120            self.started = 0
121            self.terminated =0
122            self.changed.notifyAll()
123            self.changed.release()
124
125        def wait_for_slot(self):
126            """
127            Wait until we have a free slot to start another pooled thread
128            """
129            self.acquire()
130            while self.started - self.terminated >= self.nthreads:
131                self.wait()
132            self.release()
133
[32e7d93]134        def wait_for_all_done(self, timeout=None):
[866c983]135            """
[32e7d93]136            Wait until all active threads finish (and at least one has
137            started).  If a timeout is given, return after waiting that long
138            for termination.  If all threads are done (and one has started in
139            the since the last clear()) return True, otherwise False.
[866c983]140            """
[32e7d93]141            if timeout:
142                deadline = time.time() + timeout
[866c983]143            self.acquire()
144            while self.started == 0 or self.started > self.terminated:
[32e7d93]145                self.wait(timeout)
146                if timeout:
147                    if time.time() > deadline:
148                        break
149                    timeout = deadline - time.time()
[866c983]150            self.release()
[32e7d93]151            return not (self.started == 0 or self.started > self.terminated)
[8bc5754]152
[1af38d6]153    class pooled_thread(Thread):
[866c983]154        """
155        One of a set of threads dedicated to a specific task.  Uses the
156        thread_pool class above for coordination.
157        """
158        def __init__(self, group=None, target=None, name=None, args=(), 
159                kwargs={}, pdata=None, trace_file=None):
160            Thread.__init__(self, group, target, name, args, kwargs)
161            self.rv = None          # Return value of the ops in this thread
162            self.exception = None   # Exception that terminated this thread
163            self.target=target      # Target function to run on start()
164            self.args = args        # Args to pass to target
165            self.kwargs = kwargs    # Additional kw args
166            self.pdata = pdata      # thread_pool for this class
167            # Logger for this thread
168            self.log = logging.getLogger("fedd.experiment_control")
169       
170        def run(self):
171            """
172            Emulate Thread.run, except add pool data manipulation and error
173            logging.
174            """
175            if self.pdata:
176                self.pdata.start()
177
178            if self.target:
179                try:
180                    self.rv = self.target(*self.args, **self.kwargs)
181                except service_error, s:
182                    self.exception = s
183                    self.log.error("Thread exception: %s %s" % \
184                            (s.code_string(), s.desc))
185                except:
186                    self.exception = sys.exc_info()[1]
187                    self.log.error(("Unexpected thread exception: %s" +\
188                            "Trace %s") % (self.exception,\
189                                traceback.format_exc()))
190            if self.pdata:
191                self.pdata.terminate()
[6679c122]192
[f069052]193    call_RequestAccess = service_caller('RequestAccess')
194    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]195    call_StartSegment = service_caller('StartSegment')
[5ae3857]196    call_TerminateSegment = service_caller('TerminateSegment')
[5f6929a]197    call_Ns2Topdl = service_caller('Ns2Topdl')
[058f58e]198
[3f6bc5f]199    def __init__(self, config=None, auth=None):
[866c983]200        """
201        Intialize the various attributes, most from the config object
202        """
203
204        def parse_tarfile_list(tf):
205            """
206            Parse a tarfile list from the configuration.  This is a set of
207            paths and tarfiles separated by spaces.
208            """
209            rv = [ ]
210            if tf is not None:
211                tl = tf.split()
212                while len(tl) > 1:
213                    p, t = tl[0:2]
214                    del tl[0:2]
215                    rv.append((p, t))
216            return rv
217
218        self.thread_with_rv = experiment_control_local.pooled_thread
219        self.thread_pool = experiment_control_local.thread_pool
[f07fa49]220        self.list_log = list_log.list_log
[866c983]221
222        self.cert_file = config.get("experiment_control", "cert_file")
223        if self.cert_file:
224            self.cert_pwd = config.get("experiment_control", "cert_pwd")
225        else:
226            self.cert_file = config.get("globals", "cert_file")
227            self.cert_pwd = config.get("globals", "cert_pwd")
228
229        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
230                or config.get("globals", "trusted_certs")
231
[6c57fe9]232        self.repodir = config.get("experiment_control", "repodir")
[7183b48]233        self.repo_url = config.get("experiment_control", "repo_url", 
234                "https://users.isi.deterlab.net:23235");
[cc8d8e9]235
[866c983]236        self.exp_stem = "fed-stem"
237        self.log = logging.getLogger("fedd.experiment_control")
238        set_log_level(config, "experiment_control", self.log)
239        self.muxmax = 2
[35a4c01]240        self.nthreads = 10
[866c983]241        self.randomize_experiments = False
242
243        self.splitter = None
244        self.ssh_keygen = "/usr/bin/ssh-keygen"
245        self.ssh_identity_file = None
246
247
248        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]249        self.cleanup = not config.getboolean("experiment_control", 
250                "leave_tmpfiles")
[866c983]251        self.state_filename = config.get("experiment_control", 
252                "experiment_state")
[2761484]253        self.store_filename = config.get("experiment_control", 
254                "synch_store")
255        self.store_url = config.get("experiment_control", "store_url")
[5f6929a]256        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
[866c983]257        self.fedkit = parse_tarfile_list(\
258                config.get("experiment_control", "fedkit"))
259        self.gatewaykit = parse_tarfile_list(\
260                config.get("experiment_control", "gatewaykit"))
261        accessdb_file = config.get("experiment_control", "accessdb")
262
263        self.ssh_pubkey_file = config.get("experiment_control", 
264                "ssh_pubkey_file")
265        self.ssh_privkey_file = config.get("experiment_control",
266                "ssh_privkey_file")
267        # NB for internal master/slave ops, not experiment setup
268        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]269
[db6b092]270        self.overrides = set([])
271        ovr = config.get('experiment_control', 'overrides')
272        if ovr:
273            for o in ovr.split(","):
274                o = o.strip()
275                if o.startswith('fedid:'): o = o[len('fedid:'):]
276                self.overrides.add(fedid(hexstr=o))
[ca489e8]277
[866c983]278        self.state = { }
279        self.state_lock = Lock()
280        self.tclsh = "/usr/local/bin/otclsh"
[5f6929a]281        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
[866c983]282                config.get("experiment_control", "tcl_splitter",
283                        "/usr/testbed/lib/ns2ir/parse.tcl")
284        mapdb_file = config.get("experiment_control", "mapdb")
285        self.trace_file = sys.stderr
286
287        self.def_expstart = \
288                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
289                "/tmp/federate";
290        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
291                "FEDDIR/hosts";
292        self.def_gwstart = \
293                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
294                "/tmp/bridge.log";
295        self.def_mgwstart = \
296                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
297                "/tmp/bridge.log";
298        self.def_gwimage = "FBSD61-TUNNEL2";
299        self.def_gwtype = "pc";
300        self.local_access = { }
301
302        if auth:
303            self.auth = auth
304        else:
305            self.log.error(\
306                    "[access]: No authorizer initialized, creating local one.")
307            auth = authorizer()
308
309
310        if self.ssh_pubkey_file:
311            try:
312                f = open(self.ssh_pubkey_file, 'r')
313                self.ssh_pubkey = f.read()
314                f.close()
315            except IOError:
316                raise service_error(service_error.internal,
317                        "Cannot read sshpubkey")
318        else:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322        if not self.ssh_privkey_file:
323            raise service_error(service_error.internal, 
324                    "No SSH public key file?")
325
326
327        if mapdb_file:
328            self.read_mapdb(mapdb_file)
329        else:
330            self.log.warn("[experiment_control] No testbed map, using defaults")
331            self.tbmap = { 
332                    'deter':'https://users.isi.deterlab.net:23235',
333                    'emulab':'https://users.isi.deterlab.net:23236',
334                    'ucb':'https://users.isi.deterlab.net:23237',
335                    }
336
337        if accessdb_file:
338                self.read_accessdb(accessdb_file)
339        else:
340            raise service_error(service_error.internal,
341                    "No accessdb specified in config")
342
343        # Grab saved state.  OK to do this w/o locking because it's read only
344        # and only one thread should be in existence that can see self.state at
345        # this point.
346        if self.state_filename:
347            self.read_state()
348
[2761484]349        if self.store_filename:
350            self.read_store()
351        else:
352            self.log.warning("No saved synch store")
353            self.synch_store = synch_store
354
[866c983]355        # Dispatch tables
356        self.soap_services = {\
[a3ad8bd]357                'New': soap_handler('New', self.new_experiment),
[e19b75c]358                'Create': soap_handler('Create', self.create_experiment),
[866c983]359                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
360                'Vis': soap_handler('Vis', self.get_vis),
361                'Info': soap_handler('Info', self.get_info),
[65f3f29]362                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[866c983]363                'Terminate': soap_handler('Terminate', 
[e19b75c]364                    self.terminate_experiment),
[2761484]365                'GetValue': soap_handler('GetValue', self.GetValue),
366                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]367        }
368
369        self.xmlrpc_services = {\
[a3ad8bd]370                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]371                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]372                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
373                'Vis': xmlrpc_handler('Vis', self.get_vis),
374                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]375                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]376                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]377                    self.terminate_experiment),
[2761484]378                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
379                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]380        }
[19cc408]381
[a97394b]382    # Call while holding self.state_lock
[eee2b2e]383    def write_state(self):
[866c983]384        """
385        Write a new copy of experiment state after copying the existing state
386        to a backup.
387
388        State format is a simple pickling of the state dictionary.
389        """
390        if os.access(self.state_filename, os.W_OK):
[40dd8c1]391            copy_file(self.state_filename, \
392                    "%s.bak" % self.state_filename)
[866c983]393        try:
394            f = open(self.state_filename, 'w')
395            pickle.dump(self.state, f)
396        except IOError, e:
397            self.log.error("Can't write file %s: %s" % \
398                    (self.state_filename, e))
399        except pickle.PicklingError, e:
400            self.log.error("Pickling problem: %s" % e)
401        except TypeError, e:
402            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]403
[2761484]404    @staticmethod
405    def get_alloc_ids(state):
406        """
407        Pull the fedids of the identifiers of each allocation from the
408        state.  Again, a dict dive that's best isolated.
409
410        Used by read_store and read state
411        """
412
413        return [ f['allocID']['fedid'] 
414                for f in state.get('federant',[]) \
415                    if f.has_key('allocID') and \
416                        f['allocID'].has_key('fedid')]
417
[a97394b]418    # Call while holding self.state_lock
[eee2b2e]419    def read_state(self):
[866c983]420        """
421        Read a new copy of experiment state.  Old state is overwritten.
422
423        State format is a simple pickling of the state dictionary.
424        """
[cc8d8e9]425       
426        def get_experiment_id(state):
427            """
428            Pull the fedid experimentID out of the saved state.  This is kind
429            of a gross walk through the dict.
430            """
431
432            if state.has_key('experimentID'):
433                for e in state['experimentID']:
434                    if e.has_key('fedid'):
435                        return e['fedid']
436                else:
437                    return None
438            else:
439                return None
440
[866c983]441        try:
442            f = open(self.state_filename, "r")
443            self.state = pickle.load(f)
444            self.log.debug("[read_state]: Read state from %s" % \
445                    self.state_filename)
446        except IOError, e:
447            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
448                    % (self.state_filename, e))
449        except pickle.UnpicklingError, e:
450            self.log.warning(("[read_state]: No saved state: " + \
451                    "Unpickling failed: %s") % e)
452       
[cc8d8e9]453        for s in self.state.values():
[866c983]454            try:
[cc8d8e9]455
456                eid = get_experiment_id(s)
457                if eid : 
458                    # Give the owner rights to the experiment
459                    self.auth.set_attribute(s['owner'], eid)
460                    # And holders of the eid as well
461                    self.auth.set_attribute(eid, eid)
[db6b092]462                    # allow overrides to control experiments as well
463                    for o in self.overrides:
464                        self.auth.set_attribute(o, eid)
[cc8d8e9]465                    # Set permissions to allow reading of the software repo, if
466                    # any, as well.
[2761484]467                    for a in self.get_alloc_ids(s):
[cc8d8e9]468                        self.auth.set_attribute(a, 'repo/%s' % eid)
469                else:
470                    raise KeyError("No experiment id")
[866c983]471            except KeyError, e:
472                self.log.warning("[read_state]: State ownership or identity " +\
473                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]474
475
476    def read_accessdb(self, accessdb_file):
[866c983]477        """
478        Read the mapping from fedids that can create experiments to their name
479        in the 3-level access namespace.  All will be asserted from this
480        testbed and can include the local username and porject that will be
481        asserted on their behalf by this fedd.  Each fedid is also added to the
482        authorization system with the "create" attribute.
483        """
484        self.accessdb = {}
485        # These are the regexps for parsing the db
486        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
487        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
488                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
489        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
490                "\s*->\s*(" + name_expr + ")\s*$")
491        lineno = 0
492
493        # Parse the mappings and store in self.authdb, a dict of
494        # fedid -> (proj, user)
495        try:
496            f = open(accessdb_file, "r")
497            for line in f:
498                lineno += 1
499                line = line.strip()
500                if len(line) == 0 or line.startswith('#'):
501                    continue
502                m = project_line.match(line)
503                if m:
504                    fid = fedid(hexstr=m.group(1))
505                    project, user = m.group(2,3)
506                    if not self.accessdb.has_key(fid):
507                        self.accessdb[fid] = []
508                    self.accessdb[fid].append((project, user))
509                    continue
510
511                m = user_line.match(line)
512                if m:
513                    fid = fedid(hexstr=m.group(1))
514                    project = None
515                    user = m.group(2)
516                    if not self.accessdb.has_key(fid):
517                        self.accessdb[fid] = []
518                    self.accessdb[fid].append((project, user))
519                    continue
520                self.log.warn("[experiment_control] Error parsing access " +\
521                        "db %s at line %d" %  (accessdb_file, lineno))
522        except IOError:
523            raise service_error(service_error.internal,
524                    "Error opening/reading %s as experiment " +\
525                            "control accessdb" %  accessdb_file)
526        f.close()
527
528        # Initialize the authorization attributes
529        for fid in self.accessdb.keys():
530            self.auth.set_attribute(fid, 'create')
[a3ad8bd]531            self.auth.set_attribute(fid, 'new')
[34bc05c]532
533    def read_mapdb(self, file):
[866c983]534        """
535        Read a simple colon separated list of mappings for the
536        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
537        """
538
539        self.tbmap = { }
540        lineno =0
541        try:
542            f = open(file, "r")
543            for line in f:
544                lineno += 1
545                line = line.strip()
546                if line.startswith('#') or len(line) == 0:
547                    continue
548                try:
549                    label, url = line.split(':', 1)
550                    self.tbmap[label] = url
551                except ValueError, e:
552                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
553                            "map db: %s %s" % (lineno, line, e))
554        except IOError, e:
555            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
556                    "open %s: %s" % (file, e))
557        f.close()
[2761484]558
559    def read_store(self):
560        try:
561            self.synch_store = synch_store()
562            self.synch_store.load(self.store_filename)
563            self.log.debug("[read_store]: Read store from %s" % \
564                    self.store_filename)
565        except IOError, e:
566            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
567                    % (self.state_filename, e))
568            self.synch_store = synch_store()
569
570        # Set the initial permissions on data in the store.  XXX: This ad hoc
571        # authorization attribute initialization is getting out of hand.
572        for k in self.synch_store.all_keys():
573            try:
574                if k.startswith('fedid:'):
575                    fid = fedid(hexstr=k[6:46])
576                    if self.state.has_key(fid):
577                        for a in self.get_alloc_ids(self.state[fid]):
578                            self.auth.set_attribute(a, k)
579            except ValueError, e:
580                self.log.warn("Cannot deduce permissions for %s" % k)
581
582
583    def write_store(self):
584        """
585        Write a new copy of synch_store after writing current state
586        to a backup.  We use the internal synch_store pickle method to avoid
587        incinsistent data.
588
589        State format is a simple pickling of the store.
590        """
591        if os.access(self.store_filename, os.W_OK):
592            copy_file(self.store_filename, \
593                    "%s.bak" % self.store_filename)
594        try:
595            self.synch_store.save(self.store_filename)
596        except IOError, e:
597            self.log.error("Can't write file %s: %s" % \
598                    (self.store_filename, e))
599        except TypeError, e:
600            self.log.error("Pickling problem (TypeError): %s" % e)
601
[866c983]602       
[6679c122]603    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]604        """
605        Generate a set of keys for the gateways to use to talk.
606
607        Keys are of type type and are stored in the required dest file.
608        """
609        valid_types = ("rsa", "dsa")
610        t = type.lower();
611        if t not in valid_types: raise ValueError
612        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
613
614        try:
615            trace = open("/dev/null", "w")
616        except IOError:
617            raise service_error(service_error.internal,
618                    "Cannot open /dev/null??");
619
620        # May raise CalledProcessError
621        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]622        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]623        if rv != 0:
624            raise service_error(service_error.internal, 
625                    "Cannot generate nonce ssh keys.  %s return code %d" \
626                            % (self.ssh_keygen, rv))
[6679c122]627
[0d830de]628    def gentopo(self, str):
[866c983]629        """
630        Generate the topology dtat structure from the splitter's XML
631        representation of it.
632
633        The topology XML looks like:
634            <experiment>
635                <nodes>
636                    <node><vname></vname><ips>ip1:ip2</ips></node>
637                </nodes>
638                <lans>
639                    <lan>
640                        <vname></vname><vnode></vnode><ip></ip>
641                        <bandwidth></bandwidth><member>node:port</member>
642                    </lan>
643                </lans>
644        """
645        class topo_parse:
646            """
647            Parse the topology XML and create the dats structure.
648            """
649            def __init__(self):
650                # Typing of the subelements for data conversion
651                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
652                self.int_subelements = ( 'bandwidth',)
653                self.float_subelements = ( 'delay',)
654                # The final data structure
655                self.nodes = [ ]
656                self.lans =  [ ]
657                self.topo = { \
658                        'node': self.nodes,\
659                        'lan' : self.lans,\
660                    }
661                self.element = { }  # Current element being created
662                self.chars = ""     # Last text seen
663
664            def end_element(self, name):
665                # After each sub element the contents is added to the current
666                # element or to the appropriate list.
667                if name == 'node':
668                    self.nodes.append(self.element)
669                    self.element = { }
670                elif name == 'lan':
671                    self.lans.append(self.element)
672                    self.element = { }
673                elif name in self.str_subelements:
674                    self.element[name] = self.chars
675                    self.chars = ""
676                elif name in self.int_subelements:
677                    self.element[name] = int(self.chars)
678                    self.chars = ""
679                elif name in self.float_subelements:
680                    self.element[name] = float(self.chars)
681                    self.chars = ""
682
683            def found_chars(self, data):
684                self.chars += data.rstrip()
685
686
687        tp = topo_parse();
688        parser = xml.parsers.expat.ParserCreate()
689        parser.EndElementHandler = tp.end_element
690        parser.CharacterDataHandler = tp.found_chars
691
692        parser.Parse(str)
693
694        return tp.topo
695       
[0d830de]696
697    def genviz(self, topo):
[866c983]698        """
699        Generate the visualization the virtual topology
700        """
701
702        neato = "/usr/local/bin/neato"
703        # These are used to parse neato output and to create the visualization
704        # file.
[0ac1934]705        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]706        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
707                "%s</type></node>"
708
709        try:
710            # Node names
711            nodes = [ n['vname'] for n in topo['node'] ]
712            topo_lans = topo['lan']
[cc8d8e9]713        except KeyError, e:
714            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]715
716        lans = { }
717        links = { }
718
719        # Walk through the virtual topology, organizing the connections into
720        # 2-node connections (links) and more-than-2-node connections (lans).
721        # When a lan is created, it's added to the list of nodes (there's a
722        # node in the visualization for the lan).
723        for l in topo_lans:
724            if links.has_key(l['vname']):
725                if len(links[l['vname']]) < 2:
726                    links[l['vname']].append(l['vnode'])
727                else:
728                    nodes.append(l['vname'])
729                    lans[l['vname']] = links[l['vname']]
730                    del links[l['vname']]
731                    lans[l['vname']].append(l['vnode'])
732            elif lans.has_key(l['vname']):
733                lans[l['vname']].append(l['vnode'])
734            else:
735                links[l['vname']] = [ l['vnode'] ]
736
737
738        # Open up a temporary file for dot to turn into a visualization
739        try:
740            df, dotname = tempfile.mkstemp()
741            dotfile = os.fdopen(df, 'w')
742        except IOError:
743            raise service_error(service_error.internal,
744                    "Failed to open file in genviz")
745
[db6b092]746        try:
747            dnull = open('/dev/null', 'w')
748        except IOError:
749            service_error(service_error.internal,
[886307f]750                    "Failed to open /dev/null in genviz")
751
[866c983]752        # Generate a dot/neato input file from the links, nodes and lans
753        try:
754            print >>dotfile, "graph G {"
755            for n in nodes:
756                print >>dotfile, '\t"%s"' % n
757            for l in links.keys():
758                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
759            for l in lans.keys():
760                for n in lans[l]:
761                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
762            print >>dotfile, "}"
763            dotfile.close()
764        except TypeError:
765            raise service_error(service_error.internal,
766                    "Single endpoint link in vtopo")
767        except IOError:
768            raise service_error(service_error.internal, "Cannot write dot file")
769
770        # Use dot to create a visualization
771        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
[886307f]772                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
[db6b092]773                close_fds=True)
774        dnull.close()
[866c983]775
776        # Translate dot to vis format
777        vis_nodes = [ ]
778        vis = { 'node': vis_nodes }
779        for line in dot.stdout:
780            m = vis_re.match(line)
781            if m:
782                vn = m.group(1)
783                vis_node = {'name': vn, \
784                        'x': float(m.group(2)),\
785                        'y' : float(m.group(3)),\
786                    }
787                if vn in links.keys() or vn in lans.keys():
788                    vis_node['type'] = 'lan'
789                else:
790                    vis_node['type'] = 'node'
791                vis_nodes.append(vis_node)
792        rv = dot.wait()
793
794        os.remove(dotname)
795        if rv == 0 : return vis
796        else: return None
[d0ae12d]797
[43197eb]798    def get_access(self, tb, nodes, tbparam, access_user, masters):
[866c983]799        """
800        Get access to testbed through fedd and set the parameters for that tb
801        """
[43197eb]802        def get_export_project(svcs):
803            """
804            Look through for the list of federated_service for this testbed
805            objects for a project_export service, and extract the project
806            parameter.
807            """
808
809            pe = [s for s in svcs if s.name=='project_export']
810            if len(pe) == 1:
811                return pe[0].params.get('project', None)
812            elif len(pe) == 0:
813                return None
814            else:
815                raise service_error(service_error.req,
816                        "More than one project export is not supported")
817
[866c983]818        uri = self.tbmap.get(tb, None)
819        if not uri:
[b78c9ea]820            raise service_error(service_error.server_config, 
[866c983]821                    "Unknown testbed: %s" % tb)
822
[43197eb]823        export_svcs = masters.get(tb,[])
824        import_svcs = [ s for m in masters.values() \
825                for s in m \
826                    if tb in s.importers ]
827
828        export_project = get_export_project(export_svcs)
829
[8218a3b]830        # Tweak search order so that if there are entries in access_user that
831        # have a project matching the export project, we try them first
[5f6929a]832        if export_project: 
833            access_sequence = [ (p, u) for p, u in access_user \
834                    if p == export_project] 
835            access_sequence.extend([(p, u) for p, u in access_user \
836                    if p != export_project]) 
[8218a3b]837        else: 
838            access_sequence = access_user
839
840        for p, u in access_sequence: 
[866c983]841            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
842                    "to %s") %  ((p or "None"), u, uri))
843
844            if p:
845                # Request with user and project specified
846                req = {\
847                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]848                        'credential': [ "project: %s" % p, "user: %s"  % u],
[866c983]849                        'allocID' : { 'localname': 'test' },
850                    }
851            else:
852                # Request with only user specified
853                req = {\
854                        'destinationTestbed' : { 'uri' : uri },
[3bddd24]855                        'credential': [ 'user: %s' % u ],
[866c983]856                        'allocID' : { 'localname': 'test' },
857                    }
858
[43197eb]859            # Make the service request from the services we're importing and
860            # exporting.  Keep track of the export request ids so we can
861            # collect the resulting info from the access response.
862            e_keys = { }
863            if import_svcs or export_svcs:
864                req['service'] = [ ]
865
866                for i, s in enumerate(import_svcs):
867                    idx = 'import%d' % i
868                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
869                    if s.params:
870                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
871                                for k, v in s.params.items()]
872                    req['service'].append(sr)
873
874                for i, s in enumerate(export_svcs):
875                    idx = 'export%d' % i
876                    e_keys[idx] = s
877                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
878                    if s.params:
879                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
880                                for k, v in s.params.items()]
881                    req['service'].append(sr)
[866c983]882
883            # node resources if any
884            if nodes != None and len(nodes) > 0:
885                rnodes = [ ]
886                for n in nodes:
887                    rn = { }
888                    image, hw, count = n.split(":")
889                    if image: rn['image'] = [ image ]
890                    if hw: rn['hardware'] = [ hw ]
891                    if count and int(count) >0 : rn['count'] = int(count)
892                    rnodes.append(rn)
893                req['resources']= { }
894                req['resources']['node'] = rnodes
895
896            try:
897                if self.local_access.has_key(uri):
898                    # Local access call
899                    req = { 'RequestAccessRequestBody' : req }
900                    r = self.local_access[uri].RequestAccess(req, 
901                            fedid(file=self.cert_file))
902                    r = { 'RequestAccessResponseBody' : r }
903                else:
904                    r = self.call_RequestAccess(uri, req, 
905                            self.cert_file, self.cert_pwd, self.trusted_certs)
906            except service_error, e:
907                if e.code == service_error.access:
908                    self.log.debug("[get_access] Access denied")
909                    r = None
910                    continue
911                else:
912                    raise e
913
[e19b75c]914            if r.has_key('RequestAccessResponseBody'):
915                # Through to here we have a valid response, not a fault.
916                # Access denied is a fault, so something better or worse than
917                # access denied has happened.
918                r = r['RequestAccessResponseBody']
919                self.log.debug("[get_access] Access granted")
920                break
921            else:
922                raise service_error(service_error.protocol,
923                        "Bad proxy response")
924       
925        if not r:
926            raise service_error(service_error.access, 
927                    "Access denied by %s (%s)" % (tb, uri))
[db6b092]928
[4afcfc4]929        tbparam[tb] = { 
[69692a9]930                "allocID" : r['allocID'],
931                "uri": uri,
[4afcfc4]932                }
[43197eb]933
934        # Collect the responses corresponding to the services this testbed
935        # exports.  These will be the service requests that we will include in
936        # the start segment requests (with appropriate visibility values) to
937        # import and export the segments.
938        for s in r.get('service', []):
939            id = s.get('id', None)
940            if id and id in e_keys:
941                e_keys[id].reqs.append(s)
[4afcfc4]942
943        # Add attributes to parameter space.  We don't allow attributes to
944        # overlay any parameters already installed.
[617592b]945        for a in r.get('fedAttr', []):
[4afcfc4]946            try:
947                if a['attribute'] and \
948                        isinstance(a['attribute'], basestring)\
949                        and not tbparam[tb].has_key(a['attribute'].lower()):
950                    tbparam[tb][a['attribute'].lower()] = a['value']
951            except KeyError:
952                self.log.error("Bad attribute in response: %s" % a)
[db6b092]953
[69692a9]954    def release_access(self, tb, aid, uri=None):
[e19b75c]955        """
956        Release access to testbed through fedd
957        """
[db6b092]958
[69692a9]959        if not uri:
960            uri = self.tbmap.get(tb, None)
[e19b75c]961        if not uri:
[69692a9]962            raise service_error(service_error.server_config, 
[e19b75c]963                    "Unknown testbed: %s" % tb)
[db6b092]964
[e19b75c]965        if self.local_access.has_key(uri):
966            resp = self.local_access[uri].ReleaseAccess(\
967                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
968                    fedid(file=self.cert_file))
969            resp = { 'ReleaseAccessResponseBody': resp } 
970        else:
971            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
972                    self.cert_file, self.cert_pwd, self.trusted_certs)
[db6b092]973
[e19b75c]974        # better error coding
[db6b092]975
[5f6929a]976    def remote_ns2topdl(self, uri, desc):
[db6b092]977
[e19b75c]978        req = {
979                'description' : { 'ns2description': desc },
[db6b092]980            }
981
[5f6929a]982        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
[e19b75c]983                self.trusted_certs)
984
[5f6929a]985        if r.has_key('Ns2TopdlResponseBody'):
986            r = r['Ns2TopdlResponseBody']
[1dcaff4]987            ed = r.get('experimentdescription', None)
988            if ed.has_key('topdldescription'):
989                return topdl.Topology(**ed['topdldescription'])
[e19b75c]990            else:
991                raise service_error(service_error.protocol, 
992                        "Bad splitter response (no output)")
993        else:
994            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]995
[e19b75c]996    class start_segment:
[fd556d1]997        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]998                cert_pwd=None, trusted_certs=None, caller=None,
999                log_collector=None):
[cc8d8e9]1000            self.log = log
1001            self.debug = debug
1002            self.cert_file = cert_file
1003            self.cert_pwd = cert_pwd
1004            self.trusted_certs = None
1005            self.caller = caller
[fd556d1]1006            self.testbed = testbed
[f07fa49]1007            self.log_collector = log_collector
[69692a9]1008            self.response = None
[cc8d8e9]1009
[43197eb]1010        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
[cc8d8e9]1011            req = {
1012                    'allocID': { 'fedid' : aid }, 
1013                    'segmentdescription': { 
1014                        'topdldescription': topo.to_dict(),
1015                    },
1016                }
[e02cd14]1017
1018            if connInfo:
1019                req['connection'] = connInfo
[43197eb]1020
1021            import_svcs = [ s for m in masters.values() \
1022                    for s in m if self.testbed in s.importers]
1023
1024            if import_svcs or self.testbed in masters:
1025                req['service'] = []
1026
1027            for s in import_svcs:
1028                for r in s.reqs:
1029                    sr = copy.deepcopy(r)
1030                    sr['visibility'] = 'import';
1031                    req['service'].append(sr)
1032
1033            for s in masters.get(self.testbed, []):
1034                for r in s.reqs:
1035                    sr = copy.deepcopy(r)
1036                    sr['visibility'] = 'export';
1037                    req['service'].append(sr)
1038
[6c57fe9]1039            if attrs:
1040                req['fedAttr'] = attrs
[cc8d8e9]1041
[fd556d1]1042            try:
[13e3dd2]1043                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]1044                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1045                        self.trusted_certs)
[f07fa49]1046                if r.has_key('StartSegmentResponseBody'):
1047                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1048                            None)
1049                    if lval and self.log_collector:
1050                        for line in  lval.splitlines(True):
1051                            self.log_collector.write(line)
[69692a9]1052                    self.response = r
[f07fa49]1053                else:
1054                    raise service_error(service_error.internal, 
1055                            "Bad response!?: %s" %r)
[fd556d1]1056                return True
1057            except service_error, e:
1058                self.log.error("Start segment failed on %s: %s" % \
1059                        (self.testbed, e))
1060                return False
[cc8d8e9]1061
1062
[5ae3857]1063
[e19b75c]1064    class terminate_segment:
[fd556d1]1065        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]1066                cert_pwd=None, trusted_certs=None, caller=None):
1067            self.log = log
1068            self.debug = debug
1069            self.cert_file = cert_file
1070            self.cert_pwd = cert_pwd
1071            self.trusted_certs = None
1072            self.caller = caller
[fd556d1]1073            self.testbed = testbed
[5ae3857]1074
1075        def __call__(self, uri, aid ):
1076            req = {
1077                    'allocID': aid , 
1078                }
[fd556d1]1079            try:
1080                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1081                        self.trusted_certs)
1082                return True
1083            except service_error, e:
1084                self.log.error("Terminate segment failed on %s: %s" % \
1085                        (self.testbed, e))
1086                return False
[db6b092]1087   
1088
[43197eb]1089    def allocate_resources(self, allocated, masters, eid, expid, 
[f07fa49]1090            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
[43197eb]1091            attrs=None, connInfo={}):
[69692a9]1092
[cc8d8e9]1093        started = { }           # Testbeds where a sub-experiment started
1094                                # successfully
1095
1096        # XXX
1097        fail_soft = False
1098
1099        log = alloc_log or self.log
1100
1101        thread_pool = self.thread_pool(self.nthreads)
1102        threads = [ ]
1103
[109a32a]1104        for tb in allocated.keys():
1105            # Create and start a thread to start the segment, and save it
1106            # to get the return value later
1107            thread_pool.wait_for_slot()
1108            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1109            if not uri:
1110                raise service_error(service_error.internal, 
1111                        "Unknown testbed %s !?" % tb)
1112
[cc8d8e9]1113            if tbparams[tb].has_key('allocID') and \
1114                    tbparams[tb]['allocID'].has_key('fedid'):
1115                aid = tbparams[tb]['allocID']['fedid']
1116            else:
1117                raise service_error(service_error.internal, 
1118                        "No alloc id for testbed %s !?" % tb)
1119
[109a32a]1120            t  = self.pooled_thread(\
1121                    target=self.start_segment(log=log, debug=self.debug,
1122                        testbed=tb, cert_file=self.cert_file,
1123                        cert_pwd=self.cert_pwd,
1124                        trusted_certs=self.trusted_certs,
1125                        caller=self.call_StartSegment,
1126                        log_collector=log_collector), 
[43197eb]1127                    args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]),
[109a32a]1128                    name=tb,
1129                    pdata=thread_pool, trace_file=self.trace_file)
[69692a9]1130            threads.append(t)
1131            t.start()
[cc8d8e9]1132
[109a32a]1133        # Wait until all finish (keep pinging the log, though)
1134        mins = 0
[dadc4da]1135        revoked = False
[109a32a]1136        while not thread_pool.wait_for_all_done(60.0):
1137            mins += 1
1138            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1139                    % mins)
[dadc4da]1140            if not revoked and \
[f52f5df]1141                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1142                # a testbed has failed.  Revoke this experiment's
1143                # synchronizarion values so that sub experiments will not
1144                # deadlock waiting for synchronization that will never happen
1145                self.log.info("A subexperiment has failed to swap in, " + \
1146                        "revoking synch keys")
1147                var_key = "fedid:%s" % expid
1148                for k in self.synch_store.all_keys():
1149                    if len(k) > 45 and k[0:46] == var_key:
1150                        self.synch_store.revoke_key(k)
1151                revoked = True
[69692a9]1152
[cc8d8e9]1153        failed = [ t.getName() for t in threads if not t.rv ]
1154        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1155
[cc8d8e9]1156        # If one failed clean up, unless fail_soft is set
[32e7d93]1157        if failed:
[cc8d8e9]1158            if not fail_soft:
1159                thread_pool.clear()
1160                for tb in succeeded:
1161                    # Create and start a thread to stop the segment
1162                    thread_pool.wait_for_slot()
[0fa1729]1163                    uri = tbparams[tb]['uri']
[cc8d8e9]1164                    t  = self.pooled_thread(\
[32e7d93]1165                            target=self.terminate_segment(log=log,
[fd556d1]1166                                testbed=tb,
[32e7d93]1167                                cert_file=self.cert_file, 
1168                                cert_pwd=self.cert_pwd,
1169                                trusted_certs=self.trusted_certs,
1170                                caller=self.call_TerminateSegment),
1171                            args=(uri, tbparams[tb]['federant']['allocID']),
1172                            name=tb,
[cc8d8e9]1173                            pdata=thread_pool, trace_file=self.trace_file)
1174                    t.start()
[f52f5df]1175                # Wait until all finish (if any are being stopped)
1176                if succeeded:
1177                    thread_pool.wait_for_all_done()
[cc8d8e9]1178
1179                # release the allocations
1180                for tb in tbparams.keys():
[69692a9]1181                    self.release_access(tb, tbparams[tb]['allocID'],
1182                            tbparams[tb].get('uri', None))
[cc8d8e9]1183                # Remove the placeholder
1184                self.state_lock.acquire()
1185                self.state[eid]['experimentStatus'] = 'failed'
1186                if self.state_filename: self.write_state()
1187                self.state_lock.release()
1188
1189                log.error("Swap in failed on %s" % ",".join(failed))
1190                return
1191        else:
1192            log.info("[start_segment]: Experiment %s active" % eid)
1193
1194
1195        # Walk up tmpdir, deleting as we go
[69692a9]1196        if self.cleanup:
1197            log.debug("[start_experiment]: removing %s" % tmpdir)
1198            for path, dirs, files in os.walk(tmpdir, topdown=False):
1199                for f in files:
1200                    os.remove(os.path.join(path, f))
1201                for d in dirs:
1202                    os.rmdir(os.path.join(path, d))
1203            os.rmdir(tmpdir)
1204        else:
1205            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1206
1207        # Insert the experiment into our state and update the disk copy
1208        self.state_lock.acquire()
1209        self.state[expid]['experimentStatus'] = 'active'
1210        self.state[eid] = self.state[expid]
1211        if self.state_filename: self.write_state()
1212        self.state_lock.release()
1213        return
1214
1215
[895a133]1216    def add_kit(self, e, kit):
1217        """
1218        Add a Software object created from the list of (install, location)
1219        tuples passed as kit  to the software attribute of an object e.  We
1220        do this enough to break out the code, but it's kind of a hack to
1221        avoid changing the old tuple rep.
1222        """
1223
1224        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1225
1226        if isinstance(e.software, list): e.software.extend(s)
1227        else: e.software = s
1228
1229
[a3ad8bd]1230    def create_experiment_state(self, fid, req, expid, expcert, 
1231            state='starting'):
[895a133]1232        """
1233        Create the initial entry in the experiment's state.  The expid and
1234        expcert are the experiment's fedid and certifacte that represents that
1235        ID, which are installed in the experiment state.  If the request
1236        includes a suggested local name that is used if possible.  If the local
1237        name is already taken by an experiment owned by this user that has
[a3ad8bd]1238        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1239        valid localname is found.  The generated local name is returned.
1240        """
1241
1242        if req.has_key('experimentID') and \
1243                req['experimentID'].has_key('localname'):
1244            overwrite = False
1245            eid = req['experimentID']['localname']
1246            # If there's an old failed experiment here with the same local name
1247            # and accessible by this user, we'll overwrite it, otherwise we'll
1248            # fall through and do the collision avoidance.
1249            old_expid = self.get_experiment_fedid(eid)
1250            if old_expid and self.check_experiment_access(fid, old_expid):
1251                self.state_lock.acquire()
1252                status = self.state[eid].get('experimentStatus', None)
1253                if status and status == 'failed':
1254                    # remove the old access attribute
1255                    self.auth.unset_attribute(fid, old_expid)
1256                    overwrite = True
1257                    del self.state[eid]
1258                    del self.state[old_expid]
1259                self.state_lock.release()
1260            self.state_lock.acquire()
1261            while (self.state.has_key(eid) and not overwrite):
1262                eid += random.choice(string.ascii_letters)
1263            # Initial state
1264            self.state[eid] = {
1265                    'experimentID' : \
1266                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1267                    'experimentStatus': state,
[895a133]1268                    'experimentAccess': { 'X509' : expcert },
1269                    'owner': fid,
1270                    'log' : [],
1271                }
1272            self.state[expid] = self.state[eid]
1273            if self.state_filename: self.write_state()
1274            self.state_lock.release()
1275        else:
1276            eid = self.exp_stem
1277            for i in range(0,5):
1278                eid += random.choice(string.ascii_letters)
1279            self.state_lock.acquire()
1280            while (self.state.has_key(eid)):
1281                eid = self.exp_stem
1282                for i in range(0,5):
1283                    eid += random.choice(string.ascii_letters)
1284            # Initial state
1285            self.state[eid] = {
1286                    'experimentID' : \
1287                            [ { 'localname' : eid }, {'fedid': expid } ],
[a3ad8bd]1288                    'experimentStatus': state,
[895a133]1289                    'experimentAccess': { 'X509' : expcert },
1290                    'owner': fid,
1291                    'log' : [],
1292                }
1293            self.state[expid] = self.state[eid]
1294            if self.state_filename: self.write_state()
1295            self.state_lock.release()
1296
1297        return eid
1298
1299
1300    def allocate_ips_to_topo(self, top):
1301        """
[69692a9]1302        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1303        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1304        created and returned as a list of hostfiles entries.  We also return
1305        the allocator, because we may need to allocate IPs to portals
1306        (specifically DRAGON portals).
[895a133]1307        """
1308        subs = sorted(top.substrates, 
1309                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1310                reverse=True)
1311        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1312        ifs = { }
1313        hosts = [ ]
1314
1315        for idx, s in enumerate(subs):
[289ff7e]1316            net_size = len(s.interfaces)+2
1317
1318            a = ips.allocate(net_size)
[895a133]1319            if a :
1320                base, num = a
[289ff7e]1321                if num < net_size: 
[895a133]1322                    raise service_error(service_error.internal,
1323                            "Allocator returned wrong number of IPs??")
1324            else:
1325                raise service_error(service_error.req, 
1326                        "Cannot allocate IP addresses")
[062b991]1327            mask = ips.min_alloc
1328            while mask < net_size:
1329                mask *= 2
[289ff7e]1330
[062b991]1331            netmask = ((2**32-1) ^ (mask-1))
[895a133]1332
1333            base += 1
1334            for i in s.interfaces:
1335                i.attribute.append(
1336                        topdl.Attribute('ip4_address', 
1337                            "%s" % ip_addr(base)))
[289ff7e]1338                i.attribute.append(
1339                        topdl.Attribute('ip4_netmask', 
1340                            "%s" % ip_addr(int(netmask))))
1341
[895a133]1342                hname = i.element.name[0]
1343                if ifs.has_key(hname):
1344                    hosts.append("%s\t%s-%s %s-%d" % \
1345                            (ip_addr(base), hname, s.name, hname,
1346                                ifs[hname]))
1347                else:
1348                    ifs[hname] = 0
1349                    hosts.append("%s\t%s-%s %s-%d %s" % \
1350                            (ip_addr(base), hname, s.name, hname,
1351                                ifs[hname], hname))
1352
1353                ifs[hname] += 1
1354                base += 1
[69692a9]1355        return hosts, ips
[895a133]1356
[43197eb]1357    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1358            tbparams, masters):
[895a133]1359        """
1360        Request access to the various testbeds required for this instantiation
1361        (passed in as testbeds).  User, access_user, expoert_project and master
1362        are used to construct the correct requests.  Per-testbed parameters are
1363        returned in tbparams.
1364        """
1365        for tb in testbeds:
[43197eb]1366            self.get_access(tb, None, tbparams, access_user, masters)
[895a133]1367            allocated[tb] = 1
1368
[7fe81be]1369    def split_topology(self, top, topo, testbeds):
[895a133]1370        """
[e02cd14]1371        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1372        """
1373        for tb in testbeds:
1374            topo[tb] = top.clone()
[7fe81be]1375            # copy in for loop allows deletions from the original
1376            for e in [ e for e in topo[tb].elements]:
[895a133]1377                etb = e.get_attribute('testbed')
[7fe81be]1378                # NB: elements without a testbed attribute won't appear in any
1379                # sub topologies. 
1380                if not etb or etb != tb:
[895a133]1381                    for i in e.interface:
1382                        for s in i.subs:
1383                            try:
1384                                s.interfaces.remove(i)
1385                            except ValueError:
1386                                raise service_error(service_error.internal,
1387                                        "Can't remove interface??")
[7fe81be]1388                    topo[tb].elements.remove(e)
[895a133]1389            topo[tb].make_indices()
1390
1391    def wrangle_software(self, expid, top, topo, tbparams):
1392        """
1393        Copy software out to the repository directory, allocate permissions and
1394        rewrite the segment topologies to look for the software in local
1395        places.
1396        """
1397
1398        # Copy the rpms and tarfiles to a distribution directory from
1399        # which the federants can retrieve them
1400        linkpath = "%s/software" %  expid
1401        softdir ="%s/%s" % ( self.repodir, linkpath)
1402        softmap = { }
1403        # These are in a list of tuples format (each kit).  This comprehension
1404        # unwraps them into a single list of tuples that initilaizes the set of
1405        # tuples.
1406        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1407                for p, t in l ])
1408        pkgs.update([x.location for e in top.elements \
1409                for x in e.software])
1410        try:
1411            os.makedirs(softdir)
1412        except IOError, e:
1413            raise service_error(
1414                    "Cannot create software directory: %s" % e)
1415        # The actual copying.  Everything's converted into a url for copying.
1416        for pkg in pkgs:
1417            loc = pkg
1418
1419            scheme, host, path = urlparse(loc)[0:3]
1420            dest = os.path.basename(path)
1421            if not scheme:
1422                if not loc.startswith('/'):
1423                    loc = "/%s" % loc
1424                loc = "file://%s" %loc
1425            try:
1426                u = urlopen(loc)
1427            except Exception, e:
1428                raise service_error(service_error.req, 
1429                        "Cannot open %s: %s" % (loc, e))
1430            try:
1431                f = open("%s/%s" % (softdir, dest) , "w")
1432                self.log.debug("Writing %s/%s" % (softdir,dest) )
1433                data = u.read(4096)
1434                while data:
1435                    f.write(data)
1436                    data = u.read(4096)
1437                f.close()
1438                u.close()
1439            except Exception, e:
1440                raise service_error(service_error.internal,
1441                        "Could not copy %s: %s" % (loc, e))
1442            path = re.sub("/tmp", "", linkpath)
1443            # XXX
1444            softmap[pkg] = \
[7183b48]1445                    "%s/%s/%s" %\
1446                    ( self.repo_url, path, dest)
[895a133]1447
1448            # Allow the individual segments to access the software.
1449            for tb in tbparams.keys():
1450                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1451                        "/%s/%s" % ( path, dest))
1452
1453        # Convert the software locations in the segments into the local
1454        # copies on this host
1455        for soft in [ s for tb in topo.values() \
1456                for e in tb.elements \
1457                    if getattr(e, 'software', False) \
1458                        for s in e.software ]:
1459            if softmap.has_key(soft.location):
1460                soft.location = softmap[soft.location]
1461
1462
[a3ad8bd]1463    def new_experiment(self, req, fid):
1464        """
1465        The external interface to empty initial experiment creation called from
1466        the dispatcher.
1467
1468        Creates a working directory, splits the incoming description using the
1469        splitter script and parses out the avrious subsections using the
1470        lcasses above.  Once each sub-experiment is created, use pooled threads
1471        to instantiate them and start it all up.
1472        """
1473        if not self.auth.check_attribute(fid, 'new'):
1474            raise service_error(service_error.access, "New access denied")
1475
1476        try:
1477            tmpdir = tempfile.mkdtemp(prefix="split-")
1478        except IOError:
1479            raise service_error(service_error.internal, "Cannot create tmp dir")
1480
1481        try:
1482            access_user = self.accessdb[fid]
1483        except KeyError:
1484            raise service_error(service_error.internal,
1485                    "Access map and authorizer out of sync in " + \
[7183b48]1486                            "new_experiment for fedid %s"  % fid)
[a3ad8bd]1487
1488        pid = "dummy"
1489        gid = "dummy"
1490
1491        req = req.get('NewRequestBody', None)
1492        if not req:
1493            raise service_error(service_error.req,
1494                    "Bad request format (no NewRequestBody)")
1495
1496        # Generate an ID for the experiment (slice) and a certificate that the
1497        # allocator can use to prove they own it.  We'll ship it back through
1498        # the encrypted connection.
1499        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1500
1501        #now we're done with the tmpdir, and it should be empty
1502        if self.cleanup:
1503            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1504            os.rmdir(tmpdir)
1505        else:
1506            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1507
1508        eid = self.create_experiment_state(fid, req, expid, expcert, 
1509                state='empty')
1510
1511        # Let users touch the state
1512        self.auth.set_attribute(fid, expid)
1513        self.auth.set_attribute(expid, expid)
1514        # Override fedids can manipulate state as well
1515        for o in self.overrides:
1516            self.auth.set_attribute(o, expid)
1517
1518        rv = {
1519                'experimentID': [
1520                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1521                ],
1522                'experimentStatus': 'empty',
1523                'experimentAccess': { 'X509' : expcert }
1524            }
1525
1526        return rv
1527
[e19b75c]1528    def create_experiment(self, req, fid):
[db6b092]1529        """
1530        The external interface to experiment creation called from the
1531        dispatcher.
1532
1533        Creates a working directory, splits the incoming description using the
[43197eb]1534        splitter script and parses out the various subsections using the
1535        classes above.  Once each sub-experiment is created, use pooled threads
[db6b092]1536        to instantiate them and start it all up.
1537        """
[7183b48]1538
1539        req = req.get('CreateRequestBody', None)
1540        if not req:
1541            raise service_error(service_error.req,
1542                    "Bad request format (no CreateRequestBody)")
1543
1544        # Get the experiment access
1545        exp = req.get('experimentID', None)
1546        if exp:
1547            if exp.has_key('fedid'):
1548                key = exp['fedid']
1549                expid = key
1550                eid = None
1551            elif exp.has_key('localname'):
1552                key = exp['localname']
1553                eid = key
1554                expid = None
1555            else:
1556                raise service_error(service_error.req, "Unknown lookup type")
1557        else:
1558            raise service_error(service_error.req, "No request?")
1559
1560        self.check_experiment_access(fid, key)
[db6b092]1561
1562        try:
1563            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]1564            os.mkdir(tmpdir+"/keys")
[db6b092]1565        except IOError:
1566            raise service_error(service_error.internal, "Cannot create tmp dir")
1567
1568        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1569        gw_secretkey_base = "fed.%s" % self.ssh_type
1570        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1571        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1572        tclfile = tmpdir + "/experiment.tcl"
1573        tbparams = { }
1574        try:
1575            access_user = self.accessdb[fid]
1576        except KeyError:
1577            raise service_error(service_error.internal,
1578                    "Access map and authorizer out of sync in " + \
1579                            "create_experiment for fedid %s"  % fid)
1580
1581        pid = "dummy"
1582        gid = "dummy"
1583
1584        # The tcl parser needs to read a file so put the content into that file
1585        descr=req.get('experimentdescription', None)
1586        if descr:
1587            file_content=descr.get('ns2description', None)
1588            if file_content:
1589                try:
1590                    f = open(tclfile, 'w')
1591                    f.write(file_content)
1592                    f.close()
1593                except IOError:
1594                    raise service_error(service_error.internal,
1595                            "Cannot write temp experiment description")
1596            else:
1597                raise service_error(service_error.req, 
1598                        "Only ns2descriptions supported")
1599        else:
1600            raise service_error(service_error.req, "No experiment description")
1601
[7183b48]1602        self.state_lock.acquire()
1603        if self.state.has_key(key):
[4afcfc4]1604            self.state[key]['experimentStatus'] = "starting"
[7183b48]1605            for e in self.state[key].get('experimentID',[]):
1606                if not expid and e.has_key('fedid'):
1607                    expid = e['fedid']
1608                elif not eid and e.has_key('localname'):
1609                    eid = e['localname']
1610        self.state_lock.release()
1611
1612        if not (eid and expid):
1613            raise service_error(service_error.internal, 
1614                    "Cannot find local experiment info!?")
[db6b092]1615
1616        try: 
1617            # This catches exceptions to clear the placeholder if necessary
1618            try:
1619                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1620            except ValueError:
1621                raise service_error(service_error.server_config, 
1622                        "Bad key type (%s)" % self.ssh_type)
[5f6929a]1623
[43197eb]1624            # Copy the service request
1625            tb_services = [ s for s in req.get('service',[]) ]
[895a133]1626            # Translate to topdl
[db6b092]1627            if self.splitter_url:
[9b8e269]1628                self.log.debug("Calling remote topdl translator at %s" % \
[db6b092]1629                        self.splitter_url)
[5f6929a]1630                top = self.remote_ns2topdl(self.splitter_url, file_content)
[db6b092]1631            else:
1632                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
[43197eb]1633                    str(self.muxmax), '-m', 'dummy']
[db6b092]1634
1635                tclcmd.extend([pid, gid, eid, tclfile])
1636
1637                self.log.debug("running local splitter %s", " ".join(tclcmd))
1638                # This is just fantastic.  As a side effect the parser copies
1639                # tb_compat.tcl into the current directory, so that directory
1640                # must be writable by the fedd user.  Doing this in the
1641                # temporary subdir ensures this is the case.
[70caa72]1642                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
[db6b092]1643                        cwd=tmpdir)
[866c983]1644                split_data = tclparser.stdout
1645
[1dcaff4]1646                top = topdl.topology_from_xml(file=split_data, top="experiment")
[895a133]1647
[69692a9]1648            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[895a133]1649             # Find the testbeds to look up
1650            testbeds = set([ a.value for e in top.elements \
1651                    for a in e.attribute \
[5f96438]1652                        if a.attribute == 'testbed'])
[895a133]1653
[43197eb]1654            masters = { }           # testbeds exporting services
1655            for s in tb_services:
1656                # If this is a project_export request with the imports field
1657                # blank, fill it in.
1658                if s.get('name', '') == 'project_export':
1659                    if 'import' not in s or len(s['import']) == 0:
1660                        s['import'] = [ tb for tb in testbeds \
1661                                if tb not in s.get('export',[])]
1662                # Add the service to masters
1663                for tb in s.get('export', []):
1664                    if s.get('name', None) and s.get('import', None):
1665                        if tb not in masters:
1666                            masters[tb] = [ ]
1667
1668                        params = { }
1669                        if 'fedAttr' in s:
1670                            for a in s['fedAttr']:
1671                                params[a.get('attribute', '')] = \
1672                                        a.get('value','')
1673
1674                        masters[tb].append(federated_service(name=s['name'],
1675                                exporter=tb, importers=s.get('import',[]),
1676                                params=params))
1677                    else:
1678                        log.error('Testbed service does not have name " + \
1679                                "and importers')
1680
1681
[895a133]1682            allocated = { }         # Testbeds we can access
1683            topo ={ }               # Sub topologies
[e02cd14]1684            connInfo = { }          # Connection information
[43197eb]1685            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1686                    tbparams, masters)
[5f96438]1687
[7fe81be]1688            self.split_topology(top, topo, testbeds)
[895a133]1689
1690            # Copy configuration files into the remote file store
[6c57fe9]1691            # The config urlpath
1692            configpath = "/%s/config" % expid
1693            # The config file system location
1694            configdir ="%s%s" % ( self.repodir, configpath)
1695            try:
1696                os.makedirs(configdir)
1697            except IOError, e:
1698                raise service_error(
1699                        "Cannot create config directory: %s" % e)
1700            try:
1701                f = open("%s/hosts" % configdir, "w")
1702                f.write('\n'.join(hosts))
1703                f.close()
1704            except IOError, e:
1705                raise service_error(service_error.internal, 
1706                        "Cannot write hosts file: %s" % e)
1707            try:
[40dd8c1]1708                copy_file("%s" % gw_pubkey, "%s/%s" % \
[6c57fe9]1709                        (configdir, gw_pubkey_base))
[40dd8c1]1710                copy_file("%s" % gw_secretkey, "%s/%s" % \
[6c57fe9]1711                        (configdir, gw_secretkey_base))
1712            except IOError, e:
1713                raise service_error(service_error.internal, 
1714                        "Cannot copy keyfiles: %s" % e)
[cc8d8e9]1715
[6c57fe9]1716            # Allow the individual testbeds to access the configuration files.
1717            for tb in tbparams.keys():
1718                asignee = tbparams[tb]['allocID']['fedid']
1719                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1720                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
[cc8d8e9]1721
[73e7f5c]1722            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1723                    self.muxmax)
[5f96438]1724            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
[2761484]1725                    connInfo, expid)
[69692a9]1726            # Now get access to the dynamic testbeds
1727            for k, t in topo.items():
1728                if not t.get_attribute('dynamic'):
1729                    continue
1730                tb = t.get_attribute('testbed')
1731                if tb: 
[43197eb]1732                    self.get_access(tb, None, tbparams, export_project, 
1733                            access_user, masters)
[69692a9]1734                    tbparams[k] = tbparams[tb]
1735                    del tbparams[tb]
1736                    allocated[k] = 1
[109a32a]1737                    store_keys = t.get_attribute('store_keys')
1738                    # Give the testbed access to keys it exports or imports
1739                    if store_keys:
1740                        for sk in store_keys.split(" "):
1741                            self.auth.set_attribute(\
1742                                    tbparams[k]['allocID']['fedid'], sk)
[69692a9]1743                else:
1744                    raise service_error(service_error.internal, 
1745                            "Dynamic allocation from no testbed!?")
1746
[895a133]1747            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]1748
1749            vtopo = topdl.topology_to_vtopo(top)
1750            vis = self.genviz(vtopo)
[db6b092]1751
[866c983]1752            # save federant information
1753            for k in allocated.keys():
[ecf679e]1754                tbparams[k]['federant'] = {
1755                        'name': [ { 'localname' : eid} ],
1756                        'allocID' : tbparams[k]['allocID'],
1757                        'uri': tbparams[k]['uri'],
[866c983]1758                    }
1759
[db6b092]1760            self.state_lock.acquire()
1761            self.state[eid]['vtopo'] = vtopo
1762            self.state[eid]['vis'] = vis
1763            self.state[expid]['federant'] = \
1764                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1765                        if tbparams[tb].has_key('federant') ]
[cc8d8e9]1766            if self.state_filename: 
1767                self.write_state()
[db6b092]1768            self.state_lock.release()
[866c983]1769        except service_error, e:
1770            # If something goes wrong in the parse (usually an access error)
1771            # clear the placeholder state.  From here on out the code delays
[db6b092]1772            # exceptions.  Failing at this point returns a fault to the remote
1773            # caller.
[cc8d8e9]1774
[866c983]1775            self.state_lock.acquire()
1776            del self.state[eid]
[bd3e314]1777            del self.state[expid]
1778            if self.state_filename: self.write_state()
[866c983]1779            self.state_lock.release()
1780            raise e
1781
1782
[db6b092]1783        # Start the background swapper and return the starting state.  From
1784        # here on out, the state will stick around a while.
[866c983]1785
[db6b092]1786        # Let users touch the state
[bd3e314]1787        self.auth.set_attribute(fid, expid)
1788        self.auth.set_attribute(expid, expid)
[db6b092]1789        # Override fedids can manipulate state as well
1790        for o in self.overrides:
1791            self.auth.set_attribute(o, expid)
1792
1793        # Create a logger that logs to the experiment's state object as well as
1794        # to the main log file.
1795        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[f07fa49]1796        alloc_collector = self.list_log(self.state[eid]['log'])
1797        h = logging.StreamHandler(alloc_collector)
[db6b092]1798        # XXX: there should be a global one of these rather than repeating the
1799        # code.
1800        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1801                    '%d %b %y %H:%M:%S'))
1802        alloc_log.addHandler(h)
1803       
[6c57fe9]1804        attrs = [ 
1805                {
1806                    'attribute': 'ssh_pubkey', 
1807                    'value': '%s/%s/config/%s' % \
[7183b48]1808                            (self.repo_url, expid, gw_pubkey_base)
[6c57fe9]1809                },
1810                {
1811                    'attribute': 'ssh_secretkey', 
1812                    'value': '%s/%s/config/%s' % \
[7183b48]1813                            (self.repo_url, expid, gw_secretkey_base)
[6c57fe9]1814                },
1815                {
1816                    'attribute': 'hosts', 
1817                    'value': '%s/%s/config/hosts' % \
[7183b48]1818                            (self.repo_url, expid)
[6c57fe9]1819                },
[ecca6eb]1820                {
1821                    'attribute': 'experiment_name',
1822                    'value': eid,
1823                },
[6c57fe9]1824            ]
1825
[617592b]1826        # transit and disconnected testbeds may not have a connInfo entry.
1827        # Fill in the blanks.
1828        for t in allocated.keys():
1829            if not connInfo.has_key(t):
1830                connInfo[t] = { }
1831
[db6b092]1832        # Start a thread to do the resource allocation
[e19b75c]1833        t  = Thread(target=self.allocate_resources,
[43197eb]1834                args=(allocated, masters, eid, expid, tbparams, 
1835                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo),
[db6b092]1836                name=eid)
1837        t.start()
1838
1839        rv = {
1840                'experimentID': [
1841                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1842                ],
1843                'experimentStatus': 'starting',
1844            }
1845
1846        return rv
[9479343]1847   
1848    def get_experiment_fedid(self, key):
1849        """
[db6b092]1850        find the fedid associated with the localname key in the state database.
[9479343]1851        """
1852
[db6b092]1853        rv = None
1854        self.state_lock.acquire()
1855        if self.state.has_key(key):
1856            if isinstance(self.state[key], dict):
1857                try:
1858                    kl = [ f['fedid'] for f in \
1859                            self.state[key]['experimentID']\
1860                                if f.has_key('fedid') ]
1861                except KeyError:
1862                    self.state_lock.release()
1863                    raise service_error(service_error.internal, 
1864                            "No fedid for experiment %s when getting "+\
1865                                    "fedid(!?)" % key)
1866                if len(kl) == 1:
1867                    rv = kl[0]
1868                else:
1869                    self.state_lock.release()
1870                    raise service_error(service_error.internal, 
1871                            "multiple fedids for experiment %s when " +\
1872                                    "getting fedid(!?)" % key)
1873            else:
1874                self.state_lock.release()
1875                raise service_error(service_error.internal, 
1876                        "Unexpected state for %s" % key)
1877        self.state_lock.release()
1878        return rv
[a97394b]1879
[4064742]1880    def check_experiment_access(self, fid, key):
[866c983]1881        """
1882        Confirm that the fid has access to the experiment.  Though a request
1883        may be made in terms of a local name, the access attribute is always
1884        the experiment's fedid.
1885        """
1886        if not isinstance(key, fedid):
[db6b092]1887            key = self.get_experiment_fedid(key)
[866c983]1888
1889        if self.auth.check_attribute(fid, key):
1890            return True
1891        else:
1892            raise service_error(service_error.access, "Access Denied")
[4064742]1893
1894
[db6b092]1895    def get_handler(self, path, fid):
[7183b48]1896        self.log.info("Get handler %s %s" % (path, fid))
[6c57fe9]1897        if self.auth.check_attribute(fid, path):
1898            return ("%s/%s" % (self.repodir, path), "application/binary")
1899        else:
1900            return (None, None)
[987aaa1]1901
1902    def get_vtopo(self, req, fid):
[866c983]1903        """
1904        Return the stored virtual topology for this experiment
1905        """
1906        rv = None
[db6b092]1907        state = None
[866c983]1908
1909        req = req.get('VtopoRequestBody', None)
1910        if not req:
1911            raise service_error(service_error.req,
1912                    "Bad request format (no VtopoRequestBody)")
1913        exp = req.get('experiment', None)
1914        if exp:
1915            if exp.has_key('fedid'):
1916                key = exp['fedid']
1917                keytype = "fedid"
1918            elif exp.has_key('localname'):
1919                key = exp['localname']
1920                keytype = "localname"
1921            else:
1922                raise service_error(service_error.req, "Unknown lookup type")
1923        else:
1924            raise service_error(service_error.req, "No request?")
1925
1926        self.check_experiment_access(fid, key)
1927
1928        self.state_lock.acquire()
1929        if self.state.has_key(key):
[db6b092]1930            if self.state[key].has_key('vtopo'):
1931                rv = { 'experiment' : {keytype: key },\
1932                        'vtopo': self.state[key]['vtopo'],\
1933                    }
1934            else:
1935                state = self.state[key]['experimentStatus']
[866c983]1936        self.state_lock.release()
1937
1938        if rv: return rv
[bd3e314]1939        else: 
[db6b092]1940            if state:
1941                raise service_error(service_error.partial, 
1942                        "Not ready: %s" % state)
1943            else:
1944                raise service_error(service_error.req, "No such experiment")
[987aaa1]1945
1946    def get_vis(self, req, fid):
[866c983]1947        """
1948        Return the stored visualization for this experiment
1949        """
1950        rv = None
[db6b092]1951        state = None
[866c983]1952
1953        req = req.get('VisRequestBody', None)
1954        if not req:
1955            raise service_error(service_error.req,
1956                    "Bad request format (no VisRequestBody)")
1957        exp = req.get('experiment', None)
1958        if exp:
1959            if exp.has_key('fedid'):
1960                key = exp['fedid']
1961                keytype = "fedid"
1962            elif exp.has_key('localname'):
1963                key = exp['localname']
1964                keytype = "localname"
1965            else:
1966                raise service_error(service_error.req, "Unknown lookup type")
1967        else:
1968            raise service_error(service_error.req, "No request?")
1969
1970        self.check_experiment_access(fid, key)
1971
1972        self.state_lock.acquire()
1973        if self.state.has_key(key):
[db6b092]1974            if self.state[key].has_key('vis'):
1975                rv =  { 'experiment' : {keytype: key },\
1976                        'vis': self.state[key]['vis'],\
1977                        }
1978            else:
1979                state = self.state[key]['experimentStatus']
[866c983]1980        self.state_lock.release()
1981
1982        if rv: return rv
[bd3e314]1983        else:
[db6b092]1984            if state:
1985                raise service_error(service_error.partial, 
1986                        "Not ready: %s" % state)
1987            else:
1988                raise service_error(service_error.req, "No such experiment")
[987aaa1]1989
[65f3f29]1990    def clean_info_response(self, rv):
[db6b092]1991        """
1992        Remove the information in the experiment's state object that is not in
1993        the info response.
1994        """
1995        # Remove the owner info (should always be there, but...)
1996        if rv.has_key('owner'): del rv['owner']
1997
1998        # Convert the log into the allocationLog parameter and remove the
1999        # log entry (with defensive programming)
2000        if rv.has_key('log'):
2001            rv['allocationLog'] = "".join(rv['log'])
2002            del rv['log']
2003        else:
2004            rv['allocationLog'] = ""
2005
2006        if rv['experimentStatus'] != 'active':
2007            if rv.has_key('federant'): del rv['federant']
2008        else:
[69692a9]2009            # remove the allocationID and uri info from each federant
[db6b092]2010            for f in rv.get('federant', []):
2011                if f.has_key('allocID'): del f['allocID']
[69692a9]2012                if f.has_key('uri'): del f['uri']
[db6b092]2013        return rv
[65f3f29]2014
[c52c48d]2015    def get_info(self, req, fid):
[866c983]2016        """
2017        Return all the stored info about this experiment
2018        """
2019        rv = None
2020
2021        req = req.get('InfoRequestBody', None)
2022        if not req:
2023            raise service_error(service_error.req,
[65f3f29]2024                    "Bad request format (no InfoRequestBody)")
[866c983]2025        exp = req.get('experiment', None)
2026        if exp:
2027            if exp.has_key('fedid'):
2028                key = exp['fedid']
2029                keytype = "fedid"
2030            elif exp.has_key('localname'):
2031                key = exp['localname']
2032                keytype = "localname"
2033            else:
2034                raise service_error(service_error.req, "Unknown lookup type")
2035        else:
2036            raise service_error(service_error.req, "No request?")
2037
2038        self.check_experiment_access(fid, key)
2039
2040        # The state may be massaged by the service function that called
2041        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2042        # state.
2043        self.state_lock.acquire()
2044        if self.state.has_key(key):
2045            rv = copy.deepcopy(self.state[key])
2046        self.state_lock.release()
2047
[db6b092]2048        if rv:
2049            return self.clean_info_response(rv)
[bd3e314]2050        else:
[db6b092]2051            raise service_error(service_error.req, "No such experiment")
[7a8d667]2052
[65f3f29]2053    def get_multi_info(self, req, fid):
2054        """
2055        Return all the stored info that this fedid can access
2056        """
[db6b092]2057        rv = { 'info': [ ] }
[65f3f29]2058
[db6b092]2059        self.state_lock.acquire()
2060        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2061            try:
2062                self.check_experiment_access(fid, key)
2063            except service_error, e:
2064                if e.code == service_error.access:
2065                    continue
2066                else:
2067                    self.state_lock.release()
2068                    raise e
[65f3f29]2069
[db6b092]2070            if self.state.has_key(key):
2071                e = copy.deepcopy(self.state[key])
2072                e = self.clean_info_response(e)
2073                rv['info'].append(e)
[65f3f29]2074        self.state_lock.release()
[db6b092]2075        return rv
[65f3f29]2076
[7a8d667]2077    def terminate_experiment(self, req, fid):
[866c983]2078        """
2079        Swap this experiment out on the federants and delete the shared
2080        information
2081        """
2082        tbparams = { }
2083        req = req.get('TerminateRequestBody', None)
2084        if not req:
2085            raise service_error(service_error.req,
2086                    "Bad request format (no TerminateRequestBody)")
[db6b092]2087        force = req.get('force', False)
[866c983]2088        exp = req.get('experiment', None)
2089        if exp:
2090            if exp.has_key('fedid'):
2091                key = exp['fedid']
2092                keytype = "fedid"
2093            elif exp.has_key('localname'):
2094                key = exp['localname']
2095                keytype = "localname"
2096            else:
2097                raise service_error(service_error.req, "Unknown lookup type")
2098        else:
2099            raise service_error(service_error.req, "No request?")
2100
2101        self.check_experiment_access(fid, key)
2102
[db6b092]2103        dealloc_list = [ ]
[46e4682]2104
2105
[5ae3857]2106        # Create a logger that logs to the dealloc_list as well as to the main
2107        # log file.
2108        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2109        h = logging.StreamHandler(self.list_log(dealloc_list))
2110        # XXX: there should be a global one of these rather than repeating the
2111        # code.
2112        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2113                    '%d %b %y %H:%M:%S'))
2114        dealloc_log.addHandler(h)
2115
2116        self.state_lock.acquire()
2117        fed_exp = self.state.get(key, None)
2118
2119        if fed_exp:
2120            # This branch of the conditional holds the lock to generate a
2121            # consistent temporary tbparams variable to deallocate experiments.
2122            # It releases the lock to do the deallocations and reacquires it to
2123            # remove the experiment state when the termination is complete.
2124
2125            # First make sure that the experiment creation is complete.
2126            status = fed_exp.get('experimentStatus', None)
2127
2128            if status:
2129                if status in ('starting', 'terminating'):
2130                    if not force:
2131                        self.state_lock.release()
2132                        raise service_error(service_error.partial, 
2133                                'Experiment still being created or destroyed')
2134                    else:
2135                        self.log.warning('Experiment in %s state ' % status + \
2136                                'being terminated by force.')
2137            else:
2138                # No status??? trouble
2139                self.state_lock.release()
2140                raise service_error(service_error.internal,
2141                        "Experiment has no status!?")
2142
2143            ids = []
2144            #  experimentID is a list of dicts that are self-describing
2145            #  identifiers.  This finds all the fedids and localnames - the
2146            #  keys of self.state - and puts them into ids.
2147            for id in fed_exp.get('experimentID', []):
2148                if id.has_key('fedid'): ids.append(id['fedid'])
2149                if id.has_key('localname'): ids.append(id['localname'])
2150
[63a35b7]2151            # Collect the allocation/segment ids into a dict keyed by the fedid
2152            # of the allocation (or a monotonically increasing integer) that
2153            # contains a tuple of uri, aid (which is a dict...)
2154            for i, fed in enumerate(fed_exp.get('federant', [])):
[5ae3857]2155                try:
[63a35b7]2156                    uri = fed['uri']
2157                    aid = fed['allocID']
2158                    k = fed['allocID'].get('fedid', i)
[5ae3857]2159                except KeyError, e:
2160                    continue
[63a35b7]2161                tbparams[k] = (uri, aid)
[5ae3857]2162            fed_exp['experimentStatus'] = 'terminating'
2163            if self.state_filename: self.write_state()
2164            self.state_lock.release()
2165
2166            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2167            # then completes, so we can't wait if nothing starts.  So, no
2168            # tbparams, no start.
2169            if len(tbparams) > 0:
2170                thread_pool = self.thread_pool(self.nthreads)
[63a35b7]2171                for k in tbparams.keys():
[5ae3857]2172                    # Create and start a thread to stop the segment
2173                    thread_pool.wait_for_slot()
[63a35b7]2174                    uri, aid = tbparams[k]
[5ae3857]2175                    t  = self.pooled_thread(\
[e19b75c]2176                            target=self.terminate_segment(log=dealloc_log,
[63a35b7]2177                                testbed=uri,
[5ae3857]2178                                cert_file=self.cert_file, 
2179                                cert_pwd=self.cert_pwd,
2180                                trusted_certs=self.trusted_certs,
2181                                caller=self.call_TerminateSegment),
[63a35b7]2182                            args=(uri, aid), name=k,
[5ae3857]2183                            pdata=thread_pool, trace_file=self.trace_file)
2184                    t.start()
2185                # Wait for completions
2186                thread_pool.wait_for_all_done()
2187
2188            # release the allocations (failed experiments have done this
2189            # already, and starting experiments may be in odd states, so we
2190            # ignore errors releasing those allocations
2191            try: 
[63a35b7]2192                for k in tbparams.keys():
[ecf679e]2193                    # This releases access by uri
[63a35b7]2194                    uri, aid = tbparams[k]
2195                    self.release_access(None, aid, uri=uri)
[5ae3857]2196            except service_error, e:
2197                if status != 'failed' and not force:
2198                    raise e
2199
2200            # Remove the terminated experiment
2201            self.state_lock.acquire()
2202            for id in ids:
2203                if self.state.has_key(id): del self.state[id]
2204
2205            if self.state_filename: self.write_state()
2206            self.state_lock.release()
2207
[2761484]2208            # Delete any synch points associated with this experiment.  All
2209            # synch points begin with the fedid of the experiment.
2210            fedid_keys = set(["fedid:%s" % f for f in ids \
2211                    if isinstance(f, fedid)])
2212            for k in self.synch_store.all_keys():
2213                try:
2214                    if len(k) > 45 and k[0:46] in fedid_keys:
2215                        self.synch_store.del_value(k)
[dadc4da]2216                except synch_store.BadDeletionError:
[2761484]2217                    pass
2218            self.write_store()
2219               
[5ae3857]2220            return { 
2221                    'experiment': exp , 
2222                    'deallocationLog': "".join(dealloc_list),
2223                    }
2224        else:
2225            # Don't forget to release the lock
2226            self.state_lock.release()
2227            raise service_error(service_error.req, "No saved state")
[2761484]2228
2229
2230    def GetValue(self, req, fid):
2231        """
2232        Get a value from the synchronized store
2233        """
2234        req = req.get('GetValueRequestBody', None)
2235        if not req:
2236            raise service_error(service_error.req,
2237                    "Bad request format (no GetValueRequestBody)")
2238       
2239        name = req['name']
2240        wait = req['wait']
2241        rv = { 'name': name }
2242
2243        if self.auth.check_attribute(fid, name):
[dadc4da]2244            try:
2245                v = self.synch_store.get_value(name, wait)
2246            except synch_store.RevokedKeyError:
2247                # No more synch on this key
2248                raise service_error(service_error.federant, 
2249                        "Synch key %s revoked" % name)
[2761484]2250            if v is not None:
2251                rv['value'] = v
[109a32a]2252            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2253            return rv
2254        else:
2255            raise service_error(service_error.access, "Access Denied")
2256       
2257
2258    def SetValue(self, req, fid):
2259        """
2260        Set a value in the synchronized store
2261        """
2262        req = req.get('SetValueRequestBody', None)
2263        if not req:
2264            raise service_error(service_error.req,
2265                    "Bad request format (no SetValueRequestBody)")
2266       
2267        name = req['name']
2268        v = req['value']
2269
2270        if self.auth.check_attribute(fid, name):
2271            try:
2272                self.synch_store.set_value(name, v)
2273                self.write_store()
[109a32a]2274                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2275            except synch_store.CollisionError:
2276                # Translate into a service_error
2277                raise service_error(service_error.req,
2278                        "Value already set: %s" %name)
[dadc4da]2279            except synch_store.RevokedKeyError:
2280                # No more synch on this key
2281                raise service_error(service_error.federant, 
2282                        "Synch key %s revoked" % name)
[2761484]2283            return { 'name': name, 'value': v }
2284        else:
2285            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.