source: fedd/federation/experiment_control.py @ fd07c48

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since fd07c48 was fd07c48, checked in by Ted Faber <faber@…>, 14 years ago

Add support for user-supplied testbed mappings

  • Property mode set to 100644
File size: 82.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=None, params=None, 
49            reqs=None, portal=None):
50        self.name=name
51        self.exporter=exporter
52        if importers is None: self.importers = []
53        else: self.importers=importers
54        if params is None: self.params = { }
55        else: self.params = params
56        if reqs is None: self.reqs = []
57        else: self.reqs = reqs
58
59        if portal is not None:
60            self.portal = portal
61        else:
62            self.portal = (name in federated_service.needs_portal)
63
64    def __str__(self):
65        return "name %s export %s import %s params %s reqs %s" % \
66                (self.name, self.exporter, self.importers, self.params,
67                        [ (r['name'], r['visibility']) for r in self.reqs] )
68
69    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
70
71class experiment_control_local:
72    """
73    Control of experiments that this system can directly access.
74
75    Includes experiment creation, termination and information dissemination.
76    Thred safe.
77    """
78
79    class ssh_cmd_timeout(RuntimeError): pass
80   
81    class thread_pool:
82        """
83        A class to keep track of a set of threads all invoked for the same
84        task.  Manages the mutual exclusion of the states.
85        """
86        def __init__(self, nthreads):
87            """
88            Start a pool.
89            """
90            self.changed = Condition()
91            self.started = 0
92            self.terminated = 0
93            self.nthreads = nthreads
94
95        def acquire(self):
96            """
97            Get the pool's lock.
98            """
99            self.changed.acquire()
100
101        def release(self):
102            """
103            Release the pool's lock.
104            """
105            self.changed.release()
106
107        def wait(self, timeout = None):
108            """
109            Wait for a pool thread to start or stop.
110            """
111            self.changed.wait(timeout)
112
113        def start(self):
114            """
115            Called by a pool thread to report starting.
116            """
117            self.changed.acquire()
118            self.started += 1
119            self.changed.notifyAll()
120            self.changed.release()
121
122        def terminate(self):
123            """
124            Called by a pool thread to report finishing.
125            """
126            self.changed.acquire()
127            self.terminated += 1
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def clear(self):
132            """
133            Clear all pool data.
134            """
135            self.changed.acquire()
136            self.started = 0
137            self.terminated =0
138            self.changed.notifyAll()
139            self.changed.release()
140
141        def wait_for_slot(self):
142            """
143            Wait until we have a free slot to start another pooled thread
144            """
145            self.acquire()
146            while self.started - self.terminated >= self.nthreads:
147                self.wait()
148            self.release()
149
150        def wait_for_all_done(self, timeout=None):
151            """
152            Wait until all active threads finish (and at least one has
153            started).  If a timeout is given, return after waiting that long
154            for termination.  If all threads are done (and one has started in
155            the since the last clear()) return True, otherwise False.
156            """
157            if timeout:
158                deadline = time.time() + timeout
159            self.acquire()
160            while self.started == 0 or self.started > self.terminated:
161                self.wait(timeout)
162                if timeout:
163                    if time.time() > deadline:
164                        break
165                    timeout = deadline - time.time()
166            self.release()
167            return not (self.started == 0 or self.started > self.terminated)
168
169    class pooled_thread(Thread):
170        """
171        One of a set of threads dedicated to a specific task.  Uses the
172        thread_pool class above for coordination.
173        """
174        def __init__(self, group=None, target=None, name=None, args=(), 
175                kwargs={}, pdata=None, trace_file=None):
176            Thread.__init__(self, group, target, name, args, kwargs)
177            self.rv = None          # Return value of the ops in this thread
178            self.exception = None   # Exception that terminated this thread
179            self.target=target      # Target function to run on start()
180            self.args = args        # Args to pass to target
181            self.kwargs = kwargs    # Additional kw args
182            self.pdata = pdata      # thread_pool for this class
183            # Logger for this thread
184            self.log = logging.getLogger("fedd.experiment_control")
185       
186        def run(self):
187            """
188            Emulate Thread.run, except add pool data manipulation and error
189            logging.
190            """
191            if self.pdata:
192                self.pdata.start()
193
194            if self.target:
195                try:
196                    self.rv = self.target(*self.args, **self.kwargs)
197                except service_error, s:
198                    self.exception = s
199                    self.log.error("Thread exception: %s %s" % \
200                            (s.code_string(), s.desc))
201                except:
202                    self.exception = sys.exc_info()[1]
203                    self.log.error(("Unexpected thread exception: %s" +\
204                            "Trace %s") % (self.exception,\
205                                traceback.format_exc()))
206            if self.pdata:
207                self.pdata.terminate()
208
209    call_RequestAccess = service_caller('RequestAccess')
210    call_ReleaseAccess = service_caller('ReleaseAccess')
211    call_StartSegment = service_caller('StartSegment')
212    call_TerminateSegment = service_caller('TerminateSegment')
213    call_Ns2Topdl = service_caller('Ns2Topdl')
214
215    def __init__(self, config=None, auth=None):
216        """
217        Intialize the various attributes, most from the config object
218        """
219
220        def parse_tarfile_list(tf):
221            """
222            Parse a tarfile list from the configuration.  This is a set of
223            paths and tarfiles separated by spaces.
224            """
225            rv = [ ]
226            if tf is not None:
227                tl = tf.split()
228                while len(tl) > 1:
229                    p, t = tl[0:2]
230                    del tl[0:2]
231                    rv.append((p, t))
232            return rv
233
234        self.thread_with_rv = experiment_control_local.pooled_thread
235        self.thread_pool = experiment_control_local.thread_pool
236        self.list_log = list_log.list_log
237
238        self.cert_file = config.get("experiment_control", "cert_file")
239        if self.cert_file:
240            self.cert_pwd = config.get("experiment_control", "cert_pwd")
241        else:
242            self.cert_file = config.get("globals", "cert_file")
243            self.cert_pwd = config.get("globals", "cert_pwd")
244
245        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
246                or config.get("globals", "trusted_certs")
247
248        self.repodir = config.get("experiment_control", "repodir")
249        self.repo_url = config.get("experiment_control", "repo_url", 
250                "https://users.isi.deterlab.net:23235");
251
252        self.exp_stem = "fed-stem"
253        self.log = logging.getLogger("fedd.experiment_control")
254        set_log_level(config, "experiment_control", self.log)
255        self.muxmax = 2
256        self.nthreads = 10
257        self.randomize_experiments = False
258
259        self.splitter = None
260        self.ssh_keygen = "/usr/bin/ssh-keygen"
261        self.ssh_identity_file = None
262
263
264        self.debug = config.getboolean("experiment_control", "create_debug")
265        self.cleanup = not config.getboolean("experiment_control", 
266                "leave_tmpfiles")
267        self.state_filename = config.get("experiment_control", 
268                "experiment_state")
269        self.store_filename = config.get("experiment_control", 
270                "synch_store")
271        self.store_url = config.get("experiment_control", "store_url")
272        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
273        self.fedkit = parse_tarfile_list(\
274                config.get("experiment_control", "fedkit"))
275        self.gatewaykit = parse_tarfile_list(\
276                config.get("experiment_control", "gatewaykit"))
277        accessdb_file = config.get("experiment_control", "accessdb")
278
279        self.ssh_pubkey_file = config.get("experiment_control", 
280                "ssh_pubkey_file")
281        self.ssh_privkey_file = config.get("experiment_control",
282                "ssh_privkey_file")
283        dt = config.get("experiment_control", "direct_transit")
284        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
285        else: self.direct_transit = [ ]
286        # NB for internal master/slave ops, not experiment setup
287        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
288
289        self.overrides = set([])
290        ovr = config.get('experiment_control', 'overrides')
291        if ovr:
292            for o in ovr.split(","):
293                o = o.strip()
294                if o.startswith('fedid:'): o = o[len('fedid:'):]
295                self.overrides.add(fedid(hexstr=o))
296
297        self.state = { }
298        self.state_lock = Lock()
299        self.tclsh = "/usr/local/bin/otclsh"
300        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
301                config.get("experiment_control", "tcl_splitter",
302                        "/usr/testbed/lib/ns2ir/parse.tcl")
303        mapdb_file = config.get("experiment_control", "mapdb")
304        self.trace_file = sys.stderr
305
306        self.def_expstart = \
307                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
308                "/tmp/federate";
309        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
310                "FEDDIR/hosts";
311        self.def_gwstart = \
312                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
313                "/tmp/bridge.log";
314        self.def_mgwstart = \
315                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
316                "/tmp/bridge.log";
317        self.def_gwimage = "FBSD61-TUNNEL2";
318        self.def_gwtype = "pc";
319        self.local_access = { }
320
321        if auth:
322            self.auth = auth
323        else:
324            self.log.error(\
325                    "[access]: No authorizer initialized, creating local one.")
326            auth = authorizer()
327
328
329        if self.ssh_pubkey_file:
330            try:
331                f = open(self.ssh_pubkey_file, 'r')
332                self.ssh_pubkey = f.read()
333                f.close()
334            except EnvironmentError:
335                raise service_error(service_error.internal,
336                        "Cannot read sshpubkey")
337        else:
338            raise service_error(service_error.internal, 
339                    "No SSH public key file?")
340
341        if not self.ssh_privkey_file:
342            raise service_error(service_error.internal, 
343                    "No SSH public key file?")
344
345
346        if mapdb_file:
347            self.read_mapdb(mapdb_file)
348        else:
349            self.log.warn("[experiment_control] No testbed map, using defaults")
350            self.tbmap = { 
351                    'deter':'https://users.isi.deterlab.net:23235',
352                    'emulab':'https://users.isi.deterlab.net:23236',
353                    'ucb':'https://users.isi.deterlab.net:23237',
354                    }
355
356        if accessdb_file:
357                self.read_accessdb(accessdb_file)
358        else:
359            raise service_error(service_error.internal,
360                    "No accessdb specified in config")
361
362        # Grab saved state.  OK to do this w/o locking because it's read only
363        # and only one thread should be in existence that can see self.state at
364        # this point.
365        if self.state_filename:
366            self.read_state()
367
368        if self.store_filename:
369            self.read_store()
370        else:
371            self.log.warning("No saved synch store")
372            self.synch_store = synch_store
373
374        # Dispatch tables
375        self.soap_services = {\
376                'New': soap_handler('New', self.new_experiment),
377                'Create': soap_handler('Create', self.create_experiment),
378                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
379                'Vis': soap_handler('Vis', self.get_vis),
380                'Info': soap_handler('Info', self.get_info),
381                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
382                'Terminate': soap_handler('Terminate', 
383                    self.terminate_experiment),
384                'GetValue': soap_handler('GetValue', self.GetValue),
385                'SetValue': soap_handler('SetValue', self.SetValue),
386        }
387
388        self.xmlrpc_services = {\
389                'New': xmlrpc_handler('New', self.new_experiment),
390                'Create': xmlrpc_handler('Create', self.create_experiment),
391                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
392                'Vis': xmlrpc_handler('Vis', self.get_vis),
393                'Info': xmlrpc_handler('Info', self.get_info),
394                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
395                'Terminate': xmlrpc_handler('Terminate',
396                    self.terminate_experiment),
397                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
398                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
399        }
400
401    # Call while holding self.state_lock
402    def write_state(self):
403        """
404        Write a new copy of experiment state after copying the existing state
405        to a backup.
406
407        State format is a simple pickling of the state dictionary.
408        """
409        if os.access(self.state_filename, os.W_OK):
410            copy_file(self.state_filename, \
411                    "%s.bak" % self.state_filename)
412        try:
413            f = open(self.state_filename, 'w')
414            pickle.dump(self.state, f)
415        except EnvironmentError, e:
416            self.log.error("Can't write file %s: %s" % \
417                    (self.state_filename, e))
418        except pickle.PicklingError, e:
419            self.log.error("Pickling problem: %s" % e)
420        except TypeError, e:
421            self.log.error("Pickling problem (TypeError): %s" % e)
422
423    @staticmethod
424    def get_alloc_ids(state):
425        """
426        Pull the fedids of the identifiers of each allocation from the
427        state.  Again, a dict dive that's best isolated.
428
429        Used by read_store and read state
430        """
431
432        return [ f['allocID']['fedid'] 
433                for f in state.get('federant',[]) \
434                    if f.has_key('allocID') and \
435                        f['allocID'].has_key('fedid')]
436
437    # Call while holding self.state_lock
438    def read_state(self):
439        """
440        Read a new copy of experiment state.  Old state is overwritten.
441
442        State format is a simple pickling of the state dictionary.
443        """
444       
445        def get_experiment_id(state):
446            """
447            Pull the fedid experimentID out of the saved state.  This is kind
448            of a gross walk through the dict.
449            """
450
451            if state.has_key('experimentID'):
452                for e in state['experimentID']:
453                    if e.has_key('fedid'):
454                        return e['fedid']
455                else:
456                    return None
457            else:
458                return None
459
460        try:
461            f = open(self.state_filename, "r")
462            self.state = pickle.load(f)
463            self.log.debug("[read_state]: Read state from %s" % \
464                    self.state_filename)
465        except EnvironmentError, e:
466            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
467                    % (self.state_filename, e))
468        except pickle.UnpicklingError, e:
469            self.log.warning(("[read_state]: No saved state: " + \
470                    "Unpickling failed: %s") % e)
471       
472        for s in self.state.values():
473            try:
474
475                eid = get_experiment_id(s)
476                if eid : 
477                    # Give the owner rights to the experiment
478                    self.auth.set_attribute(s['owner'], eid)
479                    # And holders of the eid as well
480                    self.auth.set_attribute(eid, eid)
481                    # allow overrides to control experiments as well
482                    for o in self.overrides:
483                        self.auth.set_attribute(o, eid)
484                    # Set permissions to allow reading of the software repo, if
485                    # any, as well.
486                    for a in self.get_alloc_ids(s):
487                        self.auth.set_attribute(a, 'repo/%s' % eid)
488                else:
489                    raise KeyError("No experiment id")
490            except KeyError, e:
491                self.log.warning("[read_state]: State ownership or identity " +\
492                        "misformatted in %s: %s" % (self.state_filename, e))
493
494
495    def read_accessdb(self, accessdb_file):
496        """
497        Read the mapping from fedids that can create experiments to their name
498        in the 3-level access namespace.  All will be asserted from this
499        testbed and can include the local username and porject that will be
500        asserted on their behalf by this fedd.  Each fedid is also added to the
501        authorization system with the "create" attribute.
502        """
503        self.accessdb = {}
504        # These are the regexps for parsing the db
505        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
506        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
507                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
508        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
509                "\s*->\s*(" + name_expr + ")\s*$")
510        lineno = 0
511
512        # Parse the mappings and store in self.authdb, a dict of
513        # fedid -> (proj, user)
514        try:
515            f = open(accessdb_file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if len(line) == 0 or line.startswith('#'):
520                    continue
521                m = project_line.match(line)
522                if m:
523                    fid = fedid(hexstr=m.group(1))
524                    project, user = m.group(2,3)
525                    if not self.accessdb.has_key(fid):
526                        self.accessdb[fid] = []
527                    self.accessdb[fid].append((project, user))
528                    continue
529
530                m = user_line.match(line)
531                if m:
532                    fid = fedid(hexstr=m.group(1))
533                    project = None
534                    user = m.group(2)
535                    if not self.accessdb.has_key(fid):
536                        self.accessdb[fid] = []
537                    self.accessdb[fid].append((project, user))
538                    continue
539                self.log.warn("[experiment_control] Error parsing access " +\
540                        "db %s at line %d" %  (accessdb_file, lineno))
541        except EnvironmentError:
542            raise service_error(service_error.internal,
543                    ("Error opening/reading %s as experiment " +\
544                            "control accessdb") %  accessdb_file)
545        f.close()
546
547        # Initialize the authorization attributes
548        for fid in self.accessdb.keys():
549            self.auth.set_attribute(fid, 'create')
550            self.auth.set_attribute(fid, 'new')
551
552    def read_mapdb(self, file):
553        """
554        Read a simple colon separated list of mappings for the
555        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
556        """
557
558        self.tbmap = { }
559        lineno =0
560        try:
561            f = open(file, "r")
562            for line in f:
563                lineno += 1
564                line = line.strip()
565                if line.startswith('#') or len(line) == 0:
566                    continue
567                try:
568                    label, url = line.split(':', 1)
569                    self.tbmap[label] = url
570                except ValueError, e:
571                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
572                            "map db: %s %s" % (lineno, line, e))
573        except EnvironmentError, e:
574            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
575                    "open %s: %s" % (file, e))
576        f.close()
577
578    def read_store(self):
579        try:
580            self.synch_store = synch_store()
581            self.synch_store.load(self.store_filename)
582            self.log.debug("[read_store]: Read store from %s" % \
583                    self.store_filename)
584        except EnvironmentError, e:
585            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
586                    % (self.state_filename, e))
587            self.synch_store = synch_store()
588
589        # Set the initial permissions on data in the store.  XXX: This ad hoc
590        # authorization attribute initialization is getting out of hand.
591        for k in self.synch_store.all_keys():
592            try:
593                if k.startswith('fedid:'):
594                    fid = fedid(hexstr=k[6:46])
595                    if self.state.has_key(fid):
596                        for a in self.get_alloc_ids(self.state[fid]):
597                            self.auth.set_attribute(a, k)
598            except ValueError, e:
599                self.log.warn("Cannot deduce permissions for %s" % k)
600
601
602    def write_store(self):
603        """
604        Write a new copy of synch_store after writing current state
605        to a backup.  We use the internal synch_store pickle method to avoid
606        incinsistent data.
607
608        State format is a simple pickling of the store.
609        """
610        if os.access(self.store_filename, os.W_OK):
611            copy_file(self.store_filename, \
612                    "%s.bak" % self.store_filename)
613        try:
614            self.synch_store.save(self.store_filename)
615        except EnvironmentError, e:
616            self.log.error("Can't write file %s: %s" % \
617                    (self.store_filename, e))
618        except TypeError, e:
619            self.log.error("Pickling problem (TypeError): %s" % e)
620
621       
622    def generate_ssh_keys(self, dest, type="rsa" ):
623        """
624        Generate a set of keys for the gateways to use to talk.
625
626        Keys are of type type and are stored in the required dest file.
627        """
628        valid_types = ("rsa", "dsa")
629        t = type.lower();
630        if t not in valid_types: raise ValueError
631        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
632
633        try:
634            trace = open("/dev/null", "w")
635        except EnvironmentError:
636            raise service_error(service_error.internal,
637                    "Cannot open /dev/null??");
638
639        # May raise CalledProcessError
640        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
641        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
642        if rv != 0:
643            raise service_error(service_error.internal, 
644                    "Cannot generate nonce ssh keys.  %s return code %d" \
645                            % (self.ssh_keygen, rv))
646
647    def gentopo(self, str):
648        """
649        Generate the topology dtat structure from the splitter's XML
650        representation of it.
651
652        The topology XML looks like:
653            <experiment>
654                <nodes>
655                    <node><vname></vname><ips>ip1:ip2</ips></node>
656                </nodes>
657                <lans>
658                    <lan>
659                        <vname></vname><vnode></vnode><ip></ip>
660                        <bandwidth></bandwidth><member>node:port</member>
661                    </lan>
662                </lans>
663        """
664        class topo_parse:
665            """
666            Parse the topology XML and create the dats structure.
667            """
668            def __init__(self):
669                # Typing of the subelements for data conversion
670                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
671                self.int_subelements = ( 'bandwidth',)
672                self.float_subelements = ( 'delay',)
673                # The final data structure
674                self.nodes = [ ]
675                self.lans =  [ ]
676                self.topo = { \
677                        'node': self.nodes,\
678                        'lan' : self.lans,\
679                    }
680                self.element = { }  # Current element being created
681                self.chars = ""     # Last text seen
682
683            def end_element(self, name):
684                # After each sub element the contents is added to the current
685                # element or to the appropriate list.
686                if name == 'node':
687                    self.nodes.append(self.element)
688                    self.element = { }
689                elif name == 'lan':
690                    self.lans.append(self.element)
691                    self.element = { }
692                elif name in self.str_subelements:
693                    self.element[name] = self.chars
694                    self.chars = ""
695                elif name in self.int_subelements:
696                    self.element[name] = int(self.chars)
697                    self.chars = ""
698                elif name in self.float_subelements:
699                    self.element[name] = float(self.chars)
700                    self.chars = ""
701
702            def found_chars(self, data):
703                self.chars += data.rstrip()
704
705
706        tp = topo_parse();
707        parser = xml.parsers.expat.ParserCreate()
708        parser.EndElementHandler = tp.end_element
709        parser.CharacterDataHandler = tp.found_chars
710
711        parser.Parse(str)
712
713        return tp.topo
714       
715
716    def genviz(self, topo):
717        """
718        Generate the visualization the virtual topology
719        """
720
721        neato = "/usr/local/bin/neato"
722        # These are used to parse neato output and to create the visualization
723        # file.
724        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
725        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
726                "%s</type></node>"
727
728        try:
729            # Node names
730            nodes = [ n['vname'] for n in topo['node'] ]
731            topo_lans = topo['lan']
732        except KeyError, e:
733            raise service_error(service_error.internal, "Bad topology: %s" %e)
734
735        lans = { }
736        links = { }
737
738        # Walk through the virtual topology, organizing the connections into
739        # 2-node connections (links) and more-than-2-node connections (lans).
740        # When a lan is created, it's added to the list of nodes (there's a
741        # node in the visualization for the lan).
742        for l in topo_lans:
743            if links.has_key(l['vname']):
744                if len(links[l['vname']]) < 2:
745                    links[l['vname']].append(l['vnode'])
746                else:
747                    nodes.append(l['vname'])
748                    lans[l['vname']] = links[l['vname']]
749                    del links[l['vname']]
750                    lans[l['vname']].append(l['vnode'])
751            elif lans.has_key(l['vname']):
752                lans[l['vname']].append(l['vnode'])
753            else:
754                links[l['vname']] = [ l['vnode'] ]
755
756
757        # Open up a temporary file for dot to turn into a visualization
758        try:
759            df, dotname = tempfile.mkstemp()
760            dotfile = os.fdopen(df, 'w')
761        except EnvironmentError:
762            raise service_error(service_error.internal,
763                    "Failed to open file in genviz")
764
765        try:
766            dnull = open('/dev/null', 'w')
767        except EnvironmentError:
768            service_error(service_error.internal,
769                    "Failed to open /dev/null in genviz")
770
771        # Generate a dot/neato input file from the links, nodes and lans
772        try:
773            print >>dotfile, "graph G {"
774            for n in nodes:
775                print >>dotfile, '\t"%s"' % n
776            for l in links.keys():
777                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
778            for l in lans.keys():
779                for n in lans[l]:
780                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
781            print >>dotfile, "}"
782            dotfile.close()
783        except TypeError:
784            raise service_error(service_error.internal,
785                    "Single endpoint link in vtopo")
786        except EnvironmentError:
787            raise service_error(service_error.internal, "Cannot write dot file")
788
789        # Use dot to create a visualization
790        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
791                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
792                close_fds=True)
793        dnull.close()
794
795        # Translate dot to vis format
796        vis_nodes = [ ]
797        vis = { 'node': vis_nodes }
798        for line in dot.stdout:
799            m = vis_re.match(line)
800            if m:
801                vn = m.group(1)
802                vis_node = {'name': vn, \
803                        'x': float(m.group(2)),\
804                        'y' : float(m.group(3)),\
805                    }
806                if vn in links.keys() or vn in lans.keys():
807                    vis_node['type'] = 'lan'
808                else:
809                    vis_node['type'] = 'node'
810                vis_nodes.append(vis_node)
811        rv = dot.wait()
812
813        os.remove(dotname)
814        if rv == 0 : return vis
815        else: return None
816
817    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
818        """
819        Get access to testbed through fedd and set the parameters for that tb
820        """
821        def get_export_project(svcs):
822            """
823            Look through for the list of federated_service for this testbed
824            objects for a project_export service, and extract the project
825            parameter.
826            """
827
828            pe = [s for s in svcs if s.name=='project_export']
829            if len(pe) == 1:
830                return pe[0].params.get('project', None)
831            elif len(pe) == 0:
832                return None
833            else:
834                raise service_error(service_error.req,
835                        "More than one project export is not supported")
836
837        uri = tbmap.get(testbed_base(tb), None)
838        if not uri:
839            raise service_error(service_error.server_config, 
840                    "Unknown testbed: %s" % tb)
841
842        export_svcs = masters.get(tb,[])
843        import_svcs = [ s for m in masters.values() \
844                for s in m \
845                    if tb in s.importers ]
846
847        export_project = get_export_project(export_svcs)
848
849        # Tweak search order so that if there are entries in access_user that
850        # have a project matching the export project, we try them first
851        if export_project: 
852            access_sequence = [ (p, u) for p, u in access_user \
853                    if p == export_project] 
854            access_sequence.extend([(p, u) for p, u in access_user \
855                    if p != export_project]) 
856        else: 
857            access_sequence = access_user
858
859        for p, u in access_sequence: 
860            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
861                    "to %s") %  ((p or "None"), u, uri))
862
863            if p:
864                # Request with user and project specified
865                req = {\
866                        'destinationTestbed' : { 'uri' : uri },
867                        'credential': [ "project: %s" % p, "user: %s"  % u],
868                        'allocID' : { 'localname': 'test' },
869                    }
870            else:
871                # Request with only user specified
872                req = {\
873                        'destinationTestbed' : { 'uri' : uri },
874                        'credential': [ 'user: %s' % u ],
875                        'allocID' : { 'localname': 'test' },
876                    }
877
878            # Make the service request from the services we're importing and
879            # exporting.  Keep track of the export request ids so we can
880            # collect the resulting info from the access response.
881            e_keys = { }
882            if import_svcs or export_svcs:
883                req['service'] = [ ]
884
885                for i, s in enumerate(import_svcs):
886                    idx = 'import%d' % i
887                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
888                    if s.params:
889                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
890                                for k, v in s.params.items()]
891                    req['service'].append(sr)
892
893                for i, s in enumerate(export_svcs):
894                    idx = 'export%d' % i
895                    e_keys[idx] = s
896                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
897                    if s.params:
898                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
899                                for k, v in s.params.items()]
900                    req['service'].append(sr)
901
902            # node resources if any
903            if nodes != None and len(nodes) > 0:
904                rnodes = [ ]
905                for n in nodes:
906                    rn = { }
907                    image, hw, count = n.split(":")
908                    if image: rn['image'] = [ image ]
909                    if hw: rn['hardware'] = [ hw ]
910                    if count and int(count) >0 : rn['count'] = int(count)
911                    rnodes.append(rn)
912                req['resources']= { }
913                req['resources']['node'] = rnodes
914
915            try:
916                if self.local_access.has_key(uri):
917                    # Local access call
918                    req = { 'RequestAccessRequestBody' : req }
919                    r = self.local_access[uri].RequestAccess(req, 
920                            fedid(file=self.cert_file))
921                    r = { 'RequestAccessResponseBody' : r }
922                else:
923                    r = self.call_RequestAccess(uri, req, 
924                            self.cert_file, self.cert_pwd, self.trusted_certs)
925            except service_error, e:
926                if e.code == service_error.access:
927                    self.log.debug("[get_access] Access denied")
928                    r = None
929                    continue
930                else:
931                    raise e
932
933            if r.has_key('RequestAccessResponseBody'):
934                # Through to here we have a valid response, not a fault.
935                # Access denied is a fault, so something better or worse than
936                # access denied has happened.
937                r = r['RequestAccessResponseBody']
938                self.log.debug("[get_access] Access granted")
939                break
940            else:
941                raise service_error(service_error.protocol,
942                        "Bad proxy response")
943       
944        if not r:
945            raise service_error(service_error.access, 
946                    "Access denied by %s (%s)" % (tb, uri))
947
948        tbparam[tb] = { 
949                "allocID" : r['allocID'],
950                "uri": uri,
951                }
952
953        # Collect the responses corresponding to the services this testbed
954        # exports.  These will be the service requests that we will include in
955        # the start segment requests (with appropriate visibility values) to
956        # import and export the segments.
957        for s in r.get('service', []):
958            id = s.get('id', None)
959            if id and id in e_keys:
960                e_keys[id].reqs.append(s)
961
962        # Add attributes to parameter space.  We don't allow attributes to
963        # overlay any parameters already installed.
964        for a in r.get('fedAttr', []):
965            try:
966                if a['attribute'] and \
967                        isinstance(a['attribute'], basestring)\
968                        and not tbparam[tb].has_key(a['attribute'].lower()):
969                    tbparam[tb][a['attribute'].lower()] = a['value']
970            except KeyError:
971                self.log.error("Bad attribute in response: %s" % a)
972
973    def release_access(self, tb, aid, tbmap=None, uri=None):
974        """
975        Release access to testbed through fedd
976        """
977
978        if not uri and tbmap:
979            uri = tbmap.get(tb, None)
980        if not uri:
981            raise service_error(service_error.server_config, 
982                    "Unknown testbed: %s" % tb)
983
984        if self.local_access.has_key(uri):
985            resp = self.local_access[uri].ReleaseAccess(\
986                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
987                    fedid(file=self.cert_file))
988            resp = { 'ReleaseAccessResponseBody': resp } 
989        else:
990            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
991                    self.cert_file, self.cert_pwd, self.trusted_certs)
992
993        # better error coding
994
995    def remote_ns2topdl(self, uri, desc):
996
997        req = {
998                'description' : { 'ns2description': desc },
999            }
1000
1001        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1002                self.trusted_certs)
1003
1004        if r.has_key('Ns2TopdlResponseBody'):
1005            r = r['Ns2TopdlResponseBody']
1006            ed = r.get('experimentdescription', None)
1007            if ed.has_key('topdldescription'):
1008                return topdl.Topology(**ed['topdldescription'])
1009            else:
1010                raise service_error(service_error.protocol, 
1011                        "Bad splitter response (no output)")
1012        else:
1013            raise service_error(service_error.protocol, "Bad splitter response")
1014
1015    class start_segment:
1016        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1017                cert_pwd=None, trusted_certs=None, caller=None,
1018                log_collector=None):
1019            self.log = log
1020            self.debug = debug
1021            self.cert_file = cert_file
1022            self.cert_pwd = cert_pwd
1023            self.trusted_certs = None
1024            self.caller = caller
1025            self.testbed = testbed
1026            self.log_collector = log_collector
1027            self.response = None
1028            self.node = { }
1029
1030        def make_map(self, resp):
1031            for e in resp.get('embedding', []):
1032                if 'toponame' in e and 'physname' in e:
1033                    self.node[e['toponame']] = e['physname'][0]
1034
1035        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1036            req = {
1037                    'allocID': { 'fedid' : aid }, 
1038                    'segmentdescription': { 
1039                        'topdldescription': topo.to_dict(),
1040                    },
1041                }
1042
1043            if connInfo:
1044                req['connection'] = connInfo
1045
1046            import_svcs = [ s for m in masters.values() \
1047                    for s in m if self.testbed in s.importers]
1048
1049            if import_svcs or self.testbed in masters:
1050                req['service'] = []
1051
1052            for s in import_svcs:
1053                for r in s.reqs:
1054                    sr = copy.deepcopy(r)
1055                    sr['visibility'] = 'import';
1056                    req['service'].append(sr)
1057
1058            for s in masters.get(self.testbed, []):
1059                for r in s.reqs:
1060                    sr = copy.deepcopy(r)
1061                    sr['visibility'] = 'export';
1062                    req['service'].append(sr)
1063
1064            if attrs:
1065                req['fedAttr'] = attrs
1066
1067            try:
1068                self.log.debug("Calling StartSegment at %s " % uri)
1069                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1070                        self.trusted_certs)
1071                if r.has_key('StartSegmentResponseBody'):
1072                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1073                            None)
1074                    if lval and self.log_collector:
1075                        for line in  lval.splitlines(True):
1076                            self.log_collector.write(line)
1077                    self.make_map(r['StartSegmentResponseBody'])
1078                    self.response = r
1079                else:
1080                    raise service_error(service_error.internal, 
1081                            "Bad response!?: %s" %r)
1082                return True
1083            except service_error, e:
1084                self.log.error("Start segment failed on %s: %s" % \
1085                        (self.testbed, e))
1086                return False
1087
1088
1089
1090    class terminate_segment:
1091        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1092                cert_pwd=None, trusted_certs=None, caller=None):
1093            self.log = log
1094            self.debug = debug
1095            self.cert_file = cert_file
1096            self.cert_pwd = cert_pwd
1097            self.trusted_certs = None
1098            self.caller = caller
1099            self.testbed = testbed
1100
1101        def __call__(self, uri, aid ):
1102            req = {
1103                    'allocID': aid , 
1104                }
1105            try:
1106                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1107                        self.trusted_certs)
1108                return True
1109            except service_error, e:
1110                self.log.error("Terminate segment failed on %s: %s" % \
1111                        (self.testbed, e))
1112                return False
1113   
1114
1115    def allocate_resources(self, allocated, masters, eid, expid, 
1116            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1117            attrs=None, connInfo={}, tbmap=None):
1118
1119        started = { }           # Testbeds where a sub-experiment started
1120                                # successfully
1121
1122        # XXX
1123        fail_soft = False
1124
1125        if tbmap is None: tbmap = { }
1126
1127        log = alloc_log or self.log
1128
1129        thread_pool = self.thread_pool(self.nthreads)
1130        threads = [ ]
1131        starters = [ ]
1132
1133        for tb in allocated.keys():
1134            # Create and start a thread to start the segment, and save it
1135            # to get the return value later
1136            tb_attrs = copy.copy(attrs)
1137            thread_pool.wait_for_slot()
1138            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1139            base, suffix = split_testbed(tb)
1140            if suffix:
1141                tb_attrs.append({'attribute': 'experiment_name', 
1142                    'value': "%s-%s" % (eid, suffix)})
1143            else:
1144                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1145            if not uri:
1146                raise service_error(service_error.internal, 
1147                        "Unknown testbed %s !?" % tb)
1148
1149            if tbparams[tb].has_key('allocID') and \
1150                    tbparams[tb]['allocID'].has_key('fedid'):
1151                aid = tbparams[tb]['allocID']['fedid']
1152            else:
1153                raise service_error(service_error.internal, 
1154                        "No alloc id for testbed %s !?" % tb)
1155
1156            s = self.start_segment(log=log, debug=self.debug,
1157                    testbed=tb, cert_file=self.cert_file,
1158                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1159                    caller=self.call_StartSegment,
1160                    log_collector=log_collector)
1161            starters.append(s)
1162            t  = self.pooled_thread(\
1163                    target=s, name=tb,
1164                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1165                    pdata=thread_pool, trace_file=self.trace_file)
1166            threads.append(t)
1167            t.start()
1168
1169        # Wait until all finish (keep pinging the log, though)
1170        mins = 0
1171        revoked = False
1172        while not thread_pool.wait_for_all_done(60.0):
1173            mins += 1
1174            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1175                    % mins)
1176            if not revoked and \
1177                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1178                # a testbed has failed.  Revoke this experiment's
1179                # synchronizarion values so that sub experiments will not
1180                # deadlock waiting for synchronization that will never happen
1181                self.log.info("A subexperiment has failed to swap in, " + \
1182                        "revoking synch keys")
1183                var_key = "fedid:%s" % expid
1184                for k in self.synch_store.all_keys():
1185                    if len(k) > 45 and k[0:46] == var_key:
1186                        self.synch_store.revoke_key(k)
1187                revoked = True
1188
1189        failed = [ t.getName() for t in threads if not t.rv ]
1190        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1191
1192        # If one failed clean up, unless fail_soft is set
1193        if failed:
1194            if not fail_soft:
1195                thread_pool.clear()
1196                for tb in succeeded:
1197                    # Create and start a thread to stop the segment
1198                    thread_pool.wait_for_slot()
1199                    uri = tbparams[tb]['uri']
1200                    t  = self.pooled_thread(\
1201                            target=self.terminate_segment(log=log,
1202                                testbed=tb,
1203                                cert_file=self.cert_file, 
1204                                cert_pwd=self.cert_pwd,
1205                                trusted_certs=self.trusted_certs,
1206                                caller=self.call_TerminateSegment),
1207                            args=(uri, tbparams[tb]['federant']['allocID']),
1208                            name=tb,
1209                            pdata=thread_pool, trace_file=self.trace_file)
1210                    t.start()
1211                # Wait until all finish (if any are being stopped)
1212                if succeeded:
1213                    thread_pool.wait_for_all_done()
1214
1215                # release the allocations
1216                for tb in tbparams.keys():
1217                    self.release_access(tb, tbparams[tb]['allocID'], 
1218                            tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1219                # Remove the placeholder
1220                self.state_lock.acquire()
1221                self.state[eid]['experimentStatus'] = 'failed'
1222                if self.state_filename: self.write_state()
1223                self.state_lock.release()
1224                # Remove the repo dir
1225                self.remove_dirs("%s/%s" %(self.repodir, expid))
1226                # Walk up tmpdir, deleting as we go
1227                if self.cleanup:
1228                    self.remove_dirs(tmpdir)
1229                else:
1230                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1231
1232
1233                log.error("Swap in failed on %s" % ",".join(failed))
1234                return
1235        else:
1236            # Walk through the successes and gather the virtual to physical
1237            # mapping.
1238            embedding = [ ]
1239            for s in starters:
1240                for k, v in s.node.items():
1241                    embedding.append({
1242                        'toponame': k, 
1243                        'physname': [ v],
1244                        'testbed': s.testbed
1245                        })
1246            log.info("[start_segment]: Experiment %s active" % eid)
1247
1248
1249        # Walk up tmpdir, deleting as we go
1250        if self.cleanup:
1251            self.remove_dirs(tmpdir)
1252        else:
1253            log.debug("[start_experiment]: not removing %s" % tmpdir)
1254
1255        # Insert the experiment into our state and update the disk copy.
1256        self.state_lock.acquire()
1257        self.state[expid]['experimentStatus'] = 'active'
1258        self.state[eid] = self.state[expid]
1259        self.state[eid]['experimentdescription']['topdldescription'] = \
1260                top.to_dict()
1261        self.state[eid]['embedding'] = embedding
1262        if self.state_filename: self.write_state()
1263        self.state_lock.release()
1264        return
1265
1266
1267    def add_kit(self, e, kit):
1268        """
1269        Add a Software object created from the list of (install, location)
1270        tuples passed as kit  to the software attribute of an object e.  We
1271        do this enough to break out the code, but it's kind of a hack to
1272        avoid changing the old tuple rep.
1273        """
1274
1275        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1276
1277        if isinstance(e.software, list): e.software.extend(s)
1278        else: e.software = s
1279
1280
1281    def create_experiment_state(self, fid, req, expid, expcert,
1282            state='starting'):
1283        """
1284        Create the initial entry in the experiment's state.  The expid and
1285        expcert are the experiment's fedid and certifacte that represents that
1286        ID, which are installed in the experiment state.  If the request
1287        includes a suggested local name that is used if possible.  If the local
1288        name is already taken by an experiment owned by this user that has
1289        failed, it is overwritten.  Otherwise new letters are added until a
1290        valid localname is found.  The generated local name is returned.
1291        """
1292
1293        if req.has_key('experimentID') and \
1294                req['experimentID'].has_key('localname'):
1295            overwrite = False
1296            eid = req['experimentID']['localname']
1297            # If there's an old failed experiment here with the same local name
1298            # and accessible by this user, we'll overwrite it, otherwise we'll
1299            # fall through and do the collision avoidance.
1300            old_expid = self.get_experiment_fedid(eid)
1301            if old_expid and self.check_experiment_access(fid, old_expid):
1302                self.state_lock.acquire()
1303                status = self.state[eid].get('experimentStatus', None)
1304                if status and status == 'failed':
1305                    # remove the old access attribute
1306                    self.auth.unset_attribute(fid, old_expid)
1307                    overwrite = True
1308                    del self.state[eid]
1309                    del self.state[old_expid]
1310                self.state_lock.release()
1311            self.state_lock.acquire()
1312            while (self.state.has_key(eid) and not overwrite):
1313                eid += random.choice(string.ascii_letters)
1314            # Initial state
1315            self.state[eid] = {
1316                    'experimentID' : \
1317                            [ { 'localname' : eid }, {'fedid': expid } ],
1318                    'experimentStatus': state,
1319                    'experimentAccess': { 'X509' : expcert },
1320                    'owner': fid,
1321                    'log' : [],
1322                }
1323            self.state[expid] = self.state[eid]
1324            if self.state_filename: self.write_state()
1325            self.state_lock.release()
1326        else:
1327            eid = self.exp_stem
1328            for i in range(0,5):
1329                eid += random.choice(string.ascii_letters)
1330            self.state_lock.acquire()
1331            while (self.state.has_key(eid)):
1332                eid = self.exp_stem
1333                for i in range(0,5):
1334                    eid += random.choice(string.ascii_letters)
1335            # Initial state
1336            self.state[eid] = {
1337                    'experimentID' : \
1338                            [ { 'localname' : eid }, {'fedid': expid } ],
1339                    'experimentStatus': state,
1340                    'experimentAccess': { 'X509' : expcert },
1341                    'owner': fid,
1342                    'log' : [],
1343                }
1344            self.state[expid] = self.state[eid]
1345            if self.state_filename: self.write_state()
1346            self.state_lock.release()
1347
1348        return eid
1349
1350
1351    def allocate_ips_to_topo(self, top):
1352        """
1353        Add an ip4_address attribute to all the hosts in the topology, based on
1354        the shared substrates on which they sit.  An /etc/hosts file is also
1355        created and returned as a list of hostfiles entries.  We also return
1356        the allocator, because we may need to allocate IPs to portals
1357        (specifically DRAGON portals).
1358        """
1359        subs = sorted(top.substrates, 
1360                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1361                reverse=True)
1362        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1363        ifs = { }
1364        hosts = [ ]
1365
1366        for idx, s in enumerate(subs):
1367            net_size = len(s.interfaces)+2
1368
1369            a = ips.allocate(net_size)
1370            if a :
1371                base, num = a
1372                if num < net_size: 
1373                    raise service_error(service_error.internal,
1374                            "Allocator returned wrong number of IPs??")
1375            else:
1376                raise service_error(service_error.req, 
1377                        "Cannot allocate IP addresses")
1378            mask = ips.min_alloc
1379            while mask < net_size:
1380                mask *= 2
1381
1382            netmask = ((2**32-1) ^ (mask-1))
1383
1384            base += 1
1385            for i in s.interfaces:
1386                i.attribute.append(
1387                        topdl.Attribute('ip4_address', 
1388                            "%s" % ip_addr(base)))
1389                i.attribute.append(
1390                        topdl.Attribute('ip4_netmask', 
1391                            "%s" % ip_addr(int(netmask))))
1392
1393                hname = i.element.name
1394                if ifs.has_key(hname):
1395                    hosts.append("%s\t%s-%s %s-%d" % \
1396                            (ip_addr(base), hname, s.name, hname,
1397                                ifs[hname]))
1398                else:
1399                    ifs[hname] = 0
1400                    hosts.append("%s\t%s-%s %s-%d %s" % \
1401                            (ip_addr(base), hname, s.name, hname,
1402                                ifs[hname], hname))
1403
1404                ifs[hname] += 1
1405                base += 1
1406        return hosts, ips
1407
1408    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1409            tbparams, masters, tbmap):
1410        """
1411        Request access to the various testbeds required for this instantiation
1412        (passed in as testbeds).  User, access_user, expoert_project and master
1413        are used to construct the correct requests.  Per-testbed parameters are
1414        returned in tbparams.
1415        """
1416        for tb in testbeds:
1417            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1418            allocated[tb] = 1
1419
1420    def split_topology(self, top, topo, testbeds):
1421        """
1422        Create the sub-topologies that are needed for experiment instantiation.
1423        """
1424        for tb in testbeds:
1425            topo[tb] = top.clone()
1426            # copy in for loop allows deletions from the original
1427            for e in [ e for e in topo[tb].elements]:
1428                etb = e.get_attribute('testbed')
1429                # NB: elements without a testbed attribute won't appear in any
1430                # sub topologies. 
1431                if not etb or etb != tb:
1432                    for i in e.interface:
1433                        for s in i.subs:
1434                            try:
1435                                s.interfaces.remove(i)
1436                            except ValueError:
1437                                raise service_error(service_error.internal,
1438                                        "Can't remove interface??")
1439                    topo[tb].elements.remove(e)
1440            topo[tb].make_indices()
1441
1442    def wrangle_software(self, expid, top, topo, tbparams):
1443        """
1444        Copy software out to the repository directory, allocate permissions and
1445        rewrite the segment topologies to look for the software in local
1446        places.
1447        """
1448
1449        # Copy the rpms and tarfiles to a distribution directory from
1450        # which the federants can retrieve them
1451        linkpath = "%s/software" %  expid
1452        softdir ="%s/%s" % ( self.repodir, linkpath)
1453        softmap = { }
1454        # These are in a list of tuples format (each kit).  This comprehension
1455        # unwraps them into a single list of tuples that initilaizes the set of
1456        # tuples.
1457        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1458                for p, t in l ])
1459        pkgs.update([x.location for e in top.elements \
1460                for x in e.software])
1461        try:
1462            os.makedirs(softdir)
1463        except EnvironmentError, e:
1464            raise service_error(
1465                    "Cannot create software directory: %s" % e)
1466        # The actual copying.  Everything's converted into a url for copying.
1467        for pkg in pkgs:
1468            loc = pkg
1469
1470            scheme, host, path = urlparse(loc)[0:3]
1471            dest = os.path.basename(path)
1472            if not scheme:
1473                if not loc.startswith('/'):
1474                    loc = "/%s" % loc
1475                loc = "file://%s" %loc
1476            try:
1477                u = urlopen(loc)
1478            except Exception, e:
1479                raise service_error(service_error.req, 
1480                        "Cannot open %s: %s" % (loc, e))
1481            try:
1482                f = open("%s/%s" % (softdir, dest) , "w")
1483                self.log.debug("Writing %s/%s" % (softdir,dest) )
1484                data = u.read(4096)
1485                while data:
1486                    f.write(data)
1487                    data = u.read(4096)
1488                f.close()
1489                u.close()
1490            except Exception, e:
1491                raise service_error(service_error.internal,
1492                        "Could not copy %s: %s" % (loc, e))
1493            path = re.sub("/tmp", "", linkpath)
1494            # XXX
1495            softmap[pkg] = \
1496                    "%s/%s/%s" %\
1497                    ( self.repo_url, path, dest)
1498
1499            # Allow the individual segments to access the software.
1500            for tb in tbparams.keys():
1501                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1502                        "/%s/%s" % ( path, dest))
1503
1504        # Convert the software locations in the segments into the local
1505        # copies on this host
1506        for soft in [ s for tb in topo.values() \
1507                for e in tb.elements \
1508                    if getattr(e, 'software', False) \
1509                        for s in e.software ]:
1510            if softmap.has_key(soft.location):
1511                soft.location = softmap[soft.location]
1512
1513
1514    def new_experiment(self, req, fid):
1515        """
1516        The external interface to empty initial experiment creation called from
1517        the dispatcher.
1518
1519        Creates a working directory, splits the incoming description using the
1520        splitter script and parses out the avrious subsections using the
1521        lcasses above.  Once each sub-experiment is created, use pooled threads
1522        to instantiate them and start it all up.
1523        """
1524        if not self.auth.check_attribute(fid, 'new'):
1525            raise service_error(service_error.access, "New access denied")
1526
1527        try:
1528            tmpdir = tempfile.mkdtemp(prefix="split-")
1529        except EnvironmentError:
1530            raise service_error(service_error.internal, "Cannot create tmp dir")
1531
1532        try:
1533            access_user = self.accessdb[fid]
1534        except KeyError:
1535            raise service_error(service_error.internal,
1536                    "Access map and authorizer out of sync in " + \
1537                            "new_experiment for fedid %s"  % fid)
1538
1539        pid = "dummy"
1540        gid = "dummy"
1541
1542        req = req.get('NewRequestBody', None)
1543        if not req:
1544            raise service_error(service_error.req,
1545                    "Bad request format (no NewRequestBody)")
1546
1547        # Generate an ID for the experiment (slice) and a certificate that the
1548        # allocator can use to prove they own it.  We'll ship it back through
1549        # the encrypted connection.
1550        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1551
1552        #now we're done with the tmpdir, and it should be empty
1553        if self.cleanup:
1554            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1555            os.rmdir(tmpdir)
1556        else:
1557            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1558
1559        eid = self.create_experiment_state(fid, req, expid, expcert, 
1560                state='empty')
1561
1562        # Let users touch the state
1563        self.auth.set_attribute(fid, expid)
1564        self.auth.set_attribute(expid, expid)
1565        # Override fedids can manipulate state as well
1566        for o in self.overrides:
1567            self.auth.set_attribute(o, expid)
1568
1569        rv = {
1570                'experimentID': [
1571                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1572                ],
1573                'experimentStatus': 'empty',
1574                'experimentAccess': { 'X509' : expcert }
1575            }
1576
1577        return rv
1578
1579    def create_experiment(self, req, fid):
1580        """
1581        The external interface to experiment creation called from the
1582        dispatcher.
1583
1584        Creates a working directory, splits the incoming description using the
1585        splitter script and parses out the various subsections using the
1586        classes above.  Once each sub-experiment is created, use pooled threads
1587        to instantiate them and start it all up.
1588        """
1589
1590        req = req.get('CreateRequestBody', None)
1591        if not req:
1592            raise service_error(service_error.req,
1593                    "Bad request format (no CreateRequestBody)")
1594
1595        # Get the experiment access
1596        exp = req.get('experimentID', None)
1597        if exp:
1598            if exp.has_key('fedid'):
1599                key = exp['fedid']
1600                expid = key
1601                eid = None
1602            elif exp.has_key('localname'):
1603                key = exp['localname']
1604                eid = key
1605                expid = None
1606            else:
1607                raise service_error(service_error.req, "Unknown lookup type")
1608        else:
1609            raise service_error(service_error.req, "No request?")
1610
1611        self.check_experiment_access(fid, key)
1612
1613        # Install the testbed map entries supplied with the request into a copy
1614        # of the testbed map.
1615        tbmap = dict(self.tbmap)
1616        for m in req.get('testbedmap', []):
1617            if 'testbed' in m and 'uri' in m:
1618                tbmap[m['testbed']] = m['uri']
1619
1620        try:
1621            tmpdir = tempfile.mkdtemp(prefix="split-")
1622            os.mkdir(tmpdir+"/keys")
1623        except EnvironmentError:
1624            raise service_error(service_error.internal, "Cannot create tmp dir")
1625
1626        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1627        gw_secretkey_base = "fed.%s" % self.ssh_type
1628        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1629        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1630        tclfile = tmpdir + "/experiment.tcl"
1631        tbparams = { }
1632        try:
1633            access_user = self.accessdb[fid]
1634        except KeyError:
1635            raise service_error(service_error.internal,
1636                    "Access map and authorizer out of sync in " + \
1637                            "create_experiment for fedid %s"  % fid)
1638
1639        pid = "dummy"
1640        gid = "dummy"
1641
1642        # The tcl parser needs to read a file so put the content into that file
1643        descr=req.get('experimentdescription', None)
1644        if descr:
1645            file_content=descr.get('ns2description', None)
1646            if file_content:
1647                try:
1648                    f = open(tclfile, 'w')
1649                    f.write(file_content)
1650                    f.close()
1651                except EnvironmentError:
1652                    raise service_error(service_error.internal,
1653                            "Cannot write temp experiment description")
1654            else:
1655                raise service_error(service_error.req, 
1656                        "Only ns2descriptions supported")
1657        else:
1658            raise service_error(service_error.req, "No experiment description")
1659
1660        self.state_lock.acquire()
1661        if self.state.has_key(key):
1662            self.state[key]['experimentStatus'] = "starting"
1663            for e in self.state[key].get('experimentID',[]):
1664                if not expid and e.has_key('fedid'):
1665                    expid = e['fedid']
1666                elif not eid and e.has_key('localname'):
1667                    eid = e['localname']
1668        self.state_lock.release()
1669
1670        if not (eid and expid):
1671            raise service_error(service_error.internal, 
1672                    "Cannot find local experiment info!?")
1673
1674        try: 
1675            # This catches exceptions to clear the placeholder if necessary
1676            try:
1677                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1678            except ValueError:
1679                raise service_error(service_error.server_config, 
1680                        "Bad key type (%s)" % self.ssh_type)
1681
1682            # Copy the service request
1683            tb_services = [ s for s in req.get('service',[]) ]
1684            # Translate to topdl
1685            if self.splitter_url:
1686                self.log.debug("Calling remote topdl translator at %s" % \
1687                        self.splitter_url)
1688                top = self.remote_ns2topdl(self.splitter_url, file_content)
1689            else:
1690                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1691                    str(self.muxmax), '-m', 'dummy']
1692
1693                tclcmd.extend([pid, gid, eid, tclfile])
1694
1695                self.log.debug("running local splitter %s", " ".join(tclcmd))
1696                # This is just fantastic.  As a side effect the parser copies
1697                # tb_compat.tcl into the current directory, so that directory
1698                # must be writable by the fedd user.  Doing this in the
1699                # temporary subdir ensures this is the case.
1700                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1701                        cwd=tmpdir)
1702                split_data = tclparser.stdout
1703
1704                top = topdl.topology_from_xml(file=split_data, top="experiment")
1705
1706            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1707            # Find the testbeds to look up
1708            testbeds = set([ a.value for e in top.elements \
1709                    for a in e.attribute \
1710                        if a.attribute == 'testbed'])
1711
1712            tb_hosts = { }
1713            for tb in testbeds:
1714                tb_hosts[tb] = [ e.name for e in top.elements \
1715                        if isinstance(e, topdl.Computer) and \
1716                            e.get_attribute('testbed') and \
1717                            e.get_attribute('testbed') == tb]
1718
1719            masters = { }           # testbeds exporting services
1720            pmasters = { }          # Testbeds exporting services that
1721                                    # need portals
1722            for s in tb_services:
1723                # If this is a service request with the importall field
1724                # set, fill it out.
1725
1726                if s.get('importall', False):
1727                    s['import'] = [ tb for tb in testbeds \
1728                            if tb not in s.get('export',[])]
1729                    del s['importall']
1730
1731                # Add the service to masters
1732                for tb in s.get('export', []):
1733                    if s.get('name', None):
1734                        if tb not in masters:
1735                            masters[tb] = [ ]
1736
1737                        params = { }
1738                        if 'fedAttr' in s:
1739                            for a in s['fedAttr']:
1740                                params[a.get('attribute', '')] = \
1741                                        a.get('value','')
1742
1743                        fser = federated_service(name=s['name'],
1744                                exporter=tb, importers=s.get('import',[]),
1745                                params=params)
1746                        if fser.name == 'hide_hosts' \
1747                                and 'hosts' not in fser.params:
1748                            fser.params['hosts'] = \
1749                                    ",".join(tb_hosts.get(fser.exporter, []))
1750                        masters[tb].append(fser)
1751
1752                        if fser.portal:
1753                            if tb not in pmasters: pmasters[tb] = [ fser ]
1754                            else: pmasters[tb].append(fser)
1755                    else:
1756                        self.log.error('Testbed service does not have name " + \
1757                                "and importers')
1758
1759
1760            allocated = { }         # Testbeds we can access
1761            topo ={ }               # Sub topologies
1762            connInfo = { }          # Connection information
1763
1764            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1765                    tbparams, masters, tbmap)
1766
1767            self.split_topology(top, topo, testbeds)
1768
1769            # Copy configuration files into the remote file store
1770            # The config urlpath
1771            configpath = "/%s/config" % expid
1772            # The config file system location
1773            configdir ="%s%s" % ( self.repodir, configpath)
1774            try:
1775                os.makedirs(configdir)
1776            except EnvironmentError, e:
1777                raise service_error(service_error.internal,
1778                        "Cannot create config directory: %s" % e)
1779            try:
1780                f = open("%s/hosts" % configdir, "w")
1781                f.write('\n'.join(hosts))
1782                f.close()
1783            except EnvironmentError, e:
1784                raise service_error(service_error.internal, 
1785                        "Cannot write hosts file: %s" % e)
1786            try:
1787                copy_file("%s" % gw_pubkey, "%s/%s" % \
1788                        (configdir, gw_pubkey_base))
1789                copy_file("%s" % gw_secretkey, "%s/%s" % \
1790                        (configdir, gw_secretkey_base))
1791            except EnvironmentError, e:
1792                raise service_error(service_error.internal, 
1793                        "Cannot copy keyfiles: %s" % e)
1794
1795            # Allow the individual testbeds to access the configuration files.
1796            for tb in tbparams.keys():
1797                asignee = tbparams[tb]['allocID']['fedid']
1798                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1799                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1800
1801            part = experiment_partition(self.auth, self.store_url, tbmap,
1802                    self.muxmax, self.direct_transit)
1803            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1804                    connInfo, expid)
1805            # Now get access to the dynamic testbeds (those added above)
1806            for tb in [ t for t in topo if t not in allocated]:
1807                self.get_access(tb, None, tbparams, access_user, masters)
1808                allocated[tb] = 1
1809                store_keys = topo[tb].get_attribute('store_keys')
1810                # Give the testbed access to keys it exports or imports
1811                if store_keys:
1812                    for sk in store_keys.split(" "):
1813                        self.auth.set_attribute(\
1814                                tbparams[tb]['allocID']['fedid'], sk)
1815
1816            self.wrangle_software(expid, top, topo, tbparams)
1817
1818            vtopo = topdl.topology_to_vtopo(top)
1819            vis = self.genviz(vtopo)
1820
1821            # save federant information
1822            for k in allocated.keys():
1823                tbparams[k]['federant'] = {
1824                        'name': [ { 'localname' : eid} ],
1825                        'allocID' : tbparams[k]['allocID'],
1826                        'uri': tbparams[k]['uri'],
1827                    }
1828
1829            self.state_lock.acquire()
1830            self.state[eid]['vtopo'] = vtopo
1831            self.state[eid]['vis'] = vis
1832            self.state[eid]['experimentdescription'] = \
1833                    { 'topdldescription': top.to_dict() }
1834            self.state[eid]['federant'] = \
1835                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1836                        if tbparams[tb].has_key('federant') ]
1837            if self.state_filename: 
1838                self.write_state()
1839            self.state_lock.release()
1840        except service_error, e:
1841            # If something goes wrong in the parse (usually an access error)
1842            # clear the placeholder state.  From here on out the code delays
1843            # exceptions.  Failing at this point returns a fault to the remote
1844            # caller.
1845
1846            self.state_lock.acquire()
1847            del self.state[eid]
1848            del self.state[expid]
1849            if self.state_filename: self.write_state()
1850            self.state_lock.release()
1851            raise e
1852
1853
1854        # Start the background swapper and return the starting state.  From
1855        # here on out, the state will stick around a while.
1856
1857        # Let users touch the state
1858        self.auth.set_attribute(fid, expid)
1859        self.auth.set_attribute(expid, expid)
1860        # Override fedids can manipulate state as well
1861        for o in self.overrides:
1862            self.auth.set_attribute(o, expid)
1863
1864        # Create a logger that logs to the experiment's state object as well as
1865        # to the main log file.
1866        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1867        alloc_collector = self.list_log(self.state[eid]['log'])
1868        h = logging.StreamHandler(alloc_collector)
1869        # XXX: there should be a global one of these rather than repeating the
1870        # code.
1871        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1872                    '%d %b %y %H:%M:%S'))
1873        alloc_log.addHandler(h)
1874       
1875        attrs = [ 
1876                {
1877                    'attribute': 'ssh_pubkey', 
1878                    'value': '%s/%s/config/%s' % \
1879                            (self.repo_url, expid, gw_pubkey_base)
1880                },
1881                {
1882                    'attribute': 'ssh_secretkey', 
1883                    'value': '%s/%s/config/%s' % \
1884                            (self.repo_url, expid, gw_secretkey_base)
1885                },
1886                {
1887                    'attribute': 'hosts', 
1888                    'value': '%s/%s/config/hosts' % \
1889                            (self.repo_url, expid)
1890                },
1891            ]
1892
1893        # transit and disconnected testbeds may not have a connInfo entry.
1894        # Fill in the blanks.
1895        for t in allocated.keys():
1896            if not connInfo.has_key(t):
1897                connInfo[t] = { }
1898
1899        # Start a thread to do the resource allocation
1900        t  = Thread(target=self.allocate_resources,
1901                args=(allocated, masters, eid, expid, tbparams, 
1902                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1903                    connInfo, tbmap),
1904                name=eid)
1905        t.start()
1906
1907        rv = {
1908                'experimentID': [
1909                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1910                ],
1911                'experimentStatus': 'starting',
1912            }
1913
1914        return rv
1915   
1916    def get_experiment_fedid(self, key):
1917        """
1918        find the fedid associated with the localname key in the state database.
1919        """
1920
1921        rv = None
1922        self.state_lock.acquire()
1923        if self.state.has_key(key):
1924            if isinstance(self.state[key], dict):
1925                try:
1926                    kl = [ f['fedid'] for f in \
1927                            self.state[key]['experimentID']\
1928                                if f.has_key('fedid') ]
1929                except KeyError:
1930                    self.state_lock.release()
1931                    raise service_error(service_error.internal, 
1932                            "No fedid for experiment %s when getting "+\
1933                                    "fedid(!?)" % key)
1934                if len(kl) == 1:
1935                    rv = kl[0]
1936                else:
1937                    self.state_lock.release()
1938                    raise service_error(service_error.internal, 
1939                            "multiple fedids for experiment %s when " +\
1940                                    "getting fedid(!?)" % key)
1941            else:
1942                self.state_lock.release()
1943                raise service_error(service_error.internal, 
1944                        "Unexpected state for %s" % key)
1945        self.state_lock.release()
1946        return rv
1947
1948    def check_experiment_access(self, fid, key):
1949        """
1950        Confirm that the fid has access to the experiment.  Though a request
1951        may be made in terms of a local name, the access attribute is always
1952        the experiment's fedid.
1953        """
1954        if not isinstance(key, fedid):
1955            key = self.get_experiment_fedid(key)
1956
1957        if self.auth.check_attribute(fid, key):
1958            return True
1959        else:
1960            raise service_error(service_error.access, "Access Denied")
1961
1962
1963    def get_handler(self, path, fid):
1964        self.log.info("Get handler %s %s" % (path, fid))
1965        if self.auth.check_attribute(fid, path):
1966            return ("%s/%s" % (self.repodir, path), "application/binary")
1967        else:
1968            return (None, None)
1969
1970    def get_vtopo(self, req, fid):
1971        """
1972        Return the stored virtual topology for this experiment
1973        """
1974        rv = None
1975        state = None
1976
1977        req = req.get('VtopoRequestBody', None)
1978        if not req:
1979            raise service_error(service_error.req,
1980                    "Bad request format (no VtopoRequestBody)")
1981        exp = req.get('experiment', None)
1982        if exp:
1983            if exp.has_key('fedid'):
1984                key = exp['fedid']
1985                keytype = "fedid"
1986            elif exp.has_key('localname'):
1987                key = exp['localname']
1988                keytype = "localname"
1989            else:
1990                raise service_error(service_error.req, "Unknown lookup type")
1991        else:
1992            raise service_error(service_error.req, "No request?")
1993
1994        self.check_experiment_access(fid, key)
1995
1996        self.state_lock.acquire()
1997        if self.state.has_key(key):
1998            if self.state[key].has_key('vtopo'):
1999                rv = { 'experiment' : {keytype: key },\
2000                        'vtopo': self.state[key]['vtopo'],\
2001                    }
2002            else:
2003                state = self.state[key]['experimentStatus']
2004        self.state_lock.release()
2005
2006        if rv: return rv
2007        else: 
2008            if state:
2009                raise service_error(service_error.partial, 
2010                        "Not ready: %s" % state)
2011            else:
2012                raise service_error(service_error.req, "No such experiment")
2013
2014    def get_vis(self, req, fid):
2015        """
2016        Return the stored visualization for this experiment
2017        """
2018        rv = None
2019        state = None
2020
2021        req = req.get('VisRequestBody', None)
2022        if not req:
2023            raise service_error(service_error.req,
2024                    "Bad request format (no VisRequestBody)")
2025        exp = req.get('experiment', None)
2026        if exp:
2027            if exp.has_key('fedid'):
2028                key = exp['fedid']
2029                keytype = "fedid"
2030            elif exp.has_key('localname'):
2031                key = exp['localname']
2032                keytype = "localname"
2033            else:
2034                raise service_error(service_error.req, "Unknown lookup type")
2035        else:
2036            raise service_error(service_error.req, "No request?")
2037
2038        self.check_experiment_access(fid, key)
2039
2040        self.state_lock.acquire()
2041        if self.state.has_key(key):
2042            if self.state[key].has_key('vis'):
2043                rv =  { 'experiment' : {keytype: key },\
2044                        'vis': self.state[key]['vis'],\
2045                        }
2046            else:
2047                state = self.state[key]['experimentStatus']
2048        self.state_lock.release()
2049
2050        if rv: return rv
2051        else:
2052            if state:
2053                raise service_error(service_error.partial, 
2054                        "Not ready: %s" % state)
2055            else:
2056                raise service_error(service_error.req, "No such experiment")
2057
2058    def clean_info_response(self, rv):
2059        """
2060        Remove the information in the experiment's state object that is not in
2061        the info response.
2062        """
2063        # Remove the owner info (should always be there, but...)
2064        if rv.has_key('owner'): del rv['owner']
2065
2066        # Convert the log into the allocationLog parameter and remove the
2067        # log entry (with defensive programming)
2068        if rv.has_key('log'):
2069            rv['allocationLog'] = "".join(rv['log'])
2070            del rv['log']
2071        else:
2072            rv['allocationLog'] = ""
2073
2074        if rv['experimentStatus'] != 'active':
2075            if rv.has_key('federant'): del rv['federant']
2076        else:
2077            # remove the allocationID and uri info from each federant
2078            for f in rv.get('federant', []):
2079                if f.has_key('allocID'): del f['allocID']
2080                if f.has_key('uri'): del f['uri']
2081
2082        return rv
2083
2084    def get_info(self, req, fid):
2085        """
2086        Return all the stored info about this experiment
2087        """
2088        rv = None
2089
2090        req = req.get('InfoRequestBody', None)
2091        if not req:
2092            raise service_error(service_error.req,
2093                    "Bad request format (no InfoRequestBody)")
2094        exp = req.get('experiment', None)
2095        if exp:
2096            if exp.has_key('fedid'):
2097                key = exp['fedid']
2098                keytype = "fedid"
2099            elif exp.has_key('localname'):
2100                key = exp['localname']
2101                keytype = "localname"
2102            else:
2103                raise service_error(service_error.req, "Unknown lookup type")
2104        else:
2105            raise service_error(service_error.req, "No request?")
2106
2107        self.check_experiment_access(fid, key)
2108
2109        # The state may be massaged by the service function that called
2110        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2111        # state.
2112        self.state_lock.acquire()
2113        if self.state.has_key(key):
2114            rv = copy.deepcopy(self.state[key])
2115        self.state_lock.release()
2116
2117        if rv:
2118            return self.clean_info_response(rv)
2119        else:
2120            raise service_error(service_error.req, "No such experiment")
2121
2122    def get_multi_info(self, req, fid):
2123        """
2124        Return all the stored info that this fedid can access
2125        """
2126        rv = { 'info': [ ] }
2127
2128        self.state_lock.acquire()
2129        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2130            try:
2131                self.check_experiment_access(fid, key)
2132            except service_error, e:
2133                if e.code == service_error.access:
2134                    continue
2135                else:
2136                    self.state_lock.release()
2137                    raise e
2138
2139            if self.state.has_key(key):
2140                e = copy.deepcopy(self.state[key])
2141                e = self.clean_info_response(e)
2142                rv['info'].append(e)
2143        self.state_lock.release()
2144        return rv
2145
2146    def remove_dirs(self, dir):
2147        """
2148        Remove the directory tree and all files rooted at dir.  Log any errors,
2149        but continue.
2150        """
2151        self.log.debug("[removedirs]: removing %s" % dir)
2152        try:
2153            for path, dirs, files in os.walk(dir, topdown=False):
2154                for f in files:
2155                    os.remove(os.path.join(path, f))
2156                for d in dirs:
2157                    os.rmdir(os.path.join(path, d))
2158            os.rmdir(dir)
2159        except EnvironmentError, e:
2160            self.log.error("Error deleting directory tree in %s" % e);
2161
2162    def terminate_experiment(self, req, fid):
2163        """
2164        Swap this experiment out on the federants and delete the shared
2165        information
2166        """
2167        tbparams = { }
2168        req = req.get('TerminateRequestBody', None)
2169        if not req:
2170            raise service_error(service_error.req,
2171                    "Bad request format (no TerminateRequestBody)")
2172        force = req.get('force', False)
2173        exp = req.get('experiment', None)
2174        if exp:
2175            if exp.has_key('fedid'):
2176                key = exp['fedid']
2177                keytype = "fedid"
2178            elif exp.has_key('localname'):
2179                key = exp['localname']
2180                keytype = "localname"
2181            else:
2182                raise service_error(service_error.req, "Unknown lookup type")
2183        else:
2184            raise service_error(service_error.req, "No request?")
2185
2186        self.check_experiment_access(fid, key)
2187
2188        dealloc_list = [ ]
2189
2190
2191        # Create a logger that logs to the dealloc_list as well as to the main
2192        # log file.
2193        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2194        h = logging.StreamHandler(self.list_log(dealloc_list))
2195        # XXX: there should be a global one of these rather than repeating the
2196        # code.
2197        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2198                    '%d %b %y %H:%M:%S'))
2199        dealloc_log.addHandler(h)
2200
2201        self.state_lock.acquire()
2202        fed_exp = self.state.get(key, None)
2203        repo = None
2204
2205        if fed_exp:
2206            # This branch of the conditional holds the lock to generate a
2207            # consistent temporary tbparams variable to deallocate experiments.
2208            # It releases the lock to do the deallocations and reacquires it to
2209            # remove the experiment state when the termination is complete.
2210
2211            # First make sure that the experiment creation is complete.
2212            status = fed_exp.get('experimentStatus', None)
2213
2214            if status:
2215                if status in ('starting', 'terminating'):
2216                    if not force:
2217                        self.state_lock.release()
2218                        raise service_error(service_error.partial, 
2219                                'Experiment still being created or destroyed')
2220                    else:
2221                        self.log.warning('Experiment in %s state ' % status + \
2222                                'being terminated by force.')
2223            else:
2224                # No status??? trouble
2225                self.state_lock.release()
2226                raise service_error(service_error.internal,
2227                        "Experiment has no status!?")
2228
2229            ids = []
2230            #  experimentID is a list of dicts that are self-describing
2231            #  identifiers.  This finds all the fedids and localnames - the
2232            #  keys of self.state - and puts them into ids.
2233            for id in fed_exp.get('experimentID', []):
2234                if id.has_key('fedid'): 
2235                    ids.append(id['fedid'])
2236                    repo = "%s" % id['fedid']
2237                if id.has_key('localname'): ids.append(id['localname'])
2238
2239            # Collect the allocation/segment ids into a dict keyed by the fedid
2240            # of the allocation (or a monotonically increasing integer) that
2241            # contains a tuple of uri, aid (which is a dict...)
2242            for i, fed in enumerate(fed_exp.get('federant', [])):
2243                try:
2244                    uri = fed['uri']
2245                    aid = fed['allocID']
2246                    k = fed['allocID'].get('fedid', i)
2247                except KeyError, e:
2248                    continue
2249                tbparams[k] = (uri, aid)
2250            fed_exp['experimentStatus'] = 'terminating'
2251            if self.state_filename: self.write_state()
2252            self.state_lock.release()
2253
2254            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2255            # then completes, so we can't wait if nothing starts.  So, no
2256            # tbparams, no start.
2257            if len(tbparams) > 0:
2258                thread_pool = self.thread_pool(self.nthreads)
2259                for k in tbparams.keys():
2260                    # Create and start a thread to stop the segment
2261                    thread_pool.wait_for_slot()
2262                    uri, aid = tbparams[k]
2263                    t  = self.pooled_thread(\
2264                            target=self.terminate_segment(log=dealloc_log,
2265                                testbed=uri,
2266                                cert_file=self.cert_file, 
2267                                cert_pwd=self.cert_pwd,
2268                                trusted_certs=self.trusted_certs,
2269                                caller=self.call_TerminateSegment),
2270                            args=(uri, aid), name=k,
2271                            pdata=thread_pool, trace_file=self.trace_file)
2272                    t.start()
2273                # Wait for completions
2274                thread_pool.wait_for_all_done()
2275
2276            # release the allocations (failed experiments have done this
2277            # already, and starting experiments may be in odd states, so we
2278            # ignore errors releasing those allocations
2279            try: 
2280                for k in tbparams.keys():
2281                    # This releases access by uri
2282                    uri, aid = tbparams[k]
2283                    self.release_access(None, aid, uri=uri)
2284            except service_error, e:
2285                if status != 'failed' and not force:
2286                    raise e
2287
2288            # Remove the terminated experiment
2289            self.state_lock.acquire()
2290            for id in ids:
2291                if self.state.has_key(id): del self.state[id]
2292
2293            if self.state_filename: self.write_state()
2294            self.state_lock.release()
2295
2296            # Delete any synch points associated with this experiment.  All
2297            # synch points begin with the fedid of the experiment.
2298            fedid_keys = set(["fedid:%s" % f for f in ids \
2299                    if isinstance(f, fedid)])
2300            for k in self.synch_store.all_keys():
2301                try:
2302                    if len(k) > 45 and k[0:46] in fedid_keys:
2303                        self.synch_store.del_value(k)
2304                except synch_store.BadDeletionError:
2305                    pass
2306            self.write_store()
2307
2308            # Remove software and other cached stuff from the filesystem.
2309            if repo:
2310                self.remove_dirs("%s/%s" % (self.repodir, repo))
2311               
2312            return { 
2313                    'experiment': exp , 
2314                    'deallocationLog': "".join(dealloc_list),
2315                    }
2316        else:
2317            # Don't forget to release the lock
2318            self.state_lock.release()
2319            raise service_error(service_error.req, "No saved state")
2320
2321
2322    def GetValue(self, req, fid):
2323        """
2324        Get a value from the synchronized store
2325        """
2326        req = req.get('GetValueRequestBody', None)
2327        if not req:
2328            raise service_error(service_error.req,
2329                    "Bad request format (no GetValueRequestBody)")
2330       
2331        name = req['name']
2332        wait = req['wait']
2333        rv = { 'name': name }
2334
2335        if self.auth.check_attribute(fid, name):
2336            self.log.debug("[GetValue] asking for %s " % name)
2337            try:
2338                v = self.synch_store.get_value(name, wait)
2339            except synch_store.RevokedKeyError:
2340                # No more synch on this key
2341                raise service_error(service_error.federant, 
2342                        "Synch key %s revoked" % name)
2343            if v is not None:
2344                rv['value'] = v
2345            self.log.debug("[GetValue] got %s from %s" % (v, name))
2346            return rv
2347        else:
2348            raise service_error(service_error.access, "Access Denied")
2349       
2350
2351    def SetValue(self, req, fid):
2352        """
2353        Set a value in the synchronized store
2354        """
2355        req = req.get('SetValueRequestBody', None)
2356        if not req:
2357            raise service_error(service_error.req,
2358                    "Bad request format (no SetValueRequestBody)")
2359       
2360        name = req['name']
2361        v = req['value']
2362
2363        if self.auth.check_attribute(fid, name):
2364            try:
2365                self.synch_store.set_value(name, v)
2366                self.write_store()
2367                self.log.debug("[SetValue] set %s to %s" % (name, v))
2368            except synch_store.CollisionError:
2369                # Translate into a service_error
2370                raise service_error(service_error.req,
2371                        "Value already set: %s" %name)
2372            except synch_store.RevokedKeyError:
2373                # No more synch on this key
2374                raise service_error(service_error.federant, 
2375                        "Synch key %s revoked" % name)
2376            return { 'name': name, 'value': v }
2377        else:
2378            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.