source: fedd/federation/experiment_control.py @ 5b19c3a

version-3.01
Last change on this file since 5b19c3a was 5b19c3a, checked in by Ted Faber <faber@…>, 13 years ago

This rolls back the 3.01 branch to the original 3.01 state. I erroneously kept the changes between 3.01 and 3.02 in this branch. With the new branch created, this restores the 3.01 tag's correctness.

  • Property mode set to 100644
File size: 82.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=None, params=None, 
49            reqs=None, portal=None):
50        self.name=name
51        self.exporter=exporter
52        if importers is None: self.importers = []
53        else: self.importers=importers
54        if params is None: self.params = { }
55        else: self.params = params
56        if reqs is None: self.reqs = []
57        else: self.reqs = reqs
58
59        if portal is not None:
60            self.portal = portal
61        else:
62            self.portal = (name in federated_service.needs_portal)
63
64    def __str__(self):
65        return "name %s export %s import %s params %s reqs %s" % \
66                (self.name, self.exporter, self.importers, self.params,
67                        [ (r['name'], r['visibility']) for r in self.reqs] )
68
69    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
70
71class experiment_control_local:
72    """
73    Control of experiments that this system can directly access.
74
75    Includes experiment creation, termination and information dissemination.
76    Thred safe.
77    """
78
79    class ssh_cmd_timeout(RuntimeError): pass
80   
81    class thread_pool:
82        """
83        A class to keep track of a set of threads all invoked for the same
84        task.  Manages the mutual exclusion of the states.
85        """
86        def __init__(self, nthreads):
87            """
88            Start a pool.
89            """
90            self.changed = Condition()
91            self.started = 0
92            self.terminated = 0
93            self.nthreads = nthreads
94
95        def acquire(self):
96            """
97            Get the pool's lock.
98            """
99            self.changed.acquire()
100
101        def release(self):
102            """
103            Release the pool's lock.
104            """
105            self.changed.release()
106
107        def wait(self, timeout = None):
108            """
109            Wait for a pool thread to start or stop.
110            """
111            self.changed.wait(timeout)
112
113        def start(self):
114            """
115            Called by a pool thread to report starting.
116            """
117            self.changed.acquire()
118            self.started += 1
119            self.changed.notifyAll()
120            self.changed.release()
121
122        def terminate(self):
123            """
124            Called by a pool thread to report finishing.
125            """
126            self.changed.acquire()
127            self.terminated += 1
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def clear(self):
132            """
133            Clear all pool data.
134            """
135            self.changed.acquire()
136            self.started = 0
137            self.terminated =0
138            self.changed.notifyAll()
139            self.changed.release()
140
141        def wait_for_slot(self):
142            """
143            Wait until we have a free slot to start another pooled thread
144            """
145            self.acquire()
146            while self.started - self.terminated >= self.nthreads:
147                self.wait()
148            self.release()
149
150        def wait_for_all_done(self, timeout=None):
151            """
152            Wait until all active threads finish (and at least one has
153            started).  If a timeout is given, return after waiting that long
154            for termination.  If all threads are done (and one has started in
155            the since the last clear()) return True, otherwise False.
156            """
157            if timeout:
158                deadline = time.time() + timeout
159            self.acquire()
160            while self.started == 0 or self.started > self.terminated:
161                self.wait(timeout)
162                if timeout:
163                    if time.time() > deadline:
164                        break
165                    timeout = deadline - time.time()
166            self.release()
167            return not (self.started == 0 or self.started > self.terminated)
168
169    class pooled_thread(Thread):
170        """
171        One of a set of threads dedicated to a specific task.  Uses the
172        thread_pool class above for coordination.
173        """
174        def __init__(self, group=None, target=None, name=None, args=(), 
175                kwargs={}, pdata=None, trace_file=None):
176            Thread.__init__(self, group, target, name, args, kwargs)
177            self.rv = None          # Return value of the ops in this thread
178            self.exception = None   # Exception that terminated this thread
179            self.target=target      # Target function to run on start()
180            self.args = args        # Args to pass to target
181            self.kwargs = kwargs    # Additional kw args
182            self.pdata = pdata      # thread_pool for this class
183            # Logger for this thread
184            self.log = logging.getLogger("fedd.experiment_control")
185       
186        def run(self):
187            """
188            Emulate Thread.run, except add pool data manipulation and error
189            logging.
190            """
191            if self.pdata:
192                self.pdata.start()
193
194            if self.target:
195                try:
196                    self.rv = self.target(*self.args, **self.kwargs)
197                except service_error, s:
198                    self.exception = s
199                    self.log.error("Thread exception: %s %s" % \
200                            (s.code_string(), s.desc))
201                except:
202                    self.exception = sys.exc_info()[1]
203                    self.log.error(("Unexpected thread exception: %s" +\
204                            "Trace %s") % (self.exception,\
205                                traceback.format_exc()))
206            if self.pdata:
207                self.pdata.terminate()
208
209    call_RequestAccess = service_caller('RequestAccess')
210    call_ReleaseAccess = service_caller('ReleaseAccess')
211    call_StartSegment = service_caller('StartSegment')
212    call_TerminateSegment = service_caller('TerminateSegment')
213    call_Ns2Topdl = service_caller('Ns2Topdl')
214
215    def __init__(self, config=None, auth=None):
216        """
217        Intialize the various attributes, most from the config object
218        """
219
220        def parse_tarfile_list(tf):
221            """
222            Parse a tarfile list from the configuration.  This is a set of
223            paths and tarfiles separated by spaces.
224            """
225            rv = [ ]
226            if tf is not None:
227                tl = tf.split()
228                while len(tl) > 1:
229                    p, t = tl[0:2]
230                    del tl[0:2]
231                    rv.append((p, t))
232            return rv
233
234        self.thread_with_rv = experiment_control_local.pooled_thread
235        self.thread_pool = experiment_control_local.thread_pool
236        self.list_log = list_log.list_log
237
238        self.cert_file = config.get("experiment_control", "cert_file")
239        if self.cert_file:
240            self.cert_pwd = config.get("experiment_control", "cert_pwd")
241        else:
242            self.cert_file = config.get("globals", "cert_file")
243            self.cert_pwd = config.get("globals", "cert_pwd")
244
245        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
246                or config.get("globals", "trusted_certs")
247
248        self.repodir = config.get("experiment_control", "repodir")
249        self.repo_url = config.get("experiment_control", "repo_url", 
250                "https://users.isi.deterlab.net:23235");
251
252        self.exp_stem = "fed-stem"
253        self.log = logging.getLogger("fedd.experiment_control")
254        set_log_level(config, "experiment_control", self.log)
255        self.muxmax = 2
256        self.nthreads = 10
257        self.randomize_experiments = False
258
259        self.splitter = None
260        self.ssh_keygen = "/usr/bin/ssh-keygen"
261        self.ssh_identity_file = None
262
263
264        self.debug = config.getboolean("experiment_control", "create_debug")
265        self.cleanup = not config.getboolean("experiment_control", 
266                "leave_tmpfiles")
267        self.state_filename = config.get("experiment_control", 
268                "experiment_state")
269        self.store_filename = config.get("experiment_control", 
270                "synch_store")
271        self.store_url = config.get("experiment_control", "store_url")
272        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
273        self.fedkit = parse_tarfile_list(\
274                config.get("experiment_control", "fedkit"))
275        self.gatewaykit = parse_tarfile_list(\
276                config.get("experiment_control", "gatewaykit"))
277        accessdb_file = config.get("experiment_control", "accessdb")
278
279        self.ssh_pubkey_file = config.get("experiment_control", 
280                "ssh_pubkey_file")
281        self.ssh_privkey_file = config.get("experiment_control",
282                "ssh_privkey_file")
283        dt = config.get("experiment_control", "direct_transit")
284        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
285        else: self.direct_transit = [ ]
286        # NB for internal master/slave ops, not experiment setup
287        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
288
289        self.overrides = set([])
290        ovr = config.get('experiment_control', 'overrides')
291        if ovr:
292            for o in ovr.split(","):
293                o = o.strip()
294                if o.startswith('fedid:'): o = o[len('fedid:'):]
295                self.overrides.add(fedid(hexstr=o))
296
297        self.state = { }
298        self.state_lock = Lock()
299        self.tclsh = "/usr/local/bin/otclsh"
300        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
301                config.get("experiment_control", "tcl_splitter",
302                        "/usr/testbed/lib/ns2ir/parse.tcl")
303        mapdb_file = config.get("experiment_control", "mapdb")
304        self.trace_file = sys.stderr
305
306        self.def_expstart = \
307                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
308                "/tmp/federate";
309        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
310                "FEDDIR/hosts";
311        self.def_gwstart = \
312                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
313                "/tmp/bridge.log";
314        self.def_mgwstart = \
315                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
316                "/tmp/bridge.log";
317        self.def_gwimage = "FBSD61-TUNNEL2";
318        self.def_gwtype = "pc";
319        self.local_access = { }
320
321        if auth:
322            self.auth = auth
323        else:
324            self.log.error(\
325                    "[access]: No authorizer initialized, creating local one.")
326            auth = authorizer()
327
328
329        if self.ssh_pubkey_file:
330            try:
331                f = open(self.ssh_pubkey_file, 'r')
332                self.ssh_pubkey = f.read()
333                f.close()
334            except EnvironmentError:
335                raise service_error(service_error.internal,
336                        "Cannot read sshpubkey")
337        else:
338            raise service_error(service_error.internal, 
339                    "No SSH public key file?")
340
341        if not self.ssh_privkey_file:
342            raise service_error(service_error.internal, 
343                    "No SSH public key file?")
344
345
346        if mapdb_file:
347            self.read_mapdb(mapdb_file)
348        else:
349            self.log.warn("[experiment_control] No testbed map, using defaults")
350            self.tbmap = { 
351                    'deter':'https://users.isi.deterlab.net:23235',
352                    'emulab':'https://users.isi.deterlab.net:23236',
353                    'ucb':'https://users.isi.deterlab.net:23237',
354                    }
355
356        if accessdb_file:
357                self.read_accessdb(accessdb_file)
358        else:
359            raise service_error(service_error.internal,
360                    "No accessdb specified in config")
361
362        # Grab saved state.  OK to do this w/o locking because it's read only
363        # and only one thread should be in existence that can see self.state at
364        # this point.
365        if self.state_filename:
366            self.read_state()
367
368        if self.store_filename:
369            self.read_store()
370        else:
371            self.log.warning("No saved synch store")
372            self.synch_store = synch_store
373
374        # Dispatch tables
375        self.soap_services = {\
376                'New': soap_handler('New', self.new_experiment),
377                'Create': soap_handler('Create', self.create_experiment),
378                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
379                'Vis': soap_handler('Vis', self.get_vis),
380                'Info': soap_handler('Info', self.get_info),
381                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
382                'Terminate': soap_handler('Terminate', 
383                    self.terminate_experiment),
384                'GetValue': soap_handler('GetValue', self.GetValue),
385                'SetValue': soap_handler('SetValue', self.SetValue),
386        }
387
388        self.xmlrpc_services = {\
389                'New': xmlrpc_handler('New', self.new_experiment),
390                'Create': xmlrpc_handler('Create', self.create_experiment),
391                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
392                'Vis': xmlrpc_handler('Vis', self.get_vis),
393                'Info': xmlrpc_handler('Info', self.get_info),
394                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
395                'Terminate': xmlrpc_handler('Terminate',
396                    self.terminate_experiment),
397                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
398                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
399        }
400
401    # Call while holding self.state_lock
402    def write_state(self):
403        """
404        Write a new copy of experiment state after copying the existing state
405        to a backup.
406
407        State format is a simple pickling of the state dictionary.
408        """
409        if os.access(self.state_filename, os.W_OK):
410            copy_file(self.state_filename, \
411                    "%s.bak" % self.state_filename)
412        try:
413            f = open(self.state_filename, 'w')
414            pickle.dump(self.state, f)
415        except EnvironmentError, e:
416            self.log.error("Can't write file %s: %s" % \
417                    (self.state_filename, e))
418        except pickle.PicklingError, e:
419            self.log.error("Pickling problem: %s" % e)
420        except TypeError, e:
421            self.log.error("Pickling problem (TypeError): %s" % e)
422
423    @staticmethod
424    def get_alloc_ids(state):
425        """
426        Pull the fedids of the identifiers of each allocation from the
427        state.  Again, a dict dive that's best isolated.
428
429        Used by read_store and read state
430        """
431
432        return [ f['allocID']['fedid'] 
433                for f in state.get('federant',[]) \
434                    if f.has_key('allocID') and \
435                        f['allocID'].has_key('fedid')]
436
437    # Call while holding self.state_lock
438    def read_state(self):
439        """
440        Read a new copy of experiment state.  Old state is overwritten.
441
442        State format is a simple pickling of the state dictionary.
443        """
444       
445        def get_experiment_id(state):
446            """
447            Pull the fedid experimentID out of the saved state.  This is kind
448            of a gross walk through the dict.
449            """
450
451            if state.has_key('experimentID'):
452                for e in state['experimentID']:
453                    if e.has_key('fedid'):
454                        return e['fedid']
455                else:
456                    return None
457            else:
458                return None
459
460        try:
461            f = open(self.state_filename, "r")
462            self.state = pickle.load(f)
463            self.log.debug("[read_state]: Read state from %s" % \
464                    self.state_filename)
465        except EnvironmentError, e:
466            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
467                    % (self.state_filename, e))
468        except pickle.UnpicklingError, e:
469            self.log.warning(("[read_state]: No saved state: " + \
470                    "Unpickling failed: %s") % e)
471       
472        for s in self.state.values():
473            try:
474
475                eid = get_experiment_id(s)
476                if eid : 
477                    # Give the owner rights to the experiment
478                    self.auth.set_attribute(s['owner'], eid)
479                    # And holders of the eid as well
480                    self.auth.set_attribute(eid, eid)
481                    # allow overrides to control experiments as well
482                    for o in self.overrides:
483                        self.auth.set_attribute(o, eid)
484                    # Set permissions to allow reading of the software repo, if
485                    # any, as well.
486                    for a in self.get_alloc_ids(s):
487                        self.auth.set_attribute(a, 'repo/%s' % eid)
488                else:
489                    raise KeyError("No experiment id")
490            except KeyError, e:
491                self.log.warning("[read_state]: State ownership or identity " +\
492                        "misformatted in %s: %s" % (self.state_filename, e))
493
494
495    def read_accessdb(self, accessdb_file):
496        """
497        Read the mapping from fedids that can create experiments to their name
498        in the 3-level access namespace.  All will be asserted from this
499        testbed and can include the local username and porject that will be
500        asserted on their behalf by this fedd.  Each fedid is also added to the
501        authorization system with the "create" attribute.
502        """
503        self.accessdb = {}
504        # These are the regexps for parsing the db
505        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
506        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
507                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
508        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
509                "\s*->\s*(" + name_expr + ")\s*$")
510        lineno = 0
511
512        # Parse the mappings and store in self.authdb, a dict of
513        # fedid -> (proj, user)
514        try:
515            f = open(accessdb_file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if len(line) == 0 or line.startswith('#'):
520                    continue
521                m = project_line.match(line)
522                if m:
523                    fid = fedid(hexstr=m.group(1))
524                    project, user = m.group(2,3)
525                    if not self.accessdb.has_key(fid):
526                        self.accessdb[fid] = []
527                    self.accessdb[fid].append((project, user))
528                    continue
529
530                m = user_line.match(line)
531                if m:
532                    fid = fedid(hexstr=m.group(1))
533                    project = None
534                    user = m.group(2)
535                    if not self.accessdb.has_key(fid):
536                        self.accessdb[fid] = []
537                    self.accessdb[fid].append((project, user))
538                    continue
539                self.log.warn("[experiment_control] Error parsing access " +\
540                        "db %s at line %d" %  (accessdb_file, lineno))
541        except EnvironmentError:
542            raise service_error(service_error.internal,
543                    ("Error opening/reading %s as experiment " +\
544                            "control accessdb") %  accessdb_file)
545        f.close()
546
547        # Initialize the authorization attributes
548        for fid in self.accessdb.keys():
549            self.auth.set_attribute(fid, 'create')
550            self.auth.set_attribute(fid, 'new')
551
552    def read_mapdb(self, file):
553        """
554        Read a simple colon separated list of mappings for the
555        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
556        """
557
558        self.tbmap = { }
559        lineno =0
560        try:
561            f = open(file, "r")
562            for line in f:
563                lineno += 1
564                line = line.strip()
565                if line.startswith('#') or len(line) == 0:
566                    continue
567                try:
568                    label, url = line.split(':', 1)
569                    self.tbmap[label] = url
570                except ValueError, e:
571                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
572                            "map db: %s %s" % (lineno, line, e))
573        except EnvironmentError, e:
574            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
575                    "open %s: %s" % (file, e))
576        f.close()
577
578    def read_store(self):
579        try:
580            self.synch_store = synch_store()
581            self.synch_store.load(self.store_filename)
582            self.log.debug("[read_store]: Read store from %s" % \
583                    self.store_filename)
584        except EnvironmentError, e:
585            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
586                    % (self.state_filename, e))
587            self.synch_store = synch_store()
588
589        # Set the initial permissions on data in the store.  XXX: This ad hoc
590        # authorization attribute initialization is getting out of hand.
591        for k in self.synch_store.all_keys():
592            try:
593                if k.startswith('fedid:'):
594                    fid = fedid(hexstr=k[6:46])
595                    if self.state.has_key(fid):
596                        for a in self.get_alloc_ids(self.state[fid]):
597                            self.auth.set_attribute(a, k)
598            except ValueError, e:
599                self.log.warn("Cannot deduce permissions for %s" % k)
600
601
602    def write_store(self):
603        """
604        Write a new copy of synch_store after writing current state
605        to a backup.  We use the internal synch_store pickle method to avoid
606        incinsistent data.
607
608        State format is a simple pickling of the store.
609        """
610        if os.access(self.store_filename, os.W_OK):
611            copy_file(self.store_filename, \
612                    "%s.bak" % self.store_filename)
613        try:
614            self.synch_store.save(self.store_filename)
615        except EnvironmentError, e:
616            self.log.error("Can't write file %s: %s" % \
617                    (self.store_filename, e))
618        except TypeError, e:
619            self.log.error("Pickling problem (TypeError): %s" % e)
620
621       
622    def generate_ssh_keys(self, dest, type="rsa" ):
623        """
624        Generate a set of keys for the gateways to use to talk.
625
626        Keys are of type type and are stored in the required dest file.
627        """
628        valid_types = ("rsa", "dsa")
629        t = type.lower();
630        if t not in valid_types: raise ValueError
631        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
632
633        try:
634            trace = open("/dev/null", "w")
635        except EnvironmentError:
636            raise service_error(service_error.internal,
637                    "Cannot open /dev/null??");
638
639        # May raise CalledProcessError
640        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
641        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
642        if rv != 0:
643            raise service_error(service_error.internal, 
644                    "Cannot generate nonce ssh keys.  %s return code %d" \
645                            % (self.ssh_keygen, rv))
646
647    def gentopo(self, str):
648        """
649        Generate the topology dtat structure from the splitter's XML
650        representation of it.
651
652        The topology XML looks like:
653            <experiment>
654                <nodes>
655                    <node><vname></vname><ips>ip1:ip2</ips></node>
656                </nodes>
657                <lans>
658                    <lan>
659                        <vname></vname><vnode></vnode><ip></ip>
660                        <bandwidth></bandwidth><member>node:port</member>
661                    </lan>
662                </lans>
663        """
664        class topo_parse:
665            """
666            Parse the topology XML and create the dats structure.
667            """
668            def __init__(self):
669                # Typing of the subelements for data conversion
670                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
671                self.int_subelements = ( 'bandwidth',)
672                self.float_subelements = ( 'delay',)
673                # The final data structure
674                self.nodes = [ ]
675                self.lans =  [ ]
676                self.topo = { \
677                        'node': self.nodes,\
678                        'lan' : self.lans,\
679                    }
680                self.element = { }  # Current element being created
681                self.chars = ""     # Last text seen
682
683            def end_element(self, name):
684                # After each sub element the contents is added to the current
685                # element or to the appropriate list.
686                if name == 'node':
687                    self.nodes.append(self.element)
688                    self.element = { }
689                elif name == 'lan':
690                    self.lans.append(self.element)
691                    self.element = { }
692                elif name in self.str_subelements:
693                    self.element[name] = self.chars
694                    self.chars = ""
695                elif name in self.int_subelements:
696                    self.element[name] = int(self.chars)
697                    self.chars = ""
698                elif name in self.float_subelements:
699                    self.element[name] = float(self.chars)
700                    self.chars = ""
701
702            def found_chars(self, data):
703                self.chars += data.rstrip()
704
705
706        tp = topo_parse();
707        parser = xml.parsers.expat.ParserCreate()
708        parser.EndElementHandler = tp.end_element
709        parser.CharacterDataHandler = tp.found_chars
710
711        parser.Parse(str)
712
713        return tp.topo
714       
715
716    def genviz(self, topo):
717        """
718        Generate the visualization the virtual topology
719        """
720
721        neato = "/usr/local/bin/neato"
722        # These are used to parse neato output and to create the visualization
723        # file.
724        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
725        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
726                "%s</type></node>"
727
728        try:
729            # Node names
730            nodes = [ n['vname'] for n in topo['node'] ]
731            topo_lans = topo['lan']
732        except KeyError, e:
733            raise service_error(service_error.internal, "Bad topology: %s" %e)
734
735        lans = { }
736        links = { }
737
738        # Walk through the virtual topology, organizing the connections into
739        # 2-node connections (links) and more-than-2-node connections (lans).
740        # When a lan is created, it's added to the list of nodes (there's a
741        # node in the visualization for the lan).
742        for l in topo_lans:
743            if links.has_key(l['vname']):
744                if len(links[l['vname']]) < 2:
745                    links[l['vname']].append(l['vnode'])
746                else:
747                    nodes.append(l['vname'])
748                    lans[l['vname']] = links[l['vname']]
749                    del links[l['vname']]
750                    lans[l['vname']].append(l['vnode'])
751            elif lans.has_key(l['vname']):
752                lans[l['vname']].append(l['vnode'])
753            else:
754                links[l['vname']] = [ l['vnode'] ]
755
756
757        # Open up a temporary file for dot to turn into a visualization
758        try:
759            df, dotname = tempfile.mkstemp()
760            dotfile = os.fdopen(df, 'w')
761        except EnvironmentError:
762            raise service_error(service_error.internal,
763                    "Failed to open file in genviz")
764
765        try:
766            dnull = open('/dev/null', 'w')
767        except EnvironmentError:
768            service_error(service_error.internal,
769                    "Failed to open /dev/null in genviz")
770
771        # Generate a dot/neato input file from the links, nodes and lans
772        try:
773            print >>dotfile, "graph G {"
774            for n in nodes:
775                print >>dotfile, '\t"%s"' % n
776            for l in links.keys():
777                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
778            for l in lans.keys():
779                for n in lans[l]:
780                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
781            print >>dotfile, "}"
782            dotfile.close()
783        except TypeError:
784            raise service_error(service_error.internal,
785                    "Single endpoint link in vtopo")
786        except EnvironmentError:
787            raise service_error(service_error.internal, "Cannot write dot file")
788
789        # Use dot to create a visualization
790        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
791                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
792                close_fds=True)
793        dnull.close()
794
795        # Translate dot to vis format
796        vis_nodes = [ ]
797        vis = { 'node': vis_nodes }
798        for line in dot.stdout:
799            m = vis_re.match(line)
800            if m:
801                vn = m.group(1)
802                vis_node = {'name': vn, \
803                        'x': float(m.group(2)),\
804                        'y' : float(m.group(3)),\
805                    }
806                if vn in links.keys() or vn in lans.keys():
807                    vis_node['type'] = 'lan'
808                else:
809                    vis_node['type'] = 'node'
810                vis_nodes.append(vis_node)
811        rv = dot.wait()
812
813        os.remove(dotname)
814        if rv == 0 : return vis
815        else: return None
816
817    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
818        """
819        Get access to testbed through fedd and set the parameters for that tb
820        """
821        def get_export_project(svcs):
822            """
823            Look through for the list of federated_service for this testbed
824            objects for a project_export service, and extract the project
825            parameter.
826            """
827
828            pe = [s for s in svcs if s.name=='project_export']
829            if len(pe) == 1:
830                return pe[0].params.get('project', None)
831            elif len(pe) == 0:
832                return None
833            else:
834                raise service_error(service_error.req,
835                        "More than one project export is not supported")
836
837        uri = tbmap.get(testbed_base(tb), None)
838        if not uri:
839            raise service_error(service_error.server_config, 
840                    "Unknown testbed: %s" % tb)
841
842        export_svcs = masters.get(tb,[])
843        import_svcs = [ s for m in masters.values() \
844                for s in m \
845                    if tb in s.importers ]
846
847        export_project = get_export_project(export_svcs)
848
849        # Tweak search order so that if there are entries in access_user that
850        # have a project matching the export project, we try them first
851        if export_project: 
852            access_sequence = [ (p, u) for p, u in access_user \
853                    if p == export_project] 
854            access_sequence.extend([(p, u) for p, u in access_user \
855                    if p != export_project]) 
856        else: 
857            access_sequence = access_user
858
859        for p, u in access_sequence: 
860            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
861                    "to %s") %  ((p or "None"), u, uri))
862
863            if p:
864                # Request with user and project specified
865                req = {\
866                        'credential': [ "project: %s" % p, "user: %s"  % u],
867                    }
868            else:
869                # Request with only user specified
870                req = {\
871                        'credential': [ 'user: %s' % u ],
872                    }
873
874            # Make the service request from the services we're importing and
875            # exporting.  Keep track of the export request ids so we can
876            # collect the resulting info from the access response.
877            e_keys = { }
878            if import_svcs or export_svcs:
879                req['service'] = [ ]
880
881                for i, s in enumerate(import_svcs):
882                    idx = 'import%d' % i
883                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
884                    if s.params:
885                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
886                                for k, v in s.params.items()]
887                    req['service'].append(sr)
888
889                for i, s in enumerate(export_svcs):
890                    idx = 'export%d' % i
891                    e_keys[idx] = s
892                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
893                    if s.params:
894                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
895                                for k, v in s.params.items()]
896                    req['service'].append(sr)
897
898            # node resources if any
899            if nodes != None and len(nodes) > 0:
900                rnodes = [ ]
901                for n in nodes:
902                    rn = { }
903                    image, hw, count = n.split(":")
904                    if image: rn['image'] = [ image ]
905                    if hw: rn['hardware'] = [ hw ]
906                    if count and int(count) >0 : rn['count'] = int(count)
907                    rnodes.append(rn)
908                req['resources']= { }
909                req['resources']['node'] = rnodes
910
911            try:
912                if self.local_access.has_key(uri):
913                    # Local access call
914                    req = { 'RequestAccessRequestBody' : req }
915                    r = self.local_access[uri].RequestAccess(req, 
916                            fedid(file=self.cert_file))
917                    r = { 'RequestAccessResponseBody' : r }
918                else:
919                    r = self.call_RequestAccess(uri, req, 
920                            self.cert_file, self.cert_pwd, self.trusted_certs)
921            except service_error, e:
922                if e.code == service_error.access:
923                    self.log.debug("[get_access] Access denied")
924                    r = None
925                    continue
926                else:
927                    raise e
928
929            if r.has_key('RequestAccessResponseBody'):
930                # Through to here we have a valid response, not a fault.
931                # Access denied is a fault, so something better or worse than
932                # access denied has happened.
933                r = r['RequestAccessResponseBody']
934                self.log.debug("[get_access] Access granted")
935                break
936            else:
937                raise service_error(service_error.protocol,
938                        "Bad proxy response")
939       
940        if not r:
941            raise service_error(service_error.access, 
942                    "Access denied by %s (%s)" % (tb, uri))
943
944        tbparam[tb] = { 
945                "allocID" : r['allocID'],
946                "uri": uri,
947                }
948
949        # Collect the responses corresponding to the services this testbed
950        # exports.  These will be the service requests that we will include in
951        # the start segment requests (with appropriate visibility values) to
952        # import and export the segments.
953        for s in r.get('service', []):
954            id = s.get('id', None)
955            if id and id in e_keys:
956                e_keys[id].reqs.append(s)
957
958        # Add attributes to parameter space.  We don't allow attributes to
959        # overlay any parameters already installed.
960        for a in r.get('fedAttr', []):
961            try:
962                if a['attribute'] and \
963                        isinstance(a['attribute'], basestring)\
964                        and not tbparam[tb].has_key(a['attribute'].lower()):
965                    tbparam[tb][a['attribute'].lower()] = a['value']
966            except KeyError:
967                self.log.error("Bad attribute in response: %s" % a)
968
969    def release_access(self, tb, aid, tbmap=None, uri=None):
970        """
971        Release access to testbed through fedd
972        """
973
974        if not uri and tbmap:
975            uri = tbmap.get(tb, None)
976        if not uri:
977            raise service_error(service_error.server_config, 
978                    "Unknown testbed: %s" % tb)
979
980        if self.local_access.has_key(uri):
981            resp = self.local_access[uri].ReleaseAccess(\
982                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
983                    fedid(file=self.cert_file))
984            resp = { 'ReleaseAccessResponseBody': resp } 
985        else:
986            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
987                    self.cert_file, self.cert_pwd, self.trusted_certs)
988
989        # better error coding
990
991    def remote_ns2topdl(self, uri, desc):
992
993        req = {
994                'description' : { 'ns2description': desc },
995            }
996
997        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
998                self.trusted_certs)
999
1000        if r.has_key('Ns2TopdlResponseBody'):
1001            r = r['Ns2TopdlResponseBody']
1002            ed = r.get('experimentdescription', None)
1003            if ed.has_key('topdldescription'):
1004                return topdl.Topology(**ed['topdldescription'])
1005            else:
1006                raise service_error(service_error.protocol, 
1007                        "Bad splitter response (no output)")
1008        else:
1009            raise service_error(service_error.protocol, "Bad splitter response")
1010
1011    class start_segment:
1012        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1013                cert_pwd=None, trusted_certs=None, caller=None,
1014                log_collector=None):
1015            self.log = log
1016            self.debug = debug
1017            self.cert_file = cert_file
1018            self.cert_pwd = cert_pwd
1019            self.trusted_certs = None
1020            self.caller = caller
1021            self.testbed = testbed
1022            self.log_collector = log_collector
1023            self.response = None
1024            self.node = { }
1025
1026        def make_map(self, resp):
1027            for e in resp.get('embedding', []):
1028                if 'toponame' in e and 'physname' in e:
1029                    self.node[e['toponame']] = e['physname'][0]
1030
1031        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1032            req = {
1033                    'allocID': { 'fedid' : aid }, 
1034                    'segmentdescription': { 
1035                        'topdldescription': topo.to_dict(),
1036                    },
1037                }
1038
1039            if connInfo:
1040                req['connection'] = connInfo
1041
1042            import_svcs = [ s for m in masters.values() \
1043                    for s in m if self.testbed in s.importers]
1044
1045            if import_svcs or self.testbed in masters:
1046                req['service'] = []
1047
1048            for s in import_svcs:
1049                for r in s.reqs:
1050                    sr = copy.deepcopy(r)
1051                    sr['visibility'] = 'import';
1052                    req['service'].append(sr)
1053
1054            for s in masters.get(self.testbed, []):
1055                for r in s.reqs:
1056                    sr = copy.deepcopy(r)
1057                    sr['visibility'] = 'export';
1058                    req['service'].append(sr)
1059
1060            if attrs:
1061                req['fedAttr'] = attrs
1062
1063            try:
1064                self.log.debug("Calling StartSegment at %s " % uri)
1065                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1066                        self.trusted_certs)
1067                if r.has_key('StartSegmentResponseBody'):
1068                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1069                            None)
1070                    if lval and self.log_collector:
1071                        for line in  lval.splitlines(True):
1072                            self.log_collector.write(line)
1073                    self.make_map(r['StartSegmentResponseBody'])
1074                    self.response = r
1075                else:
1076                    raise service_error(service_error.internal, 
1077                            "Bad response!?: %s" %r)
1078                return True
1079            except service_error, e:
1080                self.log.error("Start segment failed on %s: %s" % \
1081                        (self.testbed, e))
1082                return False
1083
1084
1085
1086    class terminate_segment:
1087        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1088                cert_pwd=None, trusted_certs=None, caller=None):
1089            self.log = log
1090            self.debug = debug
1091            self.cert_file = cert_file
1092            self.cert_pwd = cert_pwd
1093            self.trusted_certs = None
1094            self.caller = caller
1095            self.testbed = testbed
1096
1097        def __call__(self, uri, aid ):
1098            req = {
1099                    'allocID': aid , 
1100                }
1101            try:
1102                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1103                        self.trusted_certs)
1104                return True
1105            except service_error, e:
1106                self.log.error("Terminate segment failed on %s: %s" % \
1107                        (self.testbed, e))
1108                return False
1109   
1110
1111    def allocate_resources(self, allocated, masters, eid, expid, 
1112            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1113            attrs=None, connInfo={}, tbmap=None):
1114
1115        started = { }           # Testbeds where a sub-experiment started
1116                                # successfully
1117
1118        # XXX
1119        fail_soft = False
1120
1121        if tbmap is None: tbmap = { }
1122
1123        log = alloc_log or self.log
1124
1125        thread_pool = self.thread_pool(self.nthreads)
1126        threads = [ ]
1127        starters = [ ]
1128
1129        for tb in allocated.keys():
1130            # Create and start a thread to start the segment, and save it
1131            # to get the return value later
1132            tb_attrs = copy.copy(attrs)
1133            thread_pool.wait_for_slot()
1134            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1135            base, suffix = split_testbed(tb)
1136            if suffix:
1137                tb_attrs.append({'attribute': 'experiment_name', 
1138                    'value': "%s-%s" % (eid, suffix)})
1139            else:
1140                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1141            if not uri:
1142                raise service_error(service_error.internal, 
1143                        "Unknown testbed %s !?" % tb)
1144
1145            if tbparams[tb].has_key('allocID') and \
1146                    tbparams[tb]['allocID'].has_key('fedid'):
1147                aid = tbparams[tb]['allocID']['fedid']
1148            else:
1149                raise service_error(service_error.internal, 
1150                        "No alloc id for testbed %s !?" % tb)
1151
1152            s = self.start_segment(log=log, debug=self.debug,
1153                    testbed=tb, cert_file=self.cert_file,
1154                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1155                    caller=self.call_StartSegment,
1156                    log_collector=log_collector)
1157            starters.append(s)
1158            t  = self.pooled_thread(\
1159                    target=s, name=tb,
1160                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1161                    pdata=thread_pool, trace_file=self.trace_file)
1162            threads.append(t)
1163            t.start()
1164
1165        # Wait until all finish (keep pinging the log, though)
1166        mins = 0
1167        revoked = False
1168        while not thread_pool.wait_for_all_done(60.0):
1169            mins += 1
1170            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1171                    % mins)
1172            if not revoked and \
1173                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1174                # a testbed has failed.  Revoke this experiment's
1175                # synchronizarion values so that sub experiments will not
1176                # deadlock waiting for synchronization that will never happen
1177                self.log.info("A subexperiment has failed to swap in, " + \
1178                        "revoking synch keys")
1179                var_key = "fedid:%s" % expid
1180                for k in self.synch_store.all_keys():
1181                    if len(k) > 45 and k[0:46] == var_key:
1182                        self.synch_store.revoke_key(k)
1183                revoked = True
1184
1185        failed = [ t.getName() for t in threads if not t.rv ]
1186        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1187
1188        # If one failed clean up, unless fail_soft is set
1189        if failed:
1190            if not fail_soft:
1191                thread_pool.clear()
1192                for tb in succeeded:
1193                    # Create and start a thread to stop the segment
1194                    thread_pool.wait_for_slot()
1195                    uri = tbparams[tb]['uri']
1196                    t  = self.pooled_thread(\
1197                            target=self.terminate_segment(log=log,
1198                                testbed=tb,
1199                                cert_file=self.cert_file, 
1200                                cert_pwd=self.cert_pwd,
1201                                trusted_certs=self.trusted_certs,
1202                                caller=self.call_TerminateSegment),
1203                            args=(uri, tbparams[tb]['federant']['allocID']),
1204                            name=tb,
1205                            pdata=thread_pool, trace_file=self.trace_file)
1206                    t.start()
1207                # Wait until all finish (if any are being stopped)
1208                if succeeded:
1209                    thread_pool.wait_for_all_done()
1210
1211                # release the allocations
1212                for tb in tbparams.keys():
1213                    self.release_access(tb, tbparams[tb]['allocID'], 
1214                            tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1215                # Remove the placeholder
1216                self.state_lock.acquire()
1217                self.state[eid]['experimentStatus'] = 'failed'
1218                if self.state_filename: self.write_state()
1219                self.state_lock.release()
1220                # Remove the repo dir
1221                self.remove_dirs("%s/%s" %(self.repodir, expid))
1222                # Walk up tmpdir, deleting as we go
1223                if self.cleanup:
1224                    self.remove_dirs(tmpdir)
1225                else:
1226                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1227
1228
1229                log.error("Swap in failed on %s" % ",".join(failed))
1230                return
1231        else:
1232            # Walk through the successes and gather the virtual to physical
1233            # mapping.
1234            embedding = [ ]
1235            for s in starters:
1236                for k, v in s.node.items():
1237                    embedding.append({
1238                        'toponame': k, 
1239                        'physname': [ v],
1240                        'testbed': s.testbed
1241                        })
1242            log.info("[start_segment]: Experiment %s active" % eid)
1243
1244
1245        # Walk up tmpdir, deleting as we go
1246        if self.cleanup:
1247            self.remove_dirs(tmpdir)
1248        else:
1249            log.debug("[start_experiment]: not removing %s" % tmpdir)
1250
1251        # Insert the experiment into our state and update the disk copy.
1252        self.state_lock.acquire()
1253        self.state[expid]['experimentStatus'] = 'active'
1254        self.state[eid] = self.state[expid]
1255        self.state[eid]['experimentdescription']['topdldescription'] = \
1256                top.to_dict()
1257        self.state[eid]['embedding'] = embedding
1258        if self.state_filename: self.write_state()
1259        self.state_lock.release()
1260        return
1261
1262
1263    def add_kit(self, e, kit):
1264        """
1265        Add a Software object created from the list of (install, location)
1266        tuples passed as kit  to the software attribute of an object e.  We
1267        do this enough to break out the code, but it's kind of a hack to
1268        avoid changing the old tuple rep.
1269        """
1270
1271        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1272
1273        if isinstance(e.software, list): e.software.extend(s)
1274        else: e.software = s
1275
1276
1277    def create_experiment_state(self, fid, req, expid, expcert,
1278            state='starting'):
1279        """
1280        Create the initial entry in the experiment's state.  The expid and
1281        expcert are the experiment's fedid and certifacte that represents that
1282        ID, which are installed in the experiment state.  If the request
1283        includes a suggested local name that is used if possible.  If the local
1284        name is already taken by an experiment owned by this user that has
1285        failed, it is overwritten.  Otherwise new letters are added until a
1286        valid localname is found.  The generated local name is returned.
1287        """
1288
1289        if req.has_key('experimentID') and \
1290                req['experimentID'].has_key('localname'):
1291            overwrite = False
1292            eid = req['experimentID']['localname']
1293            # If there's an old failed experiment here with the same local name
1294            # and accessible by this user, we'll overwrite it, otherwise we'll
1295            # fall through and do the collision avoidance.
1296            old_expid = self.get_experiment_fedid(eid)
1297            if old_expid and self.check_experiment_access(fid, old_expid):
1298                self.state_lock.acquire()
1299                status = self.state[eid].get('experimentStatus', None)
1300                if status and status == 'failed':
1301                    # remove the old access attribute
1302                    self.auth.unset_attribute(fid, old_expid)
1303                    overwrite = True
1304                    del self.state[eid]
1305                    del self.state[old_expid]
1306                self.state_lock.release()
1307            self.state_lock.acquire()
1308            while (self.state.has_key(eid) and not overwrite):
1309                eid += random.choice(string.ascii_letters)
1310            # Initial state
1311            self.state[eid] = {
1312                    'experimentID' : \
1313                            [ { 'localname' : eid }, {'fedid': expid } ],
1314                    'experimentStatus': state,
1315                    'experimentAccess': { 'X509' : expcert },
1316                    'owner': fid,
1317                    'log' : [],
1318                }
1319            self.state[expid] = self.state[eid]
1320            if self.state_filename: self.write_state()
1321            self.state_lock.release()
1322        else:
1323            eid = self.exp_stem
1324            for i in range(0,5):
1325                eid += random.choice(string.ascii_letters)
1326            self.state_lock.acquire()
1327            while (self.state.has_key(eid)):
1328                eid = self.exp_stem
1329                for i in range(0,5):
1330                    eid += random.choice(string.ascii_letters)
1331            # Initial state
1332            self.state[eid] = {
1333                    'experimentID' : \
1334                            [ { 'localname' : eid }, {'fedid': expid } ],
1335                    'experimentStatus': state,
1336                    'experimentAccess': { 'X509' : expcert },
1337                    'owner': fid,
1338                    'log' : [],
1339                }
1340            self.state[expid] = self.state[eid]
1341            if self.state_filename: self.write_state()
1342            self.state_lock.release()
1343
1344        return eid
1345
1346
1347    def allocate_ips_to_topo(self, top):
1348        """
1349        Add an ip4_address attribute to all the hosts in the topology, based on
1350        the shared substrates on which they sit.  An /etc/hosts file is also
1351        created and returned as a list of hostfiles entries.  We also return
1352        the allocator, because we may need to allocate IPs to portals
1353        (specifically DRAGON portals).
1354        """
1355        subs = sorted(top.substrates, 
1356                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1357                reverse=True)
1358        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1359        ifs = { }
1360        hosts = [ ]
1361
1362        for idx, s in enumerate(subs):
1363            net_size = len(s.interfaces)+2
1364
1365            a = ips.allocate(net_size)
1366            if a :
1367                base, num = a
1368                if num < net_size: 
1369                    raise service_error(service_error.internal,
1370                            "Allocator returned wrong number of IPs??")
1371            else:
1372                raise service_error(service_error.req, 
1373                        "Cannot allocate IP addresses")
1374            mask = ips.min_alloc
1375            while mask < net_size:
1376                mask *= 2
1377
1378            netmask = ((2**32-1) ^ (mask-1))
1379
1380            base += 1
1381            for i in s.interfaces:
1382                i.attribute.append(
1383                        topdl.Attribute('ip4_address', 
1384                            "%s" % ip_addr(base)))
1385                i.attribute.append(
1386                        topdl.Attribute('ip4_netmask', 
1387                            "%s" % ip_addr(int(netmask))))
1388
1389                hname = i.element.name
1390                if ifs.has_key(hname):
1391                    hosts.append("%s\t%s-%s %s-%d" % \
1392                            (ip_addr(base), hname, s.name, hname,
1393                                ifs[hname]))
1394                else:
1395                    ifs[hname] = 0
1396                    hosts.append("%s\t%s-%s %s-%d %s" % \
1397                            (ip_addr(base), hname, s.name, hname,
1398                                ifs[hname], hname))
1399
1400                ifs[hname] += 1
1401                base += 1
1402        return hosts, ips
1403
1404    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1405            tbparams, masters, tbmap):
1406        """
1407        Request access to the various testbeds required for this instantiation
1408        (passed in as testbeds).  User, access_user, expoert_project and master
1409        are used to construct the correct requests.  Per-testbed parameters are
1410        returned in tbparams.
1411        """
1412        for tb in testbeds:
1413            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1414            allocated[tb] = 1
1415
1416    def split_topology(self, top, topo, testbeds):
1417        """
1418        Create the sub-topologies that are needed for experiment instantiation.
1419        """
1420        for tb in testbeds:
1421            topo[tb] = top.clone()
1422            # copy in for loop allows deletions from the original
1423            for e in [ e for e in topo[tb].elements]:
1424                etb = e.get_attribute('testbed')
1425                # NB: elements without a testbed attribute won't appear in any
1426                # sub topologies. 
1427                if not etb or etb != tb:
1428                    for i in e.interface:
1429                        for s in i.subs:
1430                            try:
1431                                s.interfaces.remove(i)
1432                            except ValueError:
1433                                raise service_error(service_error.internal,
1434                                        "Can't remove interface??")
1435                    topo[tb].elements.remove(e)
1436            topo[tb].make_indices()
1437
1438    def wrangle_software(self, expid, top, topo, tbparams):
1439        """
1440        Copy software out to the repository directory, allocate permissions and
1441        rewrite the segment topologies to look for the software in local
1442        places.
1443        """
1444
1445        # Copy the rpms and tarfiles to a distribution directory from
1446        # which the federants can retrieve them
1447        linkpath = "%s/software" %  expid
1448        softdir ="%s/%s" % ( self.repodir, linkpath)
1449        softmap = { }
1450        # These are in a list of tuples format (each kit).  This comprehension
1451        # unwraps them into a single list of tuples that initilaizes the set of
1452        # tuples.
1453        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1454                for p, t in l ])
1455        pkgs.update([x.location for e in top.elements \
1456                for x in e.software])
1457        try:
1458            os.makedirs(softdir)
1459        except EnvironmentError, e:
1460            raise service_error(
1461                    "Cannot create software directory: %s" % e)
1462        # The actual copying.  Everything's converted into a url for copying.
1463        for pkg in pkgs:
1464            loc = pkg
1465
1466            scheme, host, path = urlparse(loc)[0:3]
1467            dest = os.path.basename(path)
1468            if not scheme:
1469                if not loc.startswith('/'):
1470                    loc = "/%s" % loc
1471                loc = "file://%s" %loc
1472            try:
1473                u = urlopen(loc)
1474            except Exception, e:
1475                raise service_error(service_error.req, 
1476                        "Cannot open %s: %s" % (loc, e))
1477            try:
1478                f = open("%s/%s" % (softdir, dest) , "w")
1479                self.log.debug("Writing %s/%s" % (softdir,dest) )
1480                data = u.read(4096)
1481                while data:
1482                    f.write(data)
1483                    data = u.read(4096)
1484                f.close()
1485                u.close()
1486            except Exception, e:
1487                raise service_error(service_error.internal,
1488                        "Could not copy %s: %s" % (loc, e))
1489            path = re.sub("/tmp", "", linkpath)
1490            # XXX
1491            softmap[pkg] = \
1492                    "%s/%s/%s" %\
1493                    ( self.repo_url, path, dest)
1494
1495            # Allow the individual segments to access the software.
1496            for tb in tbparams.keys():
1497                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1498                        "/%s/%s" % ( path, dest))
1499
1500        # Convert the software locations in the segments into the local
1501        # copies on this host
1502        for soft in [ s for tb in topo.values() \
1503                for e in tb.elements \
1504                    if getattr(e, 'software', False) \
1505                        for s in e.software ]:
1506            if softmap.has_key(soft.location):
1507                soft.location = softmap[soft.location]
1508
1509
1510    def new_experiment(self, req, fid):
1511        """
1512        The external interface to empty initial experiment creation called from
1513        the dispatcher.
1514
1515        Creates a working directory, splits the incoming description using the
1516        splitter script and parses out the avrious subsections using the
1517        lcasses above.  Once each sub-experiment is created, use pooled threads
1518        to instantiate them and start it all up.
1519        """
1520        if not self.auth.check_attribute(fid, 'new'):
1521            raise service_error(service_error.access, "New access denied")
1522
1523        try:
1524            tmpdir = tempfile.mkdtemp(prefix="split-")
1525        except EnvironmentError:
1526            raise service_error(service_error.internal, "Cannot create tmp dir")
1527
1528        try:
1529            access_user = self.accessdb[fid]
1530        except KeyError:
1531            raise service_error(service_error.internal,
1532                    "Access map and authorizer out of sync in " + \
1533                            "new_experiment for fedid %s"  % fid)
1534
1535        pid = "dummy"
1536        gid = "dummy"
1537
1538        req = req.get('NewRequestBody', None)
1539        if not req:
1540            raise service_error(service_error.req,
1541                    "Bad request format (no NewRequestBody)")
1542
1543        # Generate an ID for the experiment (slice) and a certificate that the
1544        # allocator can use to prove they own it.  We'll ship it back through
1545        # the encrypted connection.
1546        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1547
1548        #now we're done with the tmpdir, and it should be empty
1549        if self.cleanup:
1550            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1551            os.rmdir(tmpdir)
1552        else:
1553            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1554
1555        eid = self.create_experiment_state(fid, req, expid, expcert, 
1556                state='empty')
1557
1558        # Let users touch the state
1559        self.auth.set_attribute(fid, expid)
1560        self.auth.set_attribute(expid, expid)
1561        # Override fedids can manipulate state as well
1562        for o in self.overrides:
1563            self.auth.set_attribute(o, expid)
1564
1565        rv = {
1566                'experimentID': [
1567                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1568                ],
1569                'experimentStatus': 'empty',
1570                'experimentAccess': { 'X509' : expcert }
1571            }
1572
1573        return rv
1574
1575    def create_experiment(self, req, fid):
1576        """
1577        The external interface to experiment creation called from the
1578        dispatcher.
1579
1580        Creates a working directory, splits the incoming description using the
1581        splitter script and parses out the various subsections using the
1582        classes above.  Once each sub-experiment is created, use pooled threads
1583        to instantiate them and start it all up.
1584        """
1585
1586        req = req.get('CreateRequestBody', None)
1587        if not req:
1588            raise service_error(service_error.req,
1589                    "Bad request format (no CreateRequestBody)")
1590
1591        # Get the experiment access
1592        exp = req.get('experimentID', None)
1593        if exp:
1594            if exp.has_key('fedid'):
1595                key = exp['fedid']
1596                expid = key
1597                eid = None
1598            elif exp.has_key('localname'):
1599                key = exp['localname']
1600                eid = key
1601                expid = None
1602            else:
1603                raise service_error(service_error.req, "Unknown lookup type")
1604        else:
1605            raise service_error(service_error.req, "No request?")
1606
1607        self.check_experiment_access(fid, key)
1608
1609        # Install the testbed map entries supplied with the request into a copy
1610        # of the testbed map.
1611        tbmap = dict(self.tbmap)
1612        for m in req.get('testbedmap', []):
1613            if 'testbed' in m and 'uri' in m:
1614                tbmap[m['testbed']] = m['uri']
1615
1616        try:
1617            tmpdir = tempfile.mkdtemp(prefix="split-")
1618            os.mkdir(tmpdir+"/keys")
1619        except EnvironmentError:
1620            raise service_error(service_error.internal, "Cannot create tmp dir")
1621
1622        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1623        gw_secretkey_base = "fed.%s" % self.ssh_type
1624        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1625        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1626        tclfile = tmpdir + "/experiment.tcl"
1627        tbparams = { }
1628        try:
1629            access_user = self.accessdb[fid]
1630        except KeyError:
1631            raise service_error(service_error.internal,
1632                    "Access map and authorizer out of sync in " + \
1633                            "create_experiment for fedid %s"  % fid)
1634
1635        pid = "dummy"
1636        gid = "dummy"
1637
1638        # The tcl parser needs to read a file so put the content into that file
1639        descr=req.get('experimentdescription', None)
1640        if descr:
1641            file_content=descr.get('ns2description', None)
1642            if file_content:
1643                try:
1644                    f = open(tclfile, 'w')
1645                    f.write(file_content)
1646                    f.close()
1647                except EnvironmentError:
1648                    raise service_error(service_error.internal,
1649                            "Cannot write temp experiment description")
1650            else:
1651                raise service_error(service_error.req, 
1652                        "Only ns2descriptions supported")
1653        else:
1654            raise service_error(service_error.req, "No experiment description")
1655
1656        self.state_lock.acquire()
1657        if self.state.has_key(key):
1658            self.state[key]['experimentStatus'] = "starting"
1659            for e in self.state[key].get('experimentID',[]):
1660                if not expid and e.has_key('fedid'):
1661                    expid = e['fedid']
1662                elif not eid and e.has_key('localname'):
1663                    eid = e['localname']
1664        self.state_lock.release()
1665
1666        if not (eid and expid):
1667            raise service_error(service_error.internal, 
1668                    "Cannot find local experiment info!?")
1669
1670        try: 
1671            # This catches exceptions to clear the placeholder if necessary
1672            try:
1673                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1674            except ValueError:
1675                raise service_error(service_error.server_config, 
1676                        "Bad key type (%s)" % self.ssh_type)
1677
1678            # Copy the service request
1679            tb_services = [ s for s in req.get('service',[]) ]
1680            # Translate to topdl
1681            if self.splitter_url:
1682                self.log.debug("Calling remote topdl translator at %s" % \
1683                        self.splitter_url)
1684                top = self.remote_ns2topdl(self.splitter_url, file_content)
1685            else:
1686                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1687                    str(self.muxmax), '-m', 'dummy']
1688
1689                tclcmd.extend([pid, gid, eid, tclfile])
1690
1691                self.log.debug("running local splitter %s", " ".join(tclcmd))
1692                # This is just fantastic.  As a side effect the parser copies
1693                # tb_compat.tcl into the current directory, so that directory
1694                # must be writable by the fedd user.  Doing this in the
1695                # temporary subdir ensures this is the case.
1696                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1697                        cwd=tmpdir)
1698                split_data = tclparser.stdout
1699
1700                top = topdl.topology_from_xml(file=split_data, top="experiment")
1701
1702            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1703            # Find the testbeds to look up
1704            testbeds = set([ a.value for e in top.elements \
1705                    for a in e.attribute \
1706                        if a.attribute == 'testbed'])
1707
1708            tb_hosts = { }
1709            for tb in testbeds:
1710                tb_hosts[tb] = [ e.name for e in top.elements \
1711                        if isinstance(e, topdl.Computer) and \
1712                            e.get_attribute('testbed') and \
1713                            e.get_attribute('testbed') == tb]
1714
1715            masters = { }           # testbeds exporting services
1716            pmasters = { }          # Testbeds exporting services that
1717                                    # need portals
1718            for s in tb_services:
1719                # If this is a service request with the importall field
1720                # set, fill it out.
1721
1722                if s.get('importall', False):
1723                    s['import'] = [ tb for tb in testbeds \
1724                            if tb not in s.get('export',[])]
1725                    del s['importall']
1726
1727                # Add the service to masters
1728                for tb in s.get('export', []):
1729                    if s.get('name', None):
1730                        if tb not in masters:
1731                            masters[tb] = [ ]
1732
1733                        params = { }
1734                        if 'fedAttr' in s:
1735                            for a in s['fedAttr']:
1736                                params[a.get('attribute', '')] = \
1737                                        a.get('value','')
1738
1739                        fser = federated_service(name=s['name'],
1740                                exporter=tb, importers=s.get('import',[]),
1741                                params=params)
1742                        if fser.name == 'hide_hosts' \
1743                                and 'hosts' not in fser.params:
1744                            fser.params['hosts'] = \
1745                                    ",".join(tb_hosts.get(fser.exporter, []))
1746                        masters[tb].append(fser)
1747
1748                        if fser.portal:
1749                            if tb not in pmasters: pmasters[tb] = [ fser ]
1750                            else: pmasters[tb].append(fser)
1751                    else:
1752                        self.log.error('Testbed service does not have name " + \
1753                                "and importers')
1754
1755
1756            allocated = { }         # Testbeds we can access
1757            topo ={ }               # Sub topologies
1758            connInfo = { }          # Connection information
1759
1760            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1761                    tbparams, masters, tbmap)
1762
1763            self.split_topology(top, topo, testbeds)
1764
1765            # Copy configuration files into the remote file store
1766            # The config urlpath
1767            configpath = "/%s/config" % expid
1768            # The config file system location
1769            configdir ="%s%s" % ( self.repodir, configpath)
1770            try:
1771                os.makedirs(configdir)
1772            except EnvironmentError, e:
1773                raise service_error(service_error.internal,
1774                        "Cannot create config directory: %s" % e)
1775            try:
1776                f = open("%s/hosts" % configdir, "w")
1777                f.write('\n'.join(hosts))
1778                f.close()
1779            except EnvironmentError, e:
1780                raise service_error(service_error.internal, 
1781                        "Cannot write hosts file: %s" % e)
1782            try:
1783                copy_file("%s" % gw_pubkey, "%s/%s" % \
1784                        (configdir, gw_pubkey_base))
1785                copy_file("%s" % gw_secretkey, "%s/%s" % \
1786                        (configdir, gw_secretkey_base))
1787            except EnvironmentError, e:
1788                raise service_error(service_error.internal, 
1789                        "Cannot copy keyfiles: %s" % e)
1790
1791            # Allow the individual testbeds to access the configuration files.
1792            for tb in tbparams.keys():
1793                asignee = tbparams[tb]['allocID']['fedid']
1794                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1795                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1796
1797            part = experiment_partition(self.auth, self.store_url, tbmap,
1798                    self.muxmax, self.direct_transit)
1799            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1800                    connInfo, expid)
1801            # Now get access to the dynamic testbeds (those added above)
1802            for tb in [ t for t in topo if t not in allocated]:
1803                self.get_access(tb, None, tbparams, access_user, masters)
1804                allocated[tb] = 1
1805                store_keys = topo[tb].get_attribute('store_keys')
1806                # Give the testbed access to keys it exports or imports
1807                if store_keys:
1808                    for sk in store_keys.split(" "):
1809                        self.auth.set_attribute(\
1810                                tbparams[tb]['allocID']['fedid'], sk)
1811
1812            self.wrangle_software(expid, top, topo, tbparams)
1813
1814            vtopo = topdl.topology_to_vtopo(top)
1815            vis = self.genviz(vtopo)
1816
1817            # save federant information
1818            for k in allocated.keys():
1819                tbparams[k]['federant'] = {
1820                        'name': [ { 'localname' : eid} ],
1821                        'allocID' : tbparams[k]['allocID'],
1822                        'uri': tbparams[k]['uri'],
1823                    }
1824
1825            self.state_lock.acquire()
1826            self.state[eid]['vtopo'] = vtopo
1827            self.state[eid]['vis'] = vis
1828            self.state[eid]['experimentdescription'] = \
1829                    { 'topdldescription': top.to_dict() }
1830            self.state[eid]['federant'] = \
1831                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1832                        if tbparams[tb].has_key('federant') ]
1833            if self.state_filename: 
1834                self.write_state()
1835            self.state_lock.release()
1836        except service_error, e:
1837            # If something goes wrong in the parse (usually an access error)
1838            # clear the placeholder state.  From here on out the code delays
1839            # exceptions.  Failing at this point returns a fault to the remote
1840            # caller.
1841
1842            self.state_lock.acquire()
1843            del self.state[eid]
1844            del self.state[expid]
1845            if self.state_filename: self.write_state()
1846            self.state_lock.release()
1847            raise e
1848
1849
1850        # Start the background swapper and return the starting state.  From
1851        # here on out, the state will stick around a while.
1852
1853        # Let users touch the state
1854        self.auth.set_attribute(fid, expid)
1855        self.auth.set_attribute(expid, expid)
1856        # Override fedids can manipulate state as well
1857        for o in self.overrides:
1858            self.auth.set_attribute(o, expid)
1859
1860        # Create a logger that logs to the experiment's state object as well as
1861        # to the main log file.
1862        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1863        alloc_collector = self.list_log(self.state[eid]['log'])
1864        h = logging.StreamHandler(alloc_collector)
1865        # XXX: there should be a global one of these rather than repeating the
1866        # code.
1867        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1868                    '%d %b %y %H:%M:%S'))
1869        alloc_log.addHandler(h)
1870       
1871        attrs = [ 
1872                {
1873                    'attribute': 'ssh_pubkey', 
1874                    'value': '%s/%s/config/%s' % \
1875                            (self.repo_url, expid, gw_pubkey_base)
1876                },
1877                {
1878                    'attribute': 'ssh_secretkey', 
1879                    'value': '%s/%s/config/%s' % \
1880                            (self.repo_url, expid, gw_secretkey_base)
1881                },
1882                {
1883                    'attribute': 'hosts', 
1884                    'value': '%s/%s/config/hosts' % \
1885                            (self.repo_url, expid)
1886                },
1887            ]
1888
1889        # transit and disconnected testbeds may not have a connInfo entry.
1890        # Fill in the blanks.
1891        for t in allocated.keys():
1892            if not connInfo.has_key(t):
1893                connInfo[t] = { }
1894
1895        # Start a thread to do the resource allocation
1896        t  = Thread(target=self.allocate_resources,
1897                args=(allocated, masters, eid, expid, tbparams, 
1898                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1899                    connInfo, tbmap),
1900                name=eid)
1901        t.start()
1902
1903        rv = {
1904                'experimentID': [
1905                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1906                ],
1907                'experimentStatus': 'starting',
1908            }
1909
1910        return rv
1911   
1912    def get_experiment_fedid(self, key):
1913        """
1914        find the fedid associated with the localname key in the state database.
1915        """
1916
1917        rv = None
1918        self.state_lock.acquire()
1919        if self.state.has_key(key):
1920            if isinstance(self.state[key], dict):
1921                try:
1922                    kl = [ f['fedid'] for f in \
1923                            self.state[key]['experimentID']\
1924                                if f.has_key('fedid') ]
1925                except KeyError:
1926                    self.state_lock.release()
1927                    raise service_error(service_error.internal, 
1928                            "No fedid for experiment %s when getting "+\
1929                                    "fedid(!?)" % key)
1930                if len(kl) == 1:
1931                    rv = kl[0]
1932                else:
1933                    self.state_lock.release()
1934                    raise service_error(service_error.internal, 
1935                            "multiple fedids for experiment %s when " +\
1936                                    "getting fedid(!?)" % key)
1937            else:
1938                self.state_lock.release()
1939                raise service_error(service_error.internal, 
1940                        "Unexpected state for %s" % key)
1941        self.state_lock.release()
1942        return rv
1943
1944    def check_experiment_access(self, fid, key):
1945        """
1946        Confirm that the fid has access to the experiment.  Though a request
1947        may be made in terms of a local name, the access attribute is always
1948        the experiment's fedid.
1949        """
1950        if not isinstance(key, fedid):
1951            key = self.get_experiment_fedid(key)
1952
1953        if self.auth.check_attribute(fid, key):
1954            return True
1955        else:
1956            raise service_error(service_error.access, "Access Denied")
1957
1958
1959    def get_handler(self, path, fid):
1960        self.log.info("Get handler %s %s" % (path, fid))
1961        if self.auth.check_attribute(fid, path):
1962            return ("%s/%s" % (self.repodir, path), "application/binary")
1963        else:
1964            return (None, None)
1965
1966    def get_vtopo(self, req, fid):
1967        """
1968        Return the stored virtual topology for this experiment
1969        """
1970        rv = None
1971        state = None
1972
1973        req = req.get('VtopoRequestBody', None)
1974        if not req:
1975            raise service_error(service_error.req,
1976                    "Bad request format (no VtopoRequestBody)")
1977        exp = req.get('experiment', None)
1978        if exp:
1979            if exp.has_key('fedid'):
1980                key = exp['fedid']
1981                keytype = "fedid"
1982            elif exp.has_key('localname'):
1983                key = exp['localname']
1984                keytype = "localname"
1985            else:
1986                raise service_error(service_error.req, "Unknown lookup type")
1987        else:
1988            raise service_error(service_error.req, "No request?")
1989
1990        self.check_experiment_access(fid, key)
1991
1992        self.state_lock.acquire()
1993        if self.state.has_key(key):
1994            if self.state[key].has_key('vtopo'):
1995                rv = { 'experiment' : {keytype: key },\
1996                        'vtopo': self.state[key]['vtopo'],\
1997                    }
1998            else:
1999                state = self.state[key]['experimentStatus']
2000        self.state_lock.release()
2001
2002        if rv: return rv
2003        else: 
2004            if state:
2005                raise service_error(service_error.partial, 
2006                        "Not ready: %s" % state)
2007            else:
2008                raise service_error(service_error.req, "No such experiment")
2009
2010    def get_vis(self, req, fid):
2011        """
2012        Return the stored visualization for this experiment
2013        """
2014        rv = None
2015        state = None
2016
2017        req = req.get('VisRequestBody', None)
2018        if not req:
2019            raise service_error(service_error.req,
2020                    "Bad request format (no VisRequestBody)")
2021        exp = req.get('experiment', None)
2022        if exp:
2023            if exp.has_key('fedid'):
2024                key = exp['fedid']
2025                keytype = "fedid"
2026            elif exp.has_key('localname'):
2027                key = exp['localname']
2028                keytype = "localname"
2029            else:
2030                raise service_error(service_error.req, "Unknown lookup type")
2031        else:
2032            raise service_error(service_error.req, "No request?")
2033
2034        self.check_experiment_access(fid, key)
2035
2036        self.state_lock.acquire()
2037        if self.state.has_key(key):
2038            if self.state[key].has_key('vis'):
2039                rv =  { 'experiment' : {keytype: key },\
2040                        'vis': self.state[key]['vis'],\
2041                        }
2042            else:
2043                state = self.state[key]['experimentStatus']
2044        self.state_lock.release()
2045
2046        if rv: return rv
2047        else:
2048            if state:
2049                raise service_error(service_error.partial, 
2050                        "Not ready: %s" % state)
2051            else:
2052                raise service_error(service_error.req, "No such experiment")
2053
2054    def clean_info_response(self, rv):
2055        """
2056        Remove the information in the experiment's state object that is not in
2057        the info response.
2058        """
2059        # Remove the owner info (should always be there, but...)
2060        if rv.has_key('owner'): del rv['owner']
2061
2062        # Convert the log into the allocationLog parameter and remove the
2063        # log entry (with defensive programming)
2064        if rv.has_key('log'):
2065            rv['allocationLog'] = "".join(rv['log'])
2066            del rv['log']
2067        else:
2068            rv['allocationLog'] = ""
2069
2070        if rv['experimentStatus'] != 'active':
2071            if rv.has_key('federant'): del rv['federant']
2072        else:
2073            # remove the allocationID and uri info from each federant
2074            for f in rv.get('federant', []):
2075                if f.has_key('allocID'): del f['allocID']
2076                if f.has_key('uri'): del f['uri']
2077
2078        return rv
2079
2080    def get_info(self, req, fid):
2081        """
2082        Return all the stored info about this experiment
2083        """
2084        rv = None
2085
2086        req = req.get('InfoRequestBody', None)
2087        if not req:
2088            raise service_error(service_error.req,
2089                    "Bad request format (no InfoRequestBody)")
2090        exp = req.get('experiment', None)
2091        if exp:
2092            if exp.has_key('fedid'):
2093                key = exp['fedid']
2094                keytype = "fedid"
2095            elif exp.has_key('localname'):
2096                key = exp['localname']
2097                keytype = "localname"
2098            else:
2099                raise service_error(service_error.req, "Unknown lookup type")
2100        else:
2101            raise service_error(service_error.req, "No request?")
2102
2103        self.check_experiment_access(fid, key)
2104
2105        # The state may be massaged by the service function that called
2106        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2107        # state.
2108        self.state_lock.acquire()
2109        if self.state.has_key(key):
2110            rv = copy.deepcopy(self.state[key])
2111        self.state_lock.release()
2112
2113        if rv:
2114            return self.clean_info_response(rv)
2115        else:
2116            raise service_error(service_error.req, "No such experiment")
2117
2118    def get_multi_info(self, req, fid):
2119        """
2120        Return all the stored info that this fedid can access
2121        """
2122        rv = { 'info': [ ] }
2123
2124        self.state_lock.acquire()
2125        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2126            try:
2127                self.check_experiment_access(fid, key)
2128            except service_error, e:
2129                if e.code == service_error.access:
2130                    continue
2131                else:
2132                    self.state_lock.release()
2133                    raise e
2134
2135            if self.state.has_key(key):
2136                e = copy.deepcopy(self.state[key])
2137                e = self.clean_info_response(e)
2138                rv['info'].append(e)
2139        self.state_lock.release()
2140        return rv
2141
2142    def remove_dirs(self, dir):
2143        """
2144        Remove the directory tree and all files rooted at dir.  Log any errors,
2145        but continue.
2146        """
2147        self.log.debug("[removedirs]: removing %s" % dir)
2148        try:
2149            for path, dirs, files in os.walk(dir, topdown=False):
2150                for f in files:
2151                    os.remove(os.path.join(path, f))
2152                for d in dirs:
2153                    os.rmdir(os.path.join(path, d))
2154            os.rmdir(dir)
2155        except EnvironmentError, e:
2156            self.log.error("Error deleting directory tree in %s" % e);
2157
2158    def terminate_experiment(self, req, fid):
2159        """
2160        Swap this experiment out on the federants and delete the shared
2161        information
2162        """
2163        tbparams = { }
2164        req = req.get('TerminateRequestBody', None)
2165        if not req:
2166            raise service_error(service_error.req,
2167                    "Bad request format (no TerminateRequestBody)")
2168        force = req.get('force', False)
2169        exp = req.get('experiment', None)
2170        if exp:
2171            if exp.has_key('fedid'):
2172                key = exp['fedid']
2173                keytype = "fedid"
2174            elif exp.has_key('localname'):
2175                key = exp['localname']
2176                keytype = "localname"
2177            else:
2178                raise service_error(service_error.req, "Unknown lookup type")
2179        else:
2180            raise service_error(service_error.req, "No request?")
2181
2182        self.check_experiment_access(fid, key)
2183
2184        dealloc_list = [ ]
2185
2186
2187        # Create a logger that logs to the dealloc_list as well as to the main
2188        # log file.
2189        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2190        h = logging.StreamHandler(self.list_log(dealloc_list))
2191        # XXX: there should be a global one of these rather than repeating the
2192        # code.
2193        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2194                    '%d %b %y %H:%M:%S'))
2195        dealloc_log.addHandler(h)
2196
2197        self.state_lock.acquire()
2198        fed_exp = self.state.get(key, None)
2199        repo = None
2200
2201        if fed_exp:
2202            # This branch of the conditional holds the lock to generate a
2203            # consistent temporary tbparams variable to deallocate experiments.
2204            # It releases the lock to do the deallocations and reacquires it to
2205            # remove the experiment state when the termination is complete.
2206
2207            # First make sure that the experiment creation is complete.
2208            status = fed_exp.get('experimentStatus', None)
2209
2210            if status:
2211                if status in ('starting', 'terminating'):
2212                    if not force:
2213                        self.state_lock.release()
2214                        raise service_error(service_error.partial, 
2215                                'Experiment still being created or destroyed')
2216                    else:
2217                        self.log.warning('Experiment in %s state ' % status + \
2218                                'being terminated by force.')
2219            else:
2220                # No status??? trouble
2221                self.state_lock.release()
2222                raise service_error(service_error.internal,
2223                        "Experiment has no status!?")
2224
2225            ids = []
2226            #  experimentID is a list of dicts that are self-describing
2227            #  identifiers.  This finds all the fedids and localnames - the
2228            #  keys of self.state - and puts them into ids.
2229            for id in fed_exp.get('experimentID', []):
2230                if id.has_key('fedid'): 
2231                    ids.append(id['fedid'])
2232                    repo = "%s" % id['fedid']
2233                if id.has_key('localname'): ids.append(id['localname'])
2234
2235            # Collect the allocation/segment ids into a dict keyed by the fedid
2236            # of the allocation (or a monotonically increasing integer) that
2237            # contains a tuple of uri, aid (which is a dict...)
2238            for i, fed in enumerate(fed_exp.get('federant', [])):
2239                try:
2240                    uri = fed['uri']
2241                    aid = fed['allocID']
2242                    k = fed['allocID'].get('fedid', i)
2243                except KeyError, e:
2244                    continue
2245                tbparams[k] = (uri, aid)
2246            fed_exp['experimentStatus'] = 'terminating'
2247            if self.state_filename: self.write_state()
2248            self.state_lock.release()
2249
2250            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2251            # then completes, so we can't wait if nothing starts.  So, no
2252            # tbparams, no start.
2253            if len(tbparams) > 0:
2254                thread_pool = self.thread_pool(self.nthreads)
2255                for k in tbparams.keys():
2256                    # Create and start a thread to stop the segment
2257                    thread_pool.wait_for_slot()
2258                    uri, aid = tbparams[k]
2259                    t  = self.pooled_thread(\
2260                            target=self.terminate_segment(log=dealloc_log,
2261                                testbed=uri,
2262                                cert_file=self.cert_file, 
2263                                cert_pwd=self.cert_pwd,
2264                                trusted_certs=self.trusted_certs,
2265                                caller=self.call_TerminateSegment),
2266                            args=(uri, aid), name=k,
2267                            pdata=thread_pool, trace_file=self.trace_file)
2268                    t.start()
2269                # Wait for completions
2270                thread_pool.wait_for_all_done()
2271
2272            # release the allocations (failed experiments have done this
2273            # already, and starting experiments may be in odd states, so we
2274            # ignore errors releasing those allocations
2275            try: 
2276                for k in tbparams.keys():
2277                    # This releases access by uri
2278                    uri, aid = tbparams[k]
2279                    self.release_access(None, aid, uri=uri)
2280            except service_error, e:
2281                if status != 'failed' and not force:
2282                    raise e
2283
2284            # Remove the terminated experiment
2285            self.state_lock.acquire()
2286            for id in ids:
2287                if self.state.has_key(id): del self.state[id]
2288
2289            if self.state_filename: self.write_state()
2290            self.state_lock.release()
2291
2292            # Delete any synch points associated with this experiment.  All
2293            # synch points begin with the fedid of the experiment.
2294            fedid_keys = set(["fedid:%s" % f for f in ids \
2295                    if isinstance(f, fedid)])
2296            for k in self.synch_store.all_keys():
2297                try:
2298                    if len(k) > 45 and k[0:46] in fedid_keys:
2299                        self.synch_store.del_value(k)
2300                except synch_store.BadDeletionError:
2301                    pass
2302            self.write_store()
2303
2304            # Remove software and other cached stuff from the filesystem.
2305            if repo:
2306                self.remove_dirs("%s/%s" % (self.repodir, repo))
2307               
2308            return { 
2309                    'experiment': exp , 
2310                    'deallocationLog': "".join(dealloc_list),
2311                    }
2312        else:
2313            # Don't forget to release the lock
2314            self.state_lock.release()
2315            raise service_error(service_error.req, "No saved state")
2316
2317
2318    def GetValue(self, req, fid):
2319        """
2320        Get a value from the synchronized store
2321        """
2322        req = req.get('GetValueRequestBody', None)
2323        if not req:
2324            raise service_error(service_error.req,
2325                    "Bad request format (no GetValueRequestBody)")
2326       
2327        name = req['name']
2328        wait = req['wait']
2329        rv = { 'name': name }
2330
2331        if self.auth.check_attribute(fid, name):
2332            self.log.debug("[GetValue] asking for %s " % name)
2333            try:
2334                v = self.synch_store.get_value(name, wait)
2335            except synch_store.RevokedKeyError:
2336                # No more synch on this key
2337                raise service_error(service_error.federant, 
2338                        "Synch key %s revoked" % name)
2339            if v is not None:
2340                rv['value'] = v
2341            self.log.debug("[GetValue] got %s from %s" % (v, name))
2342            return rv
2343        else:
2344            raise service_error(service_error.access, "Access Denied")
2345       
2346
2347    def SetValue(self, req, fid):
2348        """
2349        Set a value in the synchronized store
2350        """
2351        req = req.get('SetValueRequestBody', None)
2352        if not req:
2353            raise service_error(service_error.req,
2354                    "Bad request format (no SetValueRequestBody)")
2355       
2356        name = req['name']
2357        v = req['value']
2358
2359        if self.auth.check_attribute(fid, name):
2360            try:
2361                self.synch_store.set_value(name, v)
2362                self.write_store()
2363                self.log.debug("[SetValue] set %s to %s" % (name, v))
2364            except synch_store.CollisionError:
2365                # Translate into a service_error
2366                raise service_error(service_error.req,
2367                        "Value already set: %s" %name)
2368            except synch_store.RevokedKeyError:
2369                # No more synch on this key
2370                raise service_error(service_error.federant, 
2371                        "Synch key %s revoked" % name)
2372            return { 'name': name, 'value': v }
2373        else:
2374            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.