source: fedd/federation/experiment_control.py @ 05e8da8

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 05e8da8 was 05e8da8, checked in by Ted Faber <faber@…>, 14 years ago

cleanup on failure

  • Property mode set to 100644
File size: 82.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=None, params=None, 
49            reqs=None, portal=None):
50        self.name=name
51        self.exporter=exporter
52        if importers is None: self.importers = []
53        else: self.importers=importers
54        if params is None: self.params = { }
55        else: self.params = params
56        if reqs is None: self.reqs = []
57        else: self.reqs = reqs
58
59        if portal is not None:
60            self.portal = portal
61        else:
62            self.portal = (name in federated_service.needs_portal)
63
64    def __str__(self):
65        return "name %s export %s import %s params %s reqs %s" % \
66                (self.name, self.exporter, self.importers, self.params,
67                        [ (r['name'], r['visibility']) for r in self.reqs] )
68
69    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
70
71class experiment_control_local:
72    """
73    Control of experiments that this system can directly access.
74
75    Includes experiment creation, termination and information dissemination.
76    Thred safe.
77    """
78
79    class ssh_cmd_timeout(RuntimeError): pass
80   
81    class thread_pool:
82        """
83        A class to keep track of a set of threads all invoked for the same
84        task.  Manages the mutual exclusion of the states.
85        """
86        def __init__(self, nthreads):
87            """
88            Start a pool.
89            """
90            self.changed = Condition()
91            self.started = 0
92            self.terminated = 0
93            self.nthreads = nthreads
94
95        def acquire(self):
96            """
97            Get the pool's lock.
98            """
99            self.changed.acquire()
100
101        def release(self):
102            """
103            Release the pool's lock.
104            """
105            self.changed.release()
106
107        def wait(self, timeout = None):
108            """
109            Wait for a pool thread to start or stop.
110            """
111            self.changed.wait(timeout)
112
113        def start(self):
114            """
115            Called by a pool thread to report starting.
116            """
117            self.changed.acquire()
118            self.started += 1
119            self.changed.notifyAll()
120            self.changed.release()
121
122        def terminate(self):
123            """
124            Called by a pool thread to report finishing.
125            """
126            self.changed.acquire()
127            self.terminated += 1
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def clear(self):
132            """
133            Clear all pool data.
134            """
135            self.changed.acquire()
136            self.started = 0
137            self.terminated =0
138            self.changed.notifyAll()
139            self.changed.release()
140
141        def wait_for_slot(self):
142            """
143            Wait until we have a free slot to start another pooled thread
144            """
145            self.acquire()
146            while self.started - self.terminated >= self.nthreads:
147                self.wait()
148            self.release()
149
150        def wait_for_all_done(self, timeout=None):
151            """
152            Wait until all active threads finish (and at least one has
153            started).  If a timeout is given, return after waiting that long
154            for termination.  If all threads are done (and one has started in
155            the since the last clear()) return True, otherwise False.
156            """
157            if timeout:
158                deadline = time.time() + timeout
159            self.acquire()
160            while self.started == 0 or self.started > self.terminated:
161                self.wait(timeout)
162                if timeout:
163                    if time.time() > deadline:
164                        break
165                    timeout = deadline - time.time()
166            self.release()
167            return not (self.started == 0 or self.started > self.terminated)
168
169    class pooled_thread(Thread):
170        """
171        One of a set of threads dedicated to a specific task.  Uses the
172        thread_pool class above for coordination.
173        """
174        def __init__(self, group=None, target=None, name=None, args=(), 
175                kwargs={}, pdata=None, trace_file=None):
176            Thread.__init__(self, group, target, name, args, kwargs)
177            self.rv = None          # Return value of the ops in this thread
178            self.exception = None   # Exception that terminated this thread
179            self.target=target      # Target function to run on start()
180            self.args = args        # Args to pass to target
181            self.kwargs = kwargs    # Additional kw args
182            self.pdata = pdata      # thread_pool for this class
183            # Logger for this thread
184            self.log = logging.getLogger("fedd.experiment_control")
185       
186        def run(self):
187            """
188            Emulate Thread.run, except add pool data manipulation and error
189            logging.
190            """
191            if self.pdata:
192                self.pdata.start()
193
194            if self.target:
195                try:
196                    self.rv = self.target(*self.args, **self.kwargs)
197                except service_error, s:
198                    self.exception = s
199                    self.log.error("Thread exception: %s %s" % \
200                            (s.code_string(), s.desc))
201                except:
202                    self.exception = sys.exc_info()[1]
203                    self.log.error(("Unexpected thread exception: %s" +\
204                            "Trace %s") % (self.exception,\
205                                traceback.format_exc()))
206            if self.pdata:
207                self.pdata.terminate()
208
209    call_RequestAccess = service_caller('RequestAccess')
210    call_ReleaseAccess = service_caller('ReleaseAccess')
211    call_StartSegment = service_caller('StartSegment')
212    call_TerminateSegment = service_caller('TerminateSegment')
213    call_Ns2Topdl = service_caller('Ns2Topdl')
214
215    def __init__(self, config=None, auth=None):
216        """
217        Intialize the various attributes, most from the config object
218        """
219
220        def parse_tarfile_list(tf):
221            """
222            Parse a tarfile list from the configuration.  This is a set of
223            paths and tarfiles separated by spaces.
224            """
225            rv = [ ]
226            if tf is not None:
227                tl = tf.split()
228                while len(tl) > 1:
229                    p, t = tl[0:2]
230                    del tl[0:2]
231                    rv.append((p, t))
232            return rv
233
234        self.thread_with_rv = experiment_control_local.pooled_thread
235        self.thread_pool = experiment_control_local.thread_pool
236        self.list_log = list_log.list_log
237
238        self.cert_file = config.get("experiment_control", "cert_file")
239        if self.cert_file:
240            self.cert_pwd = config.get("experiment_control", "cert_pwd")
241        else:
242            self.cert_file = config.get("globals", "cert_file")
243            self.cert_pwd = config.get("globals", "cert_pwd")
244
245        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
246                or config.get("globals", "trusted_certs")
247
248        self.repodir = config.get("experiment_control", "repodir")
249        self.repo_url = config.get("experiment_control", "repo_url", 
250                "https://users.isi.deterlab.net:23235");
251
252        self.exp_stem = "fed-stem"
253        self.log = logging.getLogger("fedd.experiment_control")
254        set_log_level(config, "experiment_control", self.log)
255        self.muxmax = 2
256        self.nthreads = 10
257        self.randomize_experiments = False
258
259        self.splitter = None
260        self.ssh_keygen = "/usr/bin/ssh-keygen"
261        self.ssh_identity_file = None
262
263
264        self.debug = config.getboolean("experiment_control", "create_debug")
265        self.cleanup = not config.getboolean("experiment_control", 
266                "leave_tmpfiles")
267        self.state_filename = config.get("experiment_control", 
268                "experiment_state")
269        self.store_filename = config.get("experiment_control", 
270                "synch_store")
271        self.store_url = config.get("experiment_control", "store_url")
272        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
273        self.fedkit = parse_tarfile_list(\
274                config.get("experiment_control", "fedkit"))
275        self.gatewaykit = parse_tarfile_list(\
276                config.get("experiment_control", "gatewaykit"))
277        accessdb_file = config.get("experiment_control", "accessdb")
278
279        self.ssh_pubkey_file = config.get("experiment_control", 
280                "ssh_pubkey_file")
281        self.ssh_privkey_file = config.get("experiment_control",
282                "ssh_privkey_file")
283        dt = config.get("experiment_control", "direct_transit")
284        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
285        else: self.direct_transit = [ ]
286        # NB for internal master/slave ops, not experiment setup
287        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
288
289        self.overrides = set([])
290        ovr = config.get('experiment_control', 'overrides')
291        if ovr:
292            for o in ovr.split(","):
293                o = o.strip()
294                if o.startswith('fedid:'): o = o[len('fedid:'):]
295                self.overrides.add(fedid(hexstr=o))
296
297        self.state = { }
298        self.state_lock = Lock()
299        self.tclsh = "/usr/local/bin/otclsh"
300        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
301                config.get("experiment_control", "tcl_splitter",
302                        "/usr/testbed/lib/ns2ir/parse.tcl")
303        mapdb_file = config.get("experiment_control", "mapdb")
304        self.trace_file = sys.stderr
305
306        self.def_expstart = \
307                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
308                "/tmp/federate";
309        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
310                "FEDDIR/hosts";
311        self.def_gwstart = \
312                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
313                "/tmp/bridge.log";
314        self.def_mgwstart = \
315                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
316                "/tmp/bridge.log";
317        self.def_gwimage = "FBSD61-TUNNEL2";
318        self.def_gwtype = "pc";
319        self.local_access = { }
320
321        if auth:
322            self.auth = auth
323        else:
324            self.log.error(\
325                    "[access]: No authorizer initialized, creating local one.")
326            auth = authorizer()
327
328
329        if self.ssh_pubkey_file:
330            try:
331                f = open(self.ssh_pubkey_file, 'r')
332                self.ssh_pubkey = f.read()
333                f.close()
334            except EnvironmentError:
335                raise service_error(service_error.internal,
336                        "Cannot read sshpubkey")
337        else:
338            raise service_error(service_error.internal, 
339                    "No SSH public key file?")
340
341        if not self.ssh_privkey_file:
342            raise service_error(service_error.internal, 
343                    "No SSH public key file?")
344
345
346        if mapdb_file:
347            self.read_mapdb(mapdb_file)
348        else:
349            self.log.warn("[experiment_control] No testbed map, using defaults")
350            self.tbmap = { 
351                    'deter':'https://users.isi.deterlab.net:23235',
352                    'emulab':'https://users.isi.deterlab.net:23236',
353                    'ucb':'https://users.isi.deterlab.net:23237',
354                    }
355
356        if accessdb_file:
357                self.read_accessdb(accessdb_file)
358        else:
359            raise service_error(service_error.internal,
360                    "No accessdb specified in config")
361
362        # Grab saved state.  OK to do this w/o locking because it's read only
363        # and only one thread should be in existence that can see self.state at
364        # this point.
365        if self.state_filename:
366            self.read_state()
367
368        if self.store_filename:
369            self.read_store()
370        else:
371            self.log.warning("No saved synch store")
372            self.synch_store = synch_store
373
374        # Dispatch tables
375        self.soap_services = {\
376                'New': soap_handler('New', self.new_experiment),
377                'Create': soap_handler('Create', self.create_experiment),
378                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
379                'Vis': soap_handler('Vis', self.get_vis),
380                'Info': soap_handler('Info', self.get_info),
381                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
382                'Terminate': soap_handler('Terminate', 
383                    self.terminate_experiment),
384                'GetValue': soap_handler('GetValue', self.GetValue),
385                'SetValue': soap_handler('SetValue', self.SetValue),
386        }
387
388        self.xmlrpc_services = {\
389                'New': xmlrpc_handler('New', self.new_experiment),
390                'Create': xmlrpc_handler('Create', self.create_experiment),
391                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
392                'Vis': xmlrpc_handler('Vis', self.get_vis),
393                'Info': xmlrpc_handler('Info', self.get_info),
394                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
395                'Terminate': xmlrpc_handler('Terminate',
396                    self.terminate_experiment),
397                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
398                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
399        }
400
401    # Call while holding self.state_lock
402    def write_state(self):
403        """
404        Write a new copy of experiment state after copying the existing state
405        to a backup.
406
407        State format is a simple pickling of the state dictionary.
408        """
409        if os.access(self.state_filename, os.W_OK):
410            copy_file(self.state_filename, \
411                    "%s.bak" % self.state_filename)
412        try:
413            f = open(self.state_filename, 'w')
414            pickle.dump(self.state, f)
415        except EnvironmentError, e:
416            self.log.error("Can't write file %s: %s" % \
417                    (self.state_filename, e))
418        except pickle.PicklingError, e:
419            self.log.error("Pickling problem: %s" % e)
420        except TypeError, e:
421            self.log.error("Pickling problem (TypeError): %s" % e)
422
423    @staticmethod
424    def get_alloc_ids(state):
425        """
426        Pull the fedids of the identifiers of each allocation from the
427        state.  Again, a dict dive that's best isolated.
428
429        Used by read_store and read state
430        """
431
432        return [ f['allocID']['fedid'] 
433                for f in state.get('federant',[]) \
434                    if f.has_key('allocID') and \
435                        f['allocID'].has_key('fedid')]
436
437    # Call while holding self.state_lock
438    def read_state(self):
439        """
440        Read a new copy of experiment state.  Old state is overwritten.
441
442        State format is a simple pickling of the state dictionary.
443        """
444       
445        def get_experiment_id(state):
446            """
447            Pull the fedid experimentID out of the saved state.  This is kind
448            of a gross walk through the dict.
449            """
450
451            if state.has_key('experimentID'):
452                for e in state['experimentID']:
453                    if e.has_key('fedid'):
454                        return e['fedid']
455                else:
456                    return None
457            else:
458                return None
459
460        try:
461            f = open(self.state_filename, "r")
462            self.state = pickle.load(f)
463            self.log.debug("[read_state]: Read state from %s" % \
464                    self.state_filename)
465        except EnvironmentError, e:
466            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
467                    % (self.state_filename, e))
468        except pickle.UnpicklingError, e:
469            self.log.warning(("[read_state]: No saved state: " + \
470                    "Unpickling failed: %s") % e)
471       
472        for s in self.state.values():
473            try:
474
475                eid = get_experiment_id(s)
476                if eid : 
477                    # Give the owner rights to the experiment
478                    self.auth.set_attribute(s['owner'], eid)
479                    # And holders of the eid as well
480                    self.auth.set_attribute(eid, eid)
481                    # allow overrides to control experiments as well
482                    for o in self.overrides:
483                        self.auth.set_attribute(o, eid)
484                    # Set permissions to allow reading of the software repo, if
485                    # any, as well.
486                    for a in self.get_alloc_ids(s):
487                        self.auth.set_attribute(a, 'repo/%s' % eid)
488                else:
489                    raise KeyError("No experiment id")
490            except KeyError, e:
491                self.log.warning("[read_state]: State ownership or identity " +\
492                        "misformatted in %s: %s" % (self.state_filename, e))
493
494
495    def read_accessdb(self, accessdb_file):
496        """
497        Read the mapping from fedids that can create experiments to their name
498        in the 3-level access namespace.  All will be asserted from this
499        testbed and can include the local username and porject that will be
500        asserted on their behalf by this fedd.  Each fedid is also added to the
501        authorization system with the "create" attribute.
502        """
503        self.accessdb = {}
504        # These are the regexps for parsing the db
505        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
506        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
507                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
508        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
509                "\s*->\s*(" + name_expr + ")\s*$")
510        lineno = 0
511
512        # Parse the mappings and store in self.authdb, a dict of
513        # fedid -> (proj, user)
514        try:
515            f = open(accessdb_file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if len(line) == 0 or line.startswith('#'):
520                    continue
521                m = project_line.match(line)
522                if m:
523                    fid = fedid(hexstr=m.group(1))
524                    project, user = m.group(2,3)
525                    if not self.accessdb.has_key(fid):
526                        self.accessdb[fid] = []
527                    self.accessdb[fid].append((project, user))
528                    continue
529
530                m = user_line.match(line)
531                if m:
532                    fid = fedid(hexstr=m.group(1))
533                    project = None
534                    user = m.group(2)
535                    if not self.accessdb.has_key(fid):
536                        self.accessdb[fid] = []
537                    self.accessdb[fid].append((project, user))
538                    continue
539                self.log.warn("[experiment_control] Error parsing access " +\
540                        "db %s at line %d" %  (accessdb_file, lineno))
541        except EnvironmentError:
542            raise service_error(service_error.internal,
543                    ("Error opening/reading %s as experiment " +\
544                            "control accessdb") %  accessdb_file)
545        f.close()
546
547        # Initialize the authorization attributes
548        for fid in self.accessdb.keys():
549            self.auth.set_attribute(fid, 'create')
550            self.auth.set_attribute(fid, 'new')
551
552    def read_mapdb(self, file):
553        """
554        Read a simple colon separated list of mappings for the
555        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
556        """
557
558        self.tbmap = { }
559        lineno =0
560        try:
561            f = open(file, "r")
562            for line in f:
563                lineno += 1
564                line = line.strip()
565                if line.startswith('#') or len(line) == 0:
566                    continue
567                try:
568                    label, url = line.split(':', 1)
569                    self.tbmap[label] = url
570                except ValueError, e:
571                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
572                            "map db: %s %s" % (lineno, line, e))
573        except EnvironmentError, e:
574            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
575                    "open %s: %s" % (file, e))
576        f.close()
577
578    def read_store(self):
579        try:
580            self.synch_store = synch_store()
581            self.synch_store.load(self.store_filename)
582            self.log.debug("[read_store]: Read store from %s" % \
583                    self.store_filename)
584        except EnvironmentError, e:
585            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
586                    % (self.state_filename, e))
587            self.synch_store = synch_store()
588
589        # Set the initial permissions on data in the store.  XXX: This ad hoc
590        # authorization attribute initialization is getting out of hand.
591        for k in self.synch_store.all_keys():
592            try:
593                if k.startswith('fedid:'):
594                    fid = fedid(hexstr=k[6:46])
595                    if self.state.has_key(fid):
596                        for a in self.get_alloc_ids(self.state[fid]):
597                            self.auth.set_attribute(a, k)
598            except ValueError, e:
599                self.log.warn("Cannot deduce permissions for %s" % k)
600
601
602    def write_store(self):
603        """
604        Write a new copy of synch_store after writing current state
605        to a backup.  We use the internal synch_store pickle method to avoid
606        incinsistent data.
607
608        State format is a simple pickling of the store.
609        """
610        if os.access(self.store_filename, os.W_OK):
611            copy_file(self.store_filename, \
612                    "%s.bak" % self.store_filename)
613        try:
614            self.synch_store.save(self.store_filename)
615        except EnvironmentError, e:
616            self.log.error("Can't write file %s: %s" % \
617                    (self.store_filename, e))
618        except TypeError, e:
619            self.log.error("Pickling problem (TypeError): %s" % e)
620
621       
622    def generate_ssh_keys(self, dest, type="rsa" ):
623        """
624        Generate a set of keys for the gateways to use to talk.
625
626        Keys are of type type and are stored in the required dest file.
627        """
628        valid_types = ("rsa", "dsa")
629        t = type.lower();
630        if t not in valid_types: raise ValueError
631        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
632
633        try:
634            trace = open("/dev/null", "w")
635        except EnvironmentError:
636            raise service_error(service_error.internal,
637                    "Cannot open /dev/null??");
638
639        # May raise CalledProcessError
640        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
641        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
642        if rv != 0:
643            raise service_error(service_error.internal, 
644                    "Cannot generate nonce ssh keys.  %s return code %d" \
645                            % (self.ssh_keygen, rv))
646
647    def gentopo(self, str):
648        """
649        Generate the topology dtat structure from the splitter's XML
650        representation of it.
651
652        The topology XML looks like:
653            <experiment>
654                <nodes>
655                    <node><vname></vname><ips>ip1:ip2</ips></node>
656                </nodes>
657                <lans>
658                    <lan>
659                        <vname></vname><vnode></vnode><ip></ip>
660                        <bandwidth></bandwidth><member>node:port</member>
661                    </lan>
662                </lans>
663        """
664        class topo_parse:
665            """
666            Parse the topology XML and create the dats structure.
667            """
668            def __init__(self):
669                # Typing of the subelements for data conversion
670                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
671                self.int_subelements = ( 'bandwidth',)
672                self.float_subelements = ( 'delay',)
673                # The final data structure
674                self.nodes = [ ]
675                self.lans =  [ ]
676                self.topo = { \
677                        'node': self.nodes,\
678                        'lan' : self.lans,\
679                    }
680                self.element = { }  # Current element being created
681                self.chars = ""     # Last text seen
682
683            def end_element(self, name):
684                # After each sub element the contents is added to the current
685                # element or to the appropriate list.
686                if name == 'node':
687                    self.nodes.append(self.element)
688                    self.element = { }
689                elif name == 'lan':
690                    self.lans.append(self.element)
691                    self.element = { }
692                elif name in self.str_subelements:
693                    self.element[name] = self.chars
694                    self.chars = ""
695                elif name in self.int_subelements:
696                    self.element[name] = int(self.chars)
697                    self.chars = ""
698                elif name in self.float_subelements:
699                    self.element[name] = float(self.chars)
700                    self.chars = ""
701
702            def found_chars(self, data):
703                self.chars += data.rstrip()
704
705
706        tp = topo_parse();
707        parser = xml.parsers.expat.ParserCreate()
708        parser.EndElementHandler = tp.end_element
709        parser.CharacterDataHandler = tp.found_chars
710
711        parser.Parse(str)
712
713        return tp.topo
714       
715
716    def genviz(self, topo):
717        """
718        Generate the visualization the virtual topology
719        """
720
721        neato = "/usr/local/bin/neato"
722        # These are used to parse neato output and to create the visualization
723        # file.
724        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
725        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
726                "%s</type></node>"
727
728        try:
729            # Node names
730            nodes = [ n['vname'] for n in topo['node'] ]
731            topo_lans = topo['lan']
732        except KeyError, e:
733            raise service_error(service_error.internal, "Bad topology: %s" %e)
734
735        lans = { }
736        links = { }
737
738        # Walk through the virtual topology, organizing the connections into
739        # 2-node connections (links) and more-than-2-node connections (lans).
740        # When a lan is created, it's added to the list of nodes (there's a
741        # node in the visualization for the lan).
742        for l in topo_lans:
743            if links.has_key(l['vname']):
744                if len(links[l['vname']]) < 2:
745                    links[l['vname']].append(l['vnode'])
746                else:
747                    nodes.append(l['vname'])
748                    lans[l['vname']] = links[l['vname']]
749                    del links[l['vname']]
750                    lans[l['vname']].append(l['vnode'])
751            elif lans.has_key(l['vname']):
752                lans[l['vname']].append(l['vnode'])
753            else:
754                links[l['vname']] = [ l['vnode'] ]
755
756
757        # Open up a temporary file for dot to turn into a visualization
758        try:
759            df, dotname = tempfile.mkstemp()
760            dotfile = os.fdopen(df, 'w')
761        except EnvironmentError:
762            raise service_error(service_error.internal,
763                    "Failed to open file in genviz")
764
765        try:
766            dnull = open('/dev/null', 'w')
767        except EnvironmentError:
768            service_error(service_error.internal,
769                    "Failed to open /dev/null in genviz")
770
771        # Generate a dot/neato input file from the links, nodes and lans
772        try:
773            print >>dotfile, "graph G {"
774            for n in nodes:
775                print >>dotfile, '\t"%s"' % n
776            for l in links.keys():
777                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
778            for l in lans.keys():
779                for n in lans[l]:
780                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
781            print >>dotfile, "}"
782            dotfile.close()
783        except TypeError:
784            raise service_error(service_error.internal,
785                    "Single endpoint link in vtopo")
786        except EnvironmentError:
787            raise service_error(service_error.internal, "Cannot write dot file")
788
789        # Use dot to create a visualization
790        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
791                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
792                close_fds=True)
793        dnull.close()
794
795        # Translate dot to vis format
796        vis_nodes = [ ]
797        vis = { 'node': vis_nodes }
798        for line in dot.stdout:
799            m = vis_re.match(line)
800            if m:
801                vn = m.group(1)
802                vis_node = {'name': vn, \
803                        'x': float(m.group(2)),\
804                        'y' : float(m.group(3)),\
805                    }
806                if vn in links.keys() or vn in lans.keys():
807                    vis_node['type'] = 'lan'
808                else:
809                    vis_node['type'] = 'node'
810                vis_nodes.append(vis_node)
811        rv = dot.wait()
812
813        os.remove(dotname)
814        if rv == 0 : return vis
815        else: return None
816
817    def get_access(self, tb, nodes, tbparam, access_user, masters):
818        """
819        Get access to testbed through fedd and set the parameters for that tb
820        """
821        def get_export_project(svcs):
822            """
823            Look through for the list of federated_service for this testbed
824            objects for a project_export service, and extract the project
825            parameter.
826            """
827
828            pe = [s for s in svcs if s.name=='project_export']
829            if len(pe) == 1:
830                return pe[0].params.get('project', None)
831            elif len(pe) == 0:
832                return None
833            else:
834                raise service_error(service_error.req,
835                        "More than one project export is not supported")
836
837        uri = self.tbmap.get(testbed_base(tb), None)
838        if not uri:
839            raise service_error(service_error.server_config, 
840                    "Unknown testbed: %s" % tb)
841
842        export_svcs = masters.get(tb,[])
843        import_svcs = [ s for m in masters.values() \
844                for s in m \
845                    if tb in s.importers ]
846
847        export_project = get_export_project(export_svcs)
848
849        # Tweak search order so that if there are entries in access_user that
850        # have a project matching the export project, we try them first
851        if export_project: 
852            access_sequence = [ (p, u) for p, u in access_user \
853                    if p == export_project] 
854            access_sequence.extend([(p, u) for p, u in access_user \
855                    if p != export_project]) 
856        else: 
857            access_sequence = access_user
858
859        for p, u in access_sequence: 
860            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
861                    "to %s") %  ((p or "None"), u, uri))
862
863            if p:
864                # Request with user and project specified
865                req = {\
866                        'destinationTestbed' : { 'uri' : uri },
867                        'credential': [ "project: %s" % p, "user: %s"  % u],
868                        'allocID' : { 'localname': 'test' },
869                    }
870            else:
871                # Request with only user specified
872                req = {\
873                        'destinationTestbed' : { 'uri' : uri },
874                        'credential': [ 'user: %s' % u ],
875                        'allocID' : { 'localname': 'test' },
876                    }
877
878            # Make the service request from the services we're importing and
879            # exporting.  Keep track of the export request ids so we can
880            # collect the resulting info from the access response.
881            e_keys = { }
882            if import_svcs or export_svcs:
883                req['service'] = [ ]
884
885                for i, s in enumerate(import_svcs):
886                    idx = 'import%d' % i
887                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
888                    if s.params:
889                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
890                                for k, v in s.params.items()]
891                    req['service'].append(sr)
892
893                for i, s in enumerate(export_svcs):
894                    idx = 'export%d' % i
895                    e_keys[idx] = s
896                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
897                    if s.params:
898                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
899                                for k, v in s.params.items()]
900                    req['service'].append(sr)
901
902            # node resources if any
903            if nodes != None and len(nodes) > 0:
904                rnodes = [ ]
905                for n in nodes:
906                    rn = { }
907                    image, hw, count = n.split(":")
908                    if image: rn['image'] = [ image ]
909                    if hw: rn['hardware'] = [ hw ]
910                    if count and int(count) >0 : rn['count'] = int(count)
911                    rnodes.append(rn)
912                req['resources']= { }
913                req['resources']['node'] = rnodes
914
915            try:
916                if self.local_access.has_key(uri):
917                    # Local access call
918                    req = { 'RequestAccessRequestBody' : req }
919                    r = self.local_access[uri].RequestAccess(req, 
920                            fedid(file=self.cert_file))
921                    r = { 'RequestAccessResponseBody' : r }
922                else:
923                    r = self.call_RequestAccess(uri, req, 
924                            self.cert_file, self.cert_pwd, self.trusted_certs)
925            except service_error, e:
926                if e.code == service_error.access:
927                    self.log.debug("[get_access] Access denied")
928                    r = None
929                    continue
930                else:
931                    raise e
932
933            if r.has_key('RequestAccessResponseBody'):
934                # Through to here we have a valid response, not a fault.
935                # Access denied is a fault, so something better or worse than
936                # access denied has happened.
937                r = r['RequestAccessResponseBody']
938                self.log.debug("[get_access] Access granted")
939                break
940            else:
941                raise service_error(service_error.protocol,
942                        "Bad proxy response")
943       
944        if not r:
945            raise service_error(service_error.access, 
946                    "Access denied by %s (%s)" % (tb, uri))
947
948        tbparam[tb] = { 
949                "allocID" : r['allocID'],
950                "uri": uri,
951                }
952
953        # Collect the responses corresponding to the services this testbed
954        # exports.  These will be the service requests that we will include in
955        # the start segment requests (with appropriate visibility values) to
956        # import and export the segments.
957        for s in r.get('service', []):
958            id = s.get('id', None)
959            if id and id in e_keys:
960                e_keys[id].reqs.append(s)
961
962        # Add attributes to parameter space.  We don't allow attributes to
963        # overlay any parameters already installed.
964        for a in r.get('fedAttr', []):
965            try:
966                if a['attribute'] and \
967                        isinstance(a['attribute'], basestring)\
968                        and not tbparam[tb].has_key(a['attribute'].lower()):
969                    tbparam[tb][a['attribute'].lower()] = a['value']
970            except KeyError:
971                self.log.error("Bad attribute in response: %s" % a)
972
973    def release_access(self, tb, aid, uri=None):
974        """
975        Release access to testbed through fedd
976        """
977
978        if not uri:
979            uri = self.tbmap.get(tb, None)
980        if not uri:
981            raise service_error(service_error.server_config, 
982                    "Unknown testbed: %s" % tb)
983
984        if self.local_access.has_key(uri):
985            resp = self.local_access[uri].ReleaseAccess(\
986                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
987                    fedid(file=self.cert_file))
988            resp = { 'ReleaseAccessResponseBody': resp } 
989        else:
990            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
991                    self.cert_file, self.cert_pwd, self.trusted_certs)
992
993        # better error coding
994
995    def remote_ns2topdl(self, uri, desc):
996
997        req = {
998                'description' : { 'ns2description': desc },
999            }
1000
1001        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1002                self.trusted_certs)
1003
1004        if r.has_key('Ns2TopdlResponseBody'):
1005            r = r['Ns2TopdlResponseBody']
1006            ed = r.get('experimentdescription', None)
1007            if ed.has_key('topdldescription'):
1008                return topdl.Topology(**ed['topdldescription'])
1009            else:
1010                raise service_error(service_error.protocol, 
1011                        "Bad splitter response (no output)")
1012        else:
1013            raise service_error(service_error.protocol, "Bad splitter response")
1014
1015    class start_segment:
1016        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1017                cert_pwd=None, trusted_certs=None, caller=None,
1018                log_collector=None):
1019            self.log = log
1020            self.debug = debug
1021            self.cert_file = cert_file
1022            self.cert_pwd = cert_pwd
1023            self.trusted_certs = None
1024            self.caller = caller
1025            self.testbed = testbed
1026            self.log_collector = log_collector
1027            self.response = None
1028            self.node = { }
1029
1030        def make_map(self, resp):
1031            for e in resp.get('embedding', []):
1032                if 'toponame' in e and 'physname' in e:
1033                    self.node[e['toponame']] = e['physname'][0]
1034
1035        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1036            req = {
1037                    'allocID': { 'fedid' : aid }, 
1038                    'segmentdescription': { 
1039                        'topdldescription': topo.to_dict(),
1040                    },
1041                }
1042
1043            if connInfo:
1044                req['connection'] = connInfo
1045
1046            import_svcs = [ s for m in masters.values() \
1047                    for s in m if self.testbed in s.importers]
1048
1049            if import_svcs or self.testbed in masters:
1050                req['service'] = []
1051
1052            for s in import_svcs:
1053                for r in s.reqs:
1054                    sr = copy.deepcopy(r)
1055                    sr['visibility'] = 'import';
1056                    req['service'].append(sr)
1057
1058            for s in masters.get(self.testbed, []):
1059                for r in s.reqs:
1060                    sr = copy.deepcopy(r)
1061                    sr['visibility'] = 'export';
1062                    req['service'].append(sr)
1063
1064            if attrs:
1065                req['fedAttr'] = attrs
1066
1067            try:
1068                self.log.debug("Calling StartSegment at %s " % uri)
1069                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1070                        self.trusted_certs)
1071                if r.has_key('StartSegmentResponseBody'):
1072                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1073                            None)
1074                    if lval and self.log_collector:
1075                        for line in  lval.splitlines(True):
1076                            self.log_collector.write(line)
1077                    self.make_map(r['StartSegmentResponseBody'])
1078                    self.response = r
1079                else:
1080                    raise service_error(service_error.internal, 
1081                            "Bad response!?: %s" %r)
1082                return True
1083            except service_error, e:
1084                self.log.error("Start segment failed on %s: %s" % \
1085                        (self.testbed, e))
1086                return False
1087
1088
1089
1090    class terminate_segment:
1091        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1092                cert_pwd=None, trusted_certs=None, caller=None):
1093            self.log = log
1094            self.debug = debug
1095            self.cert_file = cert_file
1096            self.cert_pwd = cert_pwd
1097            self.trusted_certs = None
1098            self.caller = caller
1099            self.testbed = testbed
1100
1101        def __call__(self, uri, aid ):
1102            req = {
1103                    'allocID': aid , 
1104                }
1105            try:
1106                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1107                        self.trusted_certs)
1108                return True
1109            except service_error, e:
1110                self.log.error("Terminate segment failed on %s: %s" % \
1111                        (self.testbed, e))
1112                return False
1113   
1114
1115    def allocate_resources(self, allocated, masters, eid, expid, 
1116            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1117            attrs=None, connInfo={}):
1118
1119        started = { }           # Testbeds where a sub-experiment started
1120                                # successfully
1121
1122        # XXX
1123        fail_soft = False
1124
1125        log = alloc_log or self.log
1126
1127        thread_pool = self.thread_pool(self.nthreads)
1128        threads = [ ]
1129        starters = [ ]
1130
1131        for tb in allocated.keys():
1132            # Create and start a thread to start the segment, and save it
1133            # to get the return value later
1134            tb_attrs = copy.copy(attrs)
1135            thread_pool.wait_for_slot()
1136            uri = tbparams[tb].get('uri', \
1137                    self.tbmap.get(testbed_base(tb), None))
1138            base, suffix = split_testbed(tb)
1139            if suffix:
1140                tb_attrs.append({'attribute': 'experiment_name', 
1141                    'value': "%s-%s" % (eid, suffix)})
1142            else:
1143                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1144            if not uri:
1145                raise service_error(service_error.internal, 
1146                        "Unknown testbed %s !?" % tb)
1147
1148            if tbparams[tb].has_key('allocID') and \
1149                    tbparams[tb]['allocID'].has_key('fedid'):
1150                aid = tbparams[tb]['allocID']['fedid']
1151            else:
1152                raise service_error(service_error.internal, 
1153                        "No alloc id for testbed %s !?" % tb)
1154
1155            s = self.start_segment(log=log, debug=self.debug,
1156                    testbed=tb, cert_file=self.cert_file,
1157                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1158                    caller=self.call_StartSegment,
1159                    log_collector=log_collector)
1160            starters.append(s)
1161            t  = self.pooled_thread(\
1162                    target=s, name=tb,
1163                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1164                    pdata=thread_pool, trace_file=self.trace_file)
1165            threads.append(t)
1166            t.start()
1167
1168        # Wait until all finish (keep pinging the log, though)
1169        mins = 0
1170        revoked = False
1171        while not thread_pool.wait_for_all_done(60.0):
1172            mins += 1
1173            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1174                    % mins)
1175            if not revoked and \
1176                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1177                # a testbed has failed.  Revoke this experiment's
1178                # synchronizarion values so that sub experiments will not
1179                # deadlock waiting for synchronization that will never happen
1180                self.log.info("A subexperiment has failed to swap in, " + \
1181                        "revoking synch keys")
1182                var_key = "fedid:%s" % expid
1183                for k in self.synch_store.all_keys():
1184                    if len(k) > 45 and k[0:46] == var_key:
1185                        self.synch_store.revoke_key(k)
1186                revoked = True
1187
1188        failed = [ t.getName() for t in threads if not t.rv ]
1189        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1190
1191        # If one failed clean up, unless fail_soft is set
1192        if failed:
1193            if not fail_soft:
1194                thread_pool.clear()
1195                for tb in succeeded:
1196                    # Create and start a thread to stop the segment
1197                    thread_pool.wait_for_slot()
1198                    uri = tbparams[tb]['uri']
1199                    t  = self.pooled_thread(\
1200                            target=self.terminate_segment(log=log,
1201                                testbed=tb,
1202                                cert_file=self.cert_file, 
1203                                cert_pwd=self.cert_pwd,
1204                                trusted_certs=self.trusted_certs,
1205                                caller=self.call_TerminateSegment),
1206                            args=(uri, tbparams[tb]['federant']['allocID']),
1207                            name=tb,
1208                            pdata=thread_pool, trace_file=self.trace_file)
1209                    t.start()
1210                # Wait until all finish (if any are being stopped)
1211                if succeeded:
1212                    thread_pool.wait_for_all_done()
1213
1214                # release the allocations
1215                for tb in tbparams.keys():
1216                    self.release_access(tb, tbparams[tb]['allocID'],
1217                            tbparams[tb].get('uri', None))
1218                # Remove the placeholder
1219                self.state_lock.acquire()
1220                self.state[eid]['experimentStatus'] = 'failed'
1221                if self.state_filename: self.write_state()
1222                self.state_lock.release()
1223                # Remove the repo dir
1224                self.remove_dirs("%s/%s" %(self.repodir, expid))
1225                # Walk up tmpdir, deleting as we go
1226                if self.cleanup:
1227                    self.remove_dirs(tmpdir)
1228                else:
1229                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1230
1231
1232                log.error("Swap in failed on %s" % ",".join(failed))
1233                return
1234        else:
1235            # Walk through the successes and gather the virtual to physical
1236            # mapping.
1237            embedding = [ ]
1238            for s in starters:
1239                for k, v in s.node.items():
1240                    embedding.append({
1241                        'toponame': k, 
1242                        'physname': [ v],
1243                        'testbed': s.testbed
1244                        })
1245            log.info("[start_segment]: Experiment %s active" % eid)
1246
1247
1248        # Walk up tmpdir, deleting as we go
1249        if self.cleanup:
1250            self.remove_dirs(tmpdir)
1251        else:
1252            log.debug("[start_experiment]: not removing %s" % tmpdir)
1253
1254        # Insert the experiment into our state and update the disk copy.
1255        self.state_lock.acquire()
1256        self.state[expid]['experimentStatus'] = 'active'
1257        self.state[eid] = self.state[expid]
1258        self.state[eid]['experimentdescription']['topdldescription'] = \
1259                top.to_dict()
1260        self.state[eid]['embedding'] = embedding
1261        if self.state_filename: self.write_state()
1262        self.state_lock.release()
1263        return
1264
1265
1266    def add_kit(self, e, kit):
1267        """
1268        Add a Software object created from the list of (install, location)
1269        tuples passed as kit  to the software attribute of an object e.  We
1270        do this enough to break out the code, but it's kind of a hack to
1271        avoid changing the old tuple rep.
1272        """
1273
1274        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1275
1276        if isinstance(e.software, list): e.software.extend(s)
1277        else: e.software = s
1278
1279
1280    def create_experiment_state(self, fid, req, expid, expcert,
1281            state='starting'):
1282        """
1283        Create the initial entry in the experiment's state.  The expid and
1284        expcert are the experiment's fedid and certifacte that represents that
1285        ID, which are installed in the experiment state.  If the request
1286        includes a suggested local name that is used if possible.  If the local
1287        name is already taken by an experiment owned by this user that has
1288        failed, it is overwritten.  Otherwise new letters are added until a
1289        valid localname is found.  The generated local name is returned.
1290        """
1291
1292        if req.has_key('experimentID') and \
1293                req['experimentID'].has_key('localname'):
1294            overwrite = False
1295            eid = req['experimentID']['localname']
1296            # If there's an old failed experiment here with the same local name
1297            # and accessible by this user, we'll overwrite it, otherwise we'll
1298            # fall through and do the collision avoidance.
1299            old_expid = self.get_experiment_fedid(eid)
1300            if old_expid and self.check_experiment_access(fid, old_expid):
1301                self.state_lock.acquire()
1302                status = self.state[eid].get('experimentStatus', None)
1303                if status and status == 'failed':
1304                    # remove the old access attribute
1305                    self.auth.unset_attribute(fid, old_expid)
1306                    overwrite = True
1307                    del self.state[eid]
1308                    del self.state[old_expid]
1309                self.state_lock.release()
1310            self.state_lock.acquire()
1311            while (self.state.has_key(eid) and not overwrite):
1312                eid += random.choice(string.ascii_letters)
1313            # Initial state
1314            self.state[eid] = {
1315                    'experimentID' : \
1316                            [ { 'localname' : eid }, {'fedid': expid } ],
1317                    'experimentStatus': state,
1318                    'experimentAccess': { 'X509' : expcert },
1319                    'owner': fid,
1320                    'log' : [],
1321                }
1322            self.state[expid] = self.state[eid]
1323            if self.state_filename: self.write_state()
1324            self.state_lock.release()
1325        else:
1326            eid = self.exp_stem
1327            for i in range(0,5):
1328                eid += random.choice(string.ascii_letters)
1329            self.state_lock.acquire()
1330            while (self.state.has_key(eid)):
1331                eid = self.exp_stem
1332                for i in range(0,5):
1333                    eid += random.choice(string.ascii_letters)
1334            # Initial state
1335            self.state[eid] = {
1336                    'experimentID' : \
1337                            [ { 'localname' : eid }, {'fedid': expid } ],
1338                    'experimentStatus': state,
1339                    'experimentAccess': { 'X509' : expcert },
1340                    'owner': fid,
1341                    'log' : [],
1342                }
1343            self.state[expid] = self.state[eid]
1344            if self.state_filename: self.write_state()
1345            self.state_lock.release()
1346
1347        return eid
1348
1349
1350    def allocate_ips_to_topo(self, top):
1351        """
1352        Add an ip4_address attribute to all the hosts in the topology, based on
1353        the shared substrates on which they sit.  An /etc/hosts file is also
1354        created and returned as a list of hostfiles entries.  We also return
1355        the allocator, because we may need to allocate IPs to portals
1356        (specifically DRAGON portals).
1357        """
1358        subs = sorted(top.substrates, 
1359                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1360                reverse=True)
1361        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1362        ifs = { }
1363        hosts = [ ]
1364
1365        for idx, s in enumerate(subs):
1366            net_size = len(s.interfaces)+2
1367
1368            a = ips.allocate(net_size)
1369            if a :
1370                base, num = a
1371                if num < net_size: 
1372                    raise service_error(service_error.internal,
1373                            "Allocator returned wrong number of IPs??")
1374            else:
1375                raise service_error(service_error.req, 
1376                        "Cannot allocate IP addresses")
1377            mask = ips.min_alloc
1378            while mask < net_size:
1379                mask *= 2
1380
1381            netmask = ((2**32-1) ^ (mask-1))
1382
1383            base += 1
1384            for i in s.interfaces:
1385                i.attribute.append(
1386                        topdl.Attribute('ip4_address', 
1387                            "%s" % ip_addr(base)))
1388                i.attribute.append(
1389                        topdl.Attribute('ip4_netmask', 
1390                            "%s" % ip_addr(int(netmask))))
1391
1392                hname = i.element.name
1393                if ifs.has_key(hname):
1394                    hosts.append("%s\t%s-%s %s-%d" % \
1395                            (ip_addr(base), hname, s.name, hname,
1396                                ifs[hname]))
1397                else:
1398                    ifs[hname] = 0
1399                    hosts.append("%s\t%s-%s %s-%d %s" % \
1400                            (ip_addr(base), hname, s.name, hname,
1401                                ifs[hname], hname))
1402
1403                ifs[hname] += 1
1404                base += 1
1405        return hosts, ips
1406
1407    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1408            tbparams, masters):
1409        """
1410        Request access to the various testbeds required for this instantiation
1411        (passed in as testbeds).  User, access_user, expoert_project and master
1412        are used to construct the correct requests.  Per-testbed parameters are
1413        returned in tbparams.
1414        """
1415        for tb in testbeds:
1416            self.get_access(tb, None, tbparams, access_user, masters)
1417            allocated[tb] = 1
1418
1419    def split_topology(self, top, topo, testbeds):
1420        """
1421        Create the sub-topologies that are needed for experiment instantiation.
1422        """
1423        for tb in testbeds:
1424            topo[tb] = top.clone()
1425            # copy in for loop allows deletions from the original
1426            for e in [ e for e in topo[tb].elements]:
1427                etb = e.get_attribute('testbed')
1428                # NB: elements without a testbed attribute won't appear in any
1429                # sub topologies. 
1430                if not etb or etb != tb:
1431                    for i in e.interface:
1432                        for s in i.subs:
1433                            try:
1434                                s.interfaces.remove(i)
1435                            except ValueError:
1436                                raise service_error(service_error.internal,
1437                                        "Can't remove interface??")
1438                    topo[tb].elements.remove(e)
1439            topo[tb].make_indices()
1440
1441    def wrangle_software(self, expid, top, topo, tbparams):
1442        """
1443        Copy software out to the repository directory, allocate permissions and
1444        rewrite the segment topologies to look for the software in local
1445        places.
1446        """
1447
1448        # Copy the rpms and tarfiles to a distribution directory from
1449        # which the federants can retrieve them
1450        linkpath = "%s/software" %  expid
1451        softdir ="%s/%s" % ( self.repodir, linkpath)
1452        softmap = { }
1453        # These are in a list of tuples format (each kit).  This comprehension
1454        # unwraps them into a single list of tuples that initilaizes the set of
1455        # tuples.
1456        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1457                for p, t in l ])
1458        pkgs.update([x.location for e in top.elements \
1459                for x in e.software])
1460        try:
1461            os.makedirs(softdir)
1462        except EnvironmentError, e:
1463            raise service_error(
1464                    "Cannot create software directory: %s" % e)
1465        # The actual copying.  Everything's converted into a url for copying.
1466        for pkg in pkgs:
1467            loc = pkg
1468
1469            scheme, host, path = urlparse(loc)[0:3]
1470            dest = os.path.basename(path)
1471            if not scheme:
1472                if not loc.startswith('/'):
1473                    loc = "/%s" % loc
1474                loc = "file://%s" %loc
1475            try:
1476                u = urlopen(loc)
1477            except Exception, e:
1478                raise service_error(service_error.req, 
1479                        "Cannot open %s: %s" % (loc, e))
1480            try:
1481                f = open("%s/%s" % (softdir, dest) , "w")
1482                self.log.debug("Writing %s/%s" % (softdir,dest) )
1483                data = u.read(4096)
1484                while data:
1485                    f.write(data)
1486                    data = u.read(4096)
1487                f.close()
1488                u.close()
1489            except Exception, e:
1490                raise service_error(service_error.internal,
1491                        "Could not copy %s: %s" % (loc, e))
1492            path = re.sub("/tmp", "", linkpath)
1493            # XXX
1494            softmap[pkg] = \
1495                    "%s/%s/%s" %\
1496                    ( self.repo_url, path, dest)
1497
1498            # Allow the individual segments to access the software.
1499            for tb in tbparams.keys():
1500                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1501                        "/%s/%s" % ( path, dest))
1502
1503        # Convert the software locations in the segments into the local
1504        # copies on this host
1505        for soft in [ s for tb in topo.values() \
1506                for e in tb.elements \
1507                    if getattr(e, 'software', False) \
1508                        for s in e.software ]:
1509            if softmap.has_key(soft.location):
1510                soft.location = softmap[soft.location]
1511
1512
1513    def new_experiment(self, req, fid):
1514        """
1515        The external interface to empty initial experiment creation called from
1516        the dispatcher.
1517
1518        Creates a working directory, splits the incoming description using the
1519        splitter script and parses out the avrious subsections using the
1520        lcasses above.  Once each sub-experiment is created, use pooled threads
1521        to instantiate them and start it all up.
1522        """
1523        if not self.auth.check_attribute(fid, 'new'):
1524            raise service_error(service_error.access, "New access denied")
1525
1526        try:
1527            tmpdir = tempfile.mkdtemp(prefix="split-")
1528        except EnvironmentError:
1529            raise service_error(service_error.internal, "Cannot create tmp dir")
1530
1531        try:
1532            access_user = self.accessdb[fid]
1533        except KeyError:
1534            raise service_error(service_error.internal,
1535                    "Access map and authorizer out of sync in " + \
1536                            "new_experiment for fedid %s"  % fid)
1537
1538        pid = "dummy"
1539        gid = "dummy"
1540
1541        req = req.get('NewRequestBody', None)
1542        if not req:
1543            raise service_error(service_error.req,
1544                    "Bad request format (no NewRequestBody)")
1545
1546        # Generate an ID for the experiment (slice) and a certificate that the
1547        # allocator can use to prove they own it.  We'll ship it back through
1548        # the encrypted connection.
1549        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1550
1551        #now we're done with the tmpdir, and it should be empty
1552        if self.cleanup:
1553            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1554            os.rmdir(tmpdir)
1555        else:
1556            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1557
1558        eid = self.create_experiment_state(fid, req, expid, expcert, 
1559                state='empty')
1560
1561        # Let users touch the state
1562        self.auth.set_attribute(fid, expid)
1563        self.auth.set_attribute(expid, expid)
1564        # Override fedids can manipulate state as well
1565        for o in self.overrides:
1566            self.auth.set_attribute(o, expid)
1567
1568        rv = {
1569                'experimentID': [
1570                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1571                ],
1572                'experimentStatus': 'empty',
1573                'experimentAccess': { 'X509' : expcert }
1574            }
1575
1576        return rv
1577
1578    def create_experiment(self, req, fid):
1579        """
1580        The external interface to experiment creation called from the
1581        dispatcher.
1582
1583        Creates a working directory, splits the incoming description using the
1584        splitter script and parses out the various subsections using the
1585        classes above.  Once each sub-experiment is created, use pooled threads
1586        to instantiate them and start it all up.
1587        """
1588
1589        req = req.get('CreateRequestBody', None)
1590        if not req:
1591            raise service_error(service_error.req,
1592                    "Bad request format (no CreateRequestBody)")
1593
1594        # Get the experiment access
1595        exp = req.get('experimentID', None)
1596        if exp:
1597            if exp.has_key('fedid'):
1598                key = exp['fedid']
1599                expid = key
1600                eid = None
1601            elif exp.has_key('localname'):
1602                key = exp['localname']
1603                eid = key
1604                expid = None
1605            else:
1606                raise service_error(service_error.req, "Unknown lookup type")
1607        else:
1608            raise service_error(service_error.req, "No request?")
1609
1610        self.check_experiment_access(fid, key)
1611
1612        try:
1613            tmpdir = tempfile.mkdtemp(prefix="split-")
1614            os.mkdir(tmpdir+"/keys")
1615        except EnvironmentError:
1616            raise service_error(service_error.internal, "Cannot create tmp dir")
1617
1618        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1619        gw_secretkey_base = "fed.%s" % self.ssh_type
1620        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1621        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1622        tclfile = tmpdir + "/experiment.tcl"
1623        tbparams = { }
1624        try:
1625            access_user = self.accessdb[fid]
1626        except KeyError:
1627            raise service_error(service_error.internal,
1628                    "Access map and authorizer out of sync in " + \
1629                            "create_experiment for fedid %s"  % fid)
1630
1631        pid = "dummy"
1632        gid = "dummy"
1633
1634        # The tcl parser needs to read a file so put the content into that file
1635        descr=req.get('experimentdescription', None)
1636        if descr:
1637            file_content=descr.get('ns2description', None)
1638            if file_content:
1639                try:
1640                    f = open(tclfile, 'w')
1641                    f.write(file_content)
1642                    f.close()
1643                except EnvironmentError:
1644                    raise service_error(service_error.internal,
1645                            "Cannot write temp experiment description")
1646            else:
1647                raise service_error(service_error.req, 
1648                        "Only ns2descriptions supported")
1649        else:
1650            raise service_error(service_error.req, "No experiment description")
1651
1652        self.state_lock.acquire()
1653        if self.state.has_key(key):
1654            self.state[key]['experimentStatus'] = "starting"
1655            for e in self.state[key].get('experimentID',[]):
1656                if not expid and e.has_key('fedid'):
1657                    expid = e['fedid']
1658                elif not eid and e.has_key('localname'):
1659                    eid = e['localname']
1660        self.state_lock.release()
1661
1662        if not (eid and expid):
1663            raise service_error(service_error.internal, 
1664                    "Cannot find local experiment info!?")
1665
1666        try: 
1667            # This catches exceptions to clear the placeholder if necessary
1668            try:
1669                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1670            except ValueError:
1671                raise service_error(service_error.server_config, 
1672                        "Bad key type (%s)" % self.ssh_type)
1673
1674            # Copy the service request
1675            tb_services = [ s for s in req.get('service',[]) ]
1676            # Translate to topdl
1677            if self.splitter_url:
1678                self.log.debug("Calling remote topdl translator at %s" % \
1679                        self.splitter_url)
1680                top = self.remote_ns2topdl(self.splitter_url, file_content)
1681            else:
1682                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1683                    str(self.muxmax), '-m', 'dummy']
1684
1685                tclcmd.extend([pid, gid, eid, tclfile])
1686
1687                self.log.debug("running local splitter %s", " ".join(tclcmd))
1688                # This is just fantastic.  As a side effect the parser copies
1689                # tb_compat.tcl into the current directory, so that directory
1690                # must be writable by the fedd user.  Doing this in the
1691                # temporary subdir ensures this is the case.
1692                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1693                        cwd=tmpdir)
1694                split_data = tclparser.stdout
1695
1696                top = topdl.topology_from_xml(file=split_data, top="experiment")
1697
1698            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1699            # Find the testbeds to look up
1700            testbeds = set([ a.value for e in top.elements \
1701                    for a in e.attribute \
1702                        if a.attribute == 'testbed'])
1703
1704            tb_hosts = { }
1705            for tb in testbeds:
1706                tb_hosts[tb] = [ e.name for e in top.elements \
1707                        if isinstance(e, topdl.Computer) and \
1708                            e.get_attribute('testbed') and \
1709                            e.get_attribute('testbed') == tb]
1710
1711            masters = { }           # testbeds exporting services
1712            pmasters = { }          # Testbeds exporting services that
1713                                    # need portals
1714            for s in tb_services:
1715                # If this is a service request with the importall field
1716                # set, fill it out.
1717
1718                if s.get('importall', False):
1719                    s['import'] = [ tb for tb in testbeds \
1720                            if tb not in s.get('export',[])]
1721                    del s['importall']
1722
1723                # Add the service to masters
1724                for tb in s.get('export', []):
1725                    if s.get('name', None):
1726                        if tb not in masters:
1727                            masters[tb] = [ ]
1728
1729                        params = { }
1730                        if 'fedAttr' in s:
1731                            for a in s['fedAttr']:
1732                                params[a.get('attribute', '')] = \
1733                                        a.get('value','')
1734
1735                        fser = federated_service(name=s['name'],
1736                                exporter=tb, importers=s.get('import',[]),
1737                                params=params)
1738                        if fser.name == 'hide_hosts' \
1739                                and 'hosts' not in fser.params:
1740                            fser.params['hosts'] = \
1741                                    ",".join(tb_hosts.get(fser.exporter, []))
1742                        masters[tb].append(fser)
1743
1744                        if fser.portal:
1745                            if tb not in pmasters: pmasters[tb] = [ fser ]
1746                            else: pmasters[tb].append(fser)
1747                    else:
1748                        self.log.error('Testbed service does not have name " + \
1749                                "and importers')
1750
1751
1752            allocated = { }         # Testbeds we can access
1753            topo ={ }               # Sub topologies
1754            connInfo = { }          # Connection information
1755
1756            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1757                    tbparams, masters)
1758
1759            self.split_topology(top, topo, testbeds)
1760
1761            # Copy configuration files into the remote file store
1762            # The config urlpath
1763            configpath = "/%s/config" % expid
1764            # The config file system location
1765            configdir ="%s%s" % ( self.repodir, configpath)
1766            try:
1767                os.makedirs(configdir)
1768            except EnvironmentError, e:
1769                raise service_error(service_error.internal,
1770                        "Cannot create config directory: %s" % e)
1771            try:
1772                f = open("%s/hosts" % configdir, "w")
1773                f.write('\n'.join(hosts))
1774                f.close()
1775            except EnvironmentError, e:
1776                raise service_error(service_error.internal, 
1777                        "Cannot write hosts file: %s" % e)
1778            try:
1779                copy_file("%s" % gw_pubkey, "%s/%s" % \
1780                        (configdir, gw_pubkey_base))
1781                copy_file("%s" % gw_secretkey, "%s/%s" % \
1782                        (configdir, gw_secretkey_base))
1783            except EnvironmentError, e:
1784                raise service_error(service_error.internal, 
1785                        "Cannot copy keyfiles: %s" % e)
1786
1787            # Allow the individual testbeds to access the configuration files.
1788            for tb in tbparams.keys():
1789                asignee = tbparams[tb]['allocID']['fedid']
1790                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1791                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1792
1793            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1794                    self.muxmax, self.direct_transit)
1795            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1796                    connInfo, expid)
1797            # Now get access to the dynamic testbeds (those added above)
1798            for tb in [ t for t in topo if t not in allocated]:
1799                self.get_access(tb, None, tbparams, access_user, masters)
1800                allocated[tb] = 1
1801                store_keys = topo[tb].get_attribute('store_keys')
1802                # Give the testbed access to keys it exports or imports
1803                if store_keys:
1804                    for sk in store_keys.split(" "):
1805                        self.auth.set_attribute(\
1806                                tbparams[tb]['allocID']['fedid'], sk)
1807
1808            self.wrangle_software(expid, top, topo, tbparams)
1809
1810            vtopo = topdl.topology_to_vtopo(top)
1811            vis = self.genviz(vtopo)
1812
1813            # save federant information
1814            for k in allocated.keys():
1815                tbparams[k]['federant'] = {
1816                        'name': [ { 'localname' : eid} ],
1817                        'allocID' : tbparams[k]['allocID'],
1818                        'uri': tbparams[k]['uri'],
1819                    }
1820
1821            self.state_lock.acquire()
1822            self.state[eid]['vtopo'] = vtopo
1823            self.state[eid]['vis'] = vis
1824            self.state[eid]['experimentdescription'] = \
1825                    { 'topdldescription': top.to_dict() }
1826            self.state[eid]['federant'] = \
1827                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1828                        if tbparams[tb].has_key('federant') ]
1829            if self.state_filename: 
1830                self.write_state()
1831            self.state_lock.release()
1832        except service_error, e:
1833            # If something goes wrong in the parse (usually an access error)
1834            # clear the placeholder state.  From here on out the code delays
1835            # exceptions.  Failing at this point returns a fault to the remote
1836            # caller.
1837
1838            self.state_lock.acquire()
1839            del self.state[eid]
1840            del self.state[expid]
1841            if self.state_filename: self.write_state()
1842            self.state_lock.release()
1843            raise e
1844
1845
1846        # Start the background swapper and return the starting state.  From
1847        # here on out, the state will stick around a while.
1848
1849        # Let users touch the state
1850        self.auth.set_attribute(fid, expid)
1851        self.auth.set_attribute(expid, expid)
1852        # Override fedids can manipulate state as well
1853        for o in self.overrides:
1854            self.auth.set_attribute(o, expid)
1855
1856        # Create a logger that logs to the experiment's state object as well as
1857        # to the main log file.
1858        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1859        alloc_collector = self.list_log(self.state[eid]['log'])
1860        h = logging.StreamHandler(alloc_collector)
1861        # XXX: there should be a global one of these rather than repeating the
1862        # code.
1863        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1864                    '%d %b %y %H:%M:%S'))
1865        alloc_log.addHandler(h)
1866       
1867        attrs = [ 
1868                {
1869                    'attribute': 'ssh_pubkey', 
1870                    'value': '%s/%s/config/%s' % \
1871                            (self.repo_url, expid, gw_pubkey_base)
1872                },
1873                {
1874                    'attribute': 'ssh_secretkey', 
1875                    'value': '%s/%s/config/%s' % \
1876                            (self.repo_url, expid, gw_secretkey_base)
1877                },
1878                {
1879                    'attribute': 'hosts', 
1880                    'value': '%s/%s/config/hosts' % \
1881                            (self.repo_url, expid)
1882                },
1883            ]
1884
1885        # transit and disconnected testbeds may not have a connInfo entry.
1886        # Fill in the blanks.
1887        for t in allocated.keys():
1888            if not connInfo.has_key(t):
1889                connInfo[t] = { }
1890
1891        # Start a thread to do the resource allocation
1892        t  = Thread(target=self.allocate_resources,
1893                args=(allocated, masters, eid, expid, tbparams, 
1894                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1895                    connInfo),
1896                name=eid)
1897        t.start()
1898
1899        rv = {
1900                'experimentID': [
1901                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1902                ],
1903                'experimentStatus': 'starting',
1904            }
1905
1906        return rv
1907   
1908    def get_experiment_fedid(self, key):
1909        """
1910        find the fedid associated with the localname key in the state database.
1911        """
1912
1913        rv = None
1914        self.state_lock.acquire()
1915        if self.state.has_key(key):
1916            if isinstance(self.state[key], dict):
1917                try:
1918                    kl = [ f['fedid'] for f in \
1919                            self.state[key]['experimentID']\
1920                                if f.has_key('fedid') ]
1921                except KeyError:
1922                    self.state_lock.release()
1923                    raise service_error(service_error.internal, 
1924                            "No fedid for experiment %s when getting "+\
1925                                    "fedid(!?)" % key)
1926                if len(kl) == 1:
1927                    rv = kl[0]
1928                else:
1929                    self.state_lock.release()
1930                    raise service_error(service_error.internal, 
1931                            "multiple fedids for experiment %s when " +\
1932                                    "getting fedid(!?)" % key)
1933            else:
1934                self.state_lock.release()
1935                raise service_error(service_error.internal, 
1936                        "Unexpected state for %s" % key)
1937        self.state_lock.release()
1938        return rv
1939
1940    def check_experiment_access(self, fid, key):
1941        """
1942        Confirm that the fid has access to the experiment.  Though a request
1943        may be made in terms of a local name, the access attribute is always
1944        the experiment's fedid.
1945        """
1946        if not isinstance(key, fedid):
1947            key = self.get_experiment_fedid(key)
1948
1949        if self.auth.check_attribute(fid, key):
1950            return True
1951        else:
1952            raise service_error(service_error.access, "Access Denied")
1953
1954
1955    def get_handler(self, path, fid):
1956        self.log.info("Get handler %s %s" % (path, fid))
1957        if self.auth.check_attribute(fid, path):
1958            return ("%s/%s" % (self.repodir, path), "application/binary")
1959        else:
1960            return (None, None)
1961
1962    def get_vtopo(self, req, fid):
1963        """
1964        Return the stored virtual topology for this experiment
1965        """
1966        rv = None
1967        state = None
1968
1969        req = req.get('VtopoRequestBody', None)
1970        if not req:
1971            raise service_error(service_error.req,
1972                    "Bad request format (no VtopoRequestBody)")
1973        exp = req.get('experiment', None)
1974        if exp:
1975            if exp.has_key('fedid'):
1976                key = exp['fedid']
1977                keytype = "fedid"
1978            elif exp.has_key('localname'):
1979                key = exp['localname']
1980                keytype = "localname"
1981            else:
1982                raise service_error(service_error.req, "Unknown lookup type")
1983        else:
1984            raise service_error(service_error.req, "No request?")
1985
1986        self.check_experiment_access(fid, key)
1987
1988        self.state_lock.acquire()
1989        if self.state.has_key(key):
1990            if self.state[key].has_key('vtopo'):
1991                rv = { 'experiment' : {keytype: key },\
1992                        'vtopo': self.state[key]['vtopo'],\
1993                    }
1994            else:
1995                state = self.state[key]['experimentStatus']
1996        self.state_lock.release()
1997
1998        if rv: return rv
1999        else: 
2000            if state:
2001                raise service_error(service_error.partial, 
2002                        "Not ready: %s" % state)
2003            else:
2004                raise service_error(service_error.req, "No such experiment")
2005
2006    def get_vis(self, req, fid):
2007        """
2008        Return the stored visualization for this experiment
2009        """
2010        rv = None
2011        state = None
2012
2013        req = req.get('VisRequestBody', None)
2014        if not req:
2015            raise service_error(service_error.req,
2016                    "Bad request format (no VisRequestBody)")
2017        exp = req.get('experiment', None)
2018        if exp:
2019            if exp.has_key('fedid'):
2020                key = exp['fedid']
2021                keytype = "fedid"
2022            elif exp.has_key('localname'):
2023                key = exp['localname']
2024                keytype = "localname"
2025            else:
2026                raise service_error(service_error.req, "Unknown lookup type")
2027        else:
2028            raise service_error(service_error.req, "No request?")
2029
2030        self.check_experiment_access(fid, key)
2031
2032        self.state_lock.acquire()
2033        if self.state.has_key(key):
2034            if self.state[key].has_key('vis'):
2035                rv =  { 'experiment' : {keytype: key },\
2036                        'vis': self.state[key]['vis'],\
2037                        }
2038            else:
2039                state = self.state[key]['experimentStatus']
2040        self.state_lock.release()
2041
2042        if rv: return rv
2043        else:
2044            if state:
2045                raise service_error(service_error.partial, 
2046                        "Not ready: %s" % state)
2047            else:
2048                raise service_error(service_error.req, "No such experiment")
2049
2050    def clean_info_response(self, rv):
2051        """
2052        Remove the information in the experiment's state object that is not in
2053        the info response.
2054        """
2055        # Remove the owner info (should always be there, but...)
2056        if rv.has_key('owner'): del rv['owner']
2057
2058        # Convert the log into the allocationLog parameter and remove the
2059        # log entry (with defensive programming)
2060        if rv.has_key('log'):
2061            rv['allocationLog'] = "".join(rv['log'])
2062            del rv['log']
2063        else:
2064            rv['allocationLog'] = ""
2065
2066        if rv['experimentStatus'] != 'active':
2067            if rv.has_key('federant'): del rv['federant']
2068        else:
2069            # remove the allocationID and uri info from each federant
2070            for f in rv.get('federant', []):
2071                if f.has_key('allocID'): del f['allocID']
2072                if f.has_key('uri'): del f['uri']
2073
2074        return rv
2075
2076    def get_info(self, req, fid):
2077        """
2078        Return all the stored info about this experiment
2079        """
2080        rv = None
2081
2082        req = req.get('InfoRequestBody', None)
2083        if not req:
2084            raise service_error(service_error.req,
2085                    "Bad request format (no InfoRequestBody)")
2086        exp = req.get('experiment', None)
2087        if exp:
2088            if exp.has_key('fedid'):
2089                key = exp['fedid']
2090                keytype = "fedid"
2091            elif exp.has_key('localname'):
2092                key = exp['localname']
2093                keytype = "localname"
2094            else:
2095                raise service_error(service_error.req, "Unknown lookup type")
2096        else:
2097            raise service_error(service_error.req, "No request?")
2098
2099        self.check_experiment_access(fid, key)
2100
2101        # The state may be massaged by the service function that called
2102        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2103        # state.
2104        self.state_lock.acquire()
2105        if self.state.has_key(key):
2106            rv = copy.deepcopy(self.state[key])
2107        self.state_lock.release()
2108
2109        if rv:
2110            return self.clean_info_response(rv)
2111        else:
2112            raise service_error(service_error.req, "No such experiment")
2113
2114    def get_multi_info(self, req, fid):
2115        """
2116        Return all the stored info that this fedid can access
2117        """
2118        rv = { 'info': [ ] }
2119
2120        self.state_lock.acquire()
2121        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2122            try:
2123                self.check_experiment_access(fid, key)
2124            except service_error, e:
2125                if e.code == service_error.access:
2126                    continue
2127                else:
2128                    self.state_lock.release()
2129                    raise e
2130
2131            if self.state.has_key(key):
2132                e = copy.deepcopy(self.state[key])
2133                e = self.clean_info_response(e)
2134                rv['info'].append(e)
2135        self.state_lock.release()
2136        return rv
2137
2138    def remove_dirs(self, dir):
2139        """
2140        Remove the directory tree and all files rooted at dir.  Log any errors,
2141        but continue.
2142        """
2143        self.log.debug("[removedirs]: removing %s" % dir)
2144        try:
2145            for path, dirs, files in os.walk(dir, topdown=False):
2146                for f in files:
2147                    os.remove(os.path.join(path, f))
2148                for d in dirs:
2149                    os.rmdir(os.path.join(path, d))
2150            os.rmdir(dir)
2151        except EnvironmentError, e:
2152            self.log.error("Error deleting directory tree in %s" % e);
2153
2154    def terminate_experiment(self, req, fid):
2155        """
2156        Swap this experiment out on the federants and delete the shared
2157        information
2158        """
2159        tbparams = { }
2160        req = req.get('TerminateRequestBody', None)
2161        if not req:
2162            raise service_error(service_error.req,
2163                    "Bad request format (no TerminateRequestBody)")
2164        force = req.get('force', False)
2165        exp = req.get('experiment', None)
2166        if exp:
2167            if exp.has_key('fedid'):
2168                key = exp['fedid']
2169                keytype = "fedid"
2170            elif exp.has_key('localname'):
2171                key = exp['localname']
2172                keytype = "localname"
2173            else:
2174                raise service_error(service_error.req, "Unknown lookup type")
2175        else:
2176            raise service_error(service_error.req, "No request?")
2177
2178        self.check_experiment_access(fid, key)
2179
2180        dealloc_list = [ ]
2181
2182
2183        # Create a logger that logs to the dealloc_list as well as to the main
2184        # log file.
2185        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2186        h = logging.StreamHandler(self.list_log(dealloc_list))
2187        # XXX: there should be a global one of these rather than repeating the
2188        # code.
2189        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2190                    '%d %b %y %H:%M:%S'))
2191        dealloc_log.addHandler(h)
2192
2193        self.state_lock.acquire()
2194        fed_exp = self.state.get(key, None)
2195        repo = None
2196
2197        if fed_exp:
2198            # This branch of the conditional holds the lock to generate a
2199            # consistent temporary tbparams variable to deallocate experiments.
2200            # It releases the lock to do the deallocations and reacquires it to
2201            # remove the experiment state when the termination is complete.
2202
2203            # First make sure that the experiment creation is complete.
2204            status = fed_exp.get('experimentStatus', None)
2205
2206            if status:
2207                if status in ('starting', 'terminating'):
2208                    if not force:
2209                        self.state_lock.release()
2210                        raise service_error(service_error.partial, 
2211                                'Experiment still being created or destroyed')
2212                    else:
2213                        self.log.warning('Experiment in %s state ' % status + \
2214                                'being terminated by force.')
2215            else:
2216                # No status??? trouble
2217                self.state_lock.release()
2218                raise service_error(service_error.internal,
2219                        "Experiment has no status!?")
2220
2221            ids = []
2222            #  experimentID is a list of dicts that are self-describing
2223            #  identifiers.  This finds all the fedids and localnames - the
2224            #  keys of self.state - and puts them into ids.
2225            for id in fed_exp.get('experimentID', []):
2226                if id.has_key('fedid'): 
2227                    ids.append(id['fedid'])
2228                    repo = "%s" % id['fedid']
2229                if id.has_key('localname'): ids.append(id['localname'])
2230
2231            # Collect the allocation/segment ids into a dict keyed by the fedid
2232            # of the allocation (or a monotonically increasing integer) that
2233            # contains a tuple of uri, aid (which is a dict...)
2234            for i, fed in enumerate(fed_exp.get('federant', [])):
2235                try:
2236                    uri = fed['uri']
2237                    aid = fed['allocID']
2238                    k = fed['allocID'].get('fedid', i)
2239                except KeyError, e:
2240                    continue
2241                tbparams[k] = (uri, aid)
2242            fed_exp['experimentStatus'] = 'terminating'
2243            if self.state_filename: self.write_state()
2244            self.state_lock.release()
2245
2246            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2247            # then completes, so we can't wait if nothing starts.  So, no
2248            # tbparams, no start.
2249            if len(tbparams) > 0:
2250                thread_pool = self.thread_pool(self.nthreads)
2251                for k in tbparams.keys():
2252                    # Create and start a thread to stop the segment
2253                    thread_pool.wait_for_slot()
2254                    uri, aid = tbparams[k]
2255                    t  = self.pooled_thread(\
2256                            target=self.terminate_segment(log=dealloc_log,
2257                                testbed=uri,
2258                                cert_file=self.cert_file, 
2259                                cert_pwd=self.cert_pwd,
2260                                trusted_certs=self.trusted_certs,
2261                                caller=self.call_TerminateSegment),
2262                            args=(uri, aid), name=k,
2263                            pdata=thread_pool, trace_file=self.trace_file)
2264                    t.start()
2265                # Wait for completions
2266                thread_pool.wait_for_all_done()
2267
2268            # release the allocations (failed experiments have done this
2269            # already, and starting experiments may be in odd states, so we
2270            # ignore errors releasing those allocations
2271            try: 
2272                for k in tbparams.keys():
2273                    # This releases access by uri
2274                    uri, aid = tbparams[k]
2275                    self.release_access(None, aid, uri=uri)
2276            except service_error, e:
2277                if status != 'failed' and not force:
2278                    raise e
2279
2280            # Remove the terminated experiment
2281            self.state_lock.acquire()
2282            for id in ids:
2283                if self.state.has_key(id): del self.state[id]
2284
2285            if self.state_filename: self.write_state()
2286            self.state_lock.release()
2287
2288            # Delete any synch points associated with this experiment.  All
2289            # synch points begin with the fedid of the experiment.
2290            fedid_keys = set(["fedid:%s" % f for f in ids \
2291                    if isinstance(f, fedid)])
2292            for k in self.synch_store.all_keys():
2293                try:
2294                    if len(k) > 45 and k[0:46] in fedid_keys:
2295                        self.synch_store.del_value(k)
2296                except synch_store.BadDeletionError:
2297                    pass
2298            self.write_store()
2299
2300            # Remove software and other cached stuff from the filesystem.
2301            if repo:
2302                self.remove_dirs("%s/%s" % (self.repodir, repo))
2303               
2304            return { 
2305                    'experiment': exp , 
2306                    'deallocationLog': "".join(dealloc_list),
2307                    }
2308        else:
2309            # Don't forget to release the lock
2310            self.state_lock.release()
2311            raise service_error(service_error.req, "No saved state")
2312
2313
2314    def GetValue(self, req, fid):
2315        """
2316        Get a value from the synchronized store
2317        """
2318        req = req.get('GetValueRequestBody', None)
2319        if not req:
2320            raise service_error(service_error.req,
2321                    "Bad request format (no GetValueRequestBody)")
2322       
2323        name = req['name']
2324        wait = req['wait']
2325        rv = { 'name': name }
2326
2327        if self.auth.check_attribute(fid, name):
2328            self.log.debug("[GetValue] asking for %s " % name)
2329            try:
2330                v = self.synch_store.get_value(name, wait)
2331            except synch_store.RevokedKeyError:
2332                # No more synch on this key
2333                raise service_error(service_error.federant, 
2334                        "Synch key %s revoked" % name)
2335            if v is not None:
2336                rv['value'] = v
2337            self.log.debug("[GetValue] got %s from %s" % (v, name))
2338            return rv
2339        else:
2340            raise service_error(service_error.access, "Access Denied")
2341       
2342
2343    def SetValue(self, req, fid):
2344        """
2345        Set a value in the synchronized store
2346        """
2347        req = req.get('SetValueRequestBody', None)
2348        if not req:
2349            raise service_error(service_error.req,
2350                    "Bad request format (no SetValueRequestBody)")
2351       
2352        name = req['name']
2353        v = req['value']
2354
2355        if self.auth.check_attribute(fid, name):
2356            try:
2357                self.synch_store.set_value(name, v)
2358                self.write_store()
2359                self.log.debug("[SetValue] set %s to %s" % (name, v))
2360            except synch_store.CollisionError:
2361                # Translate into a service_error
2362                raise service_error(service_error.req,
2363                        "Value already set: %s" %name)
2364            except synch_store.RevokedKeyError:
2365                # No more synch on this key
2366                raise service_error(service_error.federant, 
2367                        "Synch key %s revoked" % name)
2368            return { 'name': name, 'value': v }
2369        else:
2370            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.