source: fedd/federation/experiment_control.py @ f771e2f

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f771e2f was 5334044, checked in by Ted Faber <faber@…>, 15 years ago

Add service to selectively hide hosts from other testbeds

  • Property mode set to 100644
File size: 81.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=None, params=None, 
49            reqs=None, portal=None):
50        self.name=name
51        self.exporter=exporter
52        if importers is None: self.importers = []
53        else: self.importers=importers
54        if params is None: self.params = { }
55        else: self.params = params
56        if reqs is None: self.reqs = []
57        else: self.reqs = reqs
58
59        if portal is not None:
60            self.portal = portal
61        else:
62            self.portal = (name in federated_service.needs_portal)
63
64    def __str__(self):
65        return "name %s export %s import %s params %s reqs %s" % \
66                (self.name, self.exporter, self.importers, self.params,
67                        [ (r['name'], r['visibility']) for r in self.reqs] )
68
69    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
70
71class experiment_control_local:
72    """
73    Control of experiments that this system can directly access.
74
75    Includes experiment creation, termination and information dissemination.
76    Thred safe.
77    """
78
79    class ssh_cmd_timeout(RuntimeError): pass
80   
81    class thread_pool:
82        """
83        A class to keep track of a set of threads all invoked for the same
84        task.  Manages the mutual exclusion of the states.
85        """
86        def __init__(self, nthreads):
87            """
88            Start a pool.
89            """
90            self.changed = Condition()
91            self.started = 0
92            self.terminated = 0
93            self.nthreads = nthreads
94
95        def acquire(self):
96            """
97            Get the pool's lock.
98            """
99            self.changed.acquire()
100
101        def release(self):
102            """
103            Release the pool's lock.
104            """
105            self.changed.release()
106
107        def wait(self, timeout = None):
108            """
109            Wait for a pool thread to start or stop.
110            """
111            self.changed.wait(timeout)
112
113        def start(self):
114            """
115            Called by a pool thread to report starting.
116            """
117            self.changed.acquire()
118            self.started += 1
119            self.changed.notifyAll()
120            self.changed.release()
121
122        def terminate(self):
123            """
124            Called by a pool thread to report finishing.
125            """
126            self.changed.acquire()
127            self.terminated += 1
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def clear(self):
132            """
133            Clear all pool data.
134            """
135            self.changed.acquire()
136            self.started = 0
137            self.terminated =0
138            self.changed.notifyAll()
139            self.changed.release()
140
141        def wait_for_slot(self):
142            """
143            Wait until we have a free slot to start another pooled thread
144            """
145            self.acquire()
146            while self.started - self.terminated >= self.nthreads:
147                self.wait()
148            self.release()
149
150        def wait_for_all_done(self, timeout=None):
151            """
152            Wait until all active threads finish (and at least one has
153            started).  If a timeout is given, return after waiting that long
154            for termination.  If all threads are done (and one has started in
155            the since the last clear()) return True, otherwise False.
156            """
157            if timeout:
158                deadline = time.time() + timeout
159            self.acquire()
160            while self.started == 0 or self.started > self.terminated:
161                self.wait(timeout)
162                if timeout:
163                    if time.time() > deadline:
164                        break
165                    timeout = deadline - time.time()
166            self.release()
167            return not (self.started == 0 or self.started > self.terminated)
168
169    class pooled_thread(Thread):
170        """
171        One of a set of threads dedicated to a specific task.  Uses the
172        thread_pool class above for coordination.
173        """
174        def __init__(self, group=None, target=None, name=None, args=(), 
175                kwargs={}, pdata=None, trace_file=None):
176            Thread.__init__(self, group, target, name, args, kwargs)
177            self.rv = None          # Return value of the ops in this thread
178            self.exception = None   # Exception that terminated this thread
179            self.target=target      # Target function to run on start()
180            self.args = args        # Args to pass to target
181            self.kwargs = kwargs    # Additional kw args
182            self.pdata = pdata      # thread_pool for this class
183            # Logger for this thread
184            self.log = logging.getLogger("fedd.experiment_control")
185       
186        def run(self):
187            """
188            Emulate Thread.run, except add pool data manipulation and error
189            logging.
190            """
191            if self.pdata:
192                self.pdata.start()
193
194            if self.target:
195                try:
196                    self.rv = self.target(*self.args, **self.kwargs)
197                except service_error, s:
198                    self.exception = s
199                    self.log.error("Thread exception: %s %s" % \
200                            (s.code_string(), s.desc))
201                except:
202                    self.exception = sys.exc_info()[1]
203                    self.log.error(("Unexpected thread exception: %s" +\
204                            "Trace %s") % (self.exception,\
205                                traceback.format_exc()))
206            if self.pdata:
207                self.pdata.terminate()
208
209    call_RequestAccess = service_caller('RequestAccess')
210    call_ReleaseAccess = service_caller('ReleaseAccess')
211    call_StartSegment = service_caller('StartSegment')
212    call_TerminateSegment = service_caller('TerminateSegment')
213    call_Ns2Topdl = service_caller('Ns2Topdl')
214
215    def __init__(self, config=None, auth=None):
216        """
217        Intialize the various attributes, most from the config object
218        """
219
220        def parse_tarfile_list(tf):
221            """
222            Parse a tarfile list from the configuration.  This is a set of
223            paths and tarfiles separated by spaces.
224            """
225            rv = [ ]
226            if tf is not None:
227                tl = tf.split()
228                while len(tl) > 1:
229                    p, t = tl[0:2]
230                    del tl[0:2]
231                    rv.append((p, t))
232            return rv
233
234        self.thread_with_rv = experiment_control_local.pooled_thread
235        self.thread_pool = experiment_control_local.thread_pool
236        self.list_log = list_log.list_log
237
238        self.cert_file = config.get("experiment_control", "cert_file")
239        if self.cert_file:
240            self.cert_pwd = config.get("experiment_control", "cert_pwd")
241        else:
242            self.cert_file = config.get("globals", "cert_file")
243            self.cert_pwd = config.get("globals", "cert_pwd")
244
245        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
246                or config.get("globals", "trusted_certs")
247
248        self.repodir = config.get("experiment_control", "repodir")
249        self.repo_url = config.get("experiment_control", "repo_url", 
250                "https://users.isi.deterlab.net:23235");
251
252        self.exp_stem = "fed-stem"
253        self.log = logging.getLogger("fedd.experiment_control")
254        set_log_level(config, "experiment_control", self.log)
255        self.muxmax = 2
256        self.nthreads = 10
257        self.randomize_experiments = False
258
259        self.splitter = None
260        self.ssh_keygen = "/usr/bin/ssh-keygen"
261        self.ssh_identity_file = None
262
263
264        self.debug = config.getboolean("experiment_control", "create_debug")
265        self.cleanup = not config.getboolean("experiment_control", 
266                "leave_tmpfiles")
267        self.state_filename = config.get("experiment_control", 
268                "experiment_state")
269        self.store_filename = config.get("experiment_control", 
270                "synch_store")
271        self.store_url = config.get("experiment_control", "store_url")
272        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
273        self.fedkit = parse_tarfile_list(\
274                config.get("experiment_control", "fedkit"))
275        self.gatewaykit = parse_tarfile_list(\
276                config.get("experiment_control", "gatewaykit"))
277        accessdb_file = config.get("experiment_control", "accessdb")
278
279        self.ssh_pubkey_file = config.get("experiment_control", 
280                "ssh_pubkey_file")
281        self.ssh_privkey_file = config.get("experiment_control",
282                "ssh_privkey_file")
283        dt = config.get("experiment_control", "direct_transit")
284        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
285        else: self.direct_transit = [ ]
286        # NB for internal master/slave ops, not experiment setup
287        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
288
289        self.overrides = set([])
290        ovr = config.get('experiment_control', 'overrides')
291        if ovr:
292            for o in ovr.split(","):
293                o = o.strip()
294                if o.startswith('fedid:'): o = o[len('fedid:'):]
295                self.overrides.add(fedid(hexstr=o))
296
297        self.state = { }
298        self.state_lock = Lock()
299        self.tclsh = "/usr/local/bin/otclsh"
300        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
301                config.get("experiment_control", "tcl_splitter",
302                        "/usr/testbed/lib/ns2ir/parse.tcl")
303        mapdb_file = config.get("experiment_control", "mapdb")
304        self.trace_file = sys.stderr
305
306        self.def_expstart = \
307                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
308                "/tmp/federate";
309        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
310                "FEDDIR/hosts";
311        self.def_gwstart = \
312                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
313                "/tmp/bridge.log";
314        self.def_mgwstart = \
315                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
316                "/tmp/bridge.log";
317        self.def_gwimage = "FBSD61-TUNNEL2";
318        self.def_gwtype = "pc";
319        self.local_access = { }
320
321        if auth:
322            self.auth = auth
323        else:
324            self.log.error(\
325                    "[access]: No authorizer initialized, creating local one.")
326            auth = authorizer()
327
328
329        if self.ssh_pubkey_file:
330            try:
331                f = open(self.ssh_pubkey_file, 'r')
332                self.ssh_pubkey = f.read()
333                f.close()
334            except EnvironmentError:
335                raise service_error(service_error.internal,
336                        "Cannot read sshpubkey")
337        else:
338            raise service_error(service_error.internal, 
339                    "No SSH public key file?")
340
341        if not self.ssh_privkey_file:
342            raise service_error(service_error.internal, 
343                    "No SSH public key file?")
344
345
346        if mapdb_file:
347            self.read_mapdb(mapdb_file)
348        else:
349            self.log.warn("[experiment_control] No testbed map, using defaults")
350            self.tbmap = { 
351                    'deter':'https://users.isi.deterlab.net:23235',
352                    'emulab':'https://users.isi.deterlab.net:23236',
353                    'ucb':'https://users.isi.deterlab.net:23237',
354                    }
355
356        if accessdb_file:
357                self.read_accessdb(accessdb_file)
358        else:
359            raise service_error(service_error.internal,
360                    "No accessdb specified in config")
361
362        # Grab saved state.  OK to do this w/o locking because it's read only
363        # and only one thread should be in existence that can see self.state at
364        # this point.
365        if self.state_filename:
366            self.read_state()
367
368        if self.store_filename:
369            self.read_store()
370        else:
371            self.log.warning("No saved synch store")
372            self.synch_store = synch_store
373
374        # Dispatch tables
375        self.soap_services = {\
376                'New': soap_handler('New', self.new_experiment),
377                'Create': soap_handler('Create', self.create_experiment),
378                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
379                'Vis': soap_handler('Vis', self.get_vis),
380                'Info': soap_handler('Info', self.get_info),
381                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
382                'Terminate': soap_handler('Terminate', 
383                    self.terminate_experiment),
384                'GetValue': soap_handler('GetValue', self.GetValue),
385                'SetValue': soap_handler('SetValue', self.SetValue),
386        }
387
388        self.xmlrpc_services = {\
389                'New': xmlrpc_handler('New', self.new_experiment),
390                'Create': xmlrpc_handler('Create', self.create_experiment),
391                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
392                'Vis': xmlrpc_handler('Vis', self.get_vis),
393                'Info': xmlrpc_handler('Info', self.get_info),
394                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
395                'Terminate': xmlrpc_handler('Terminate',
396                    self.terminate_experiment),
397                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
398                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
399        }
400
401    # Call while holding self.state_lock
402    def write_state(self):
403        """
404        Write a new copy of experiment state after copying the existing state
405        to a backup.
406
407        State format is a simple pickling of the state dictionary.
408        """
409        if os.access(self.state_filename, os.W_OK):
410            copy_file(self.state_filename, \
411                    "%s.bak" % self.state_filename)
412        try:
413            f = open(self.state_filename, 'w')
414            pickle.dump(self.state, f)
415        except EnvironmentError, e:
416            self.log.error("Can't write file %s: %s" % \
417                    (self.state_filename, e))
418        except pickle.PicklingError, e:
419            self.log.error("Pickling problem: %s" % e)
420        except TypeError, e:
421            self.log.error("Pickling problem (TypeError): %s" % e)
422
423    @staticmethod
424    def get_alloc_ids(state):
425        """
426        Pull the fedids of the identifiers of each allocation from the
427        state.  Again, a dict dive that's best isolated.
428
429        Used by read_store and read state
430        """
431
432        return [ f['allocID']['fedid'] 
433                for f in state.get('federant',[]) \
434                    if f.has_key('allocID') and \
435                        f['allocID'].has_key('fedid')]
436
437    # Call while holding self.state_lock
438    def read_state(self):
439        """
440        Read a new copy of experiment state.  Old state is overwritten.
441
442        State format is a simple pickling of the state dictionary.
443        """
444       
445        def get_experiment_id(state):
446            """
447            Pull the fedid experimentID out of the saved state.  This is kind
448            of a gross walk through the dict.
449            """
450
451            if state.has_key('experimentID'):
452                for e in state['experimentID']:
453                    if e.has_key('fedid'):
454                        return e['fedid']
455                else:
456                    return None
457            else:
458                return None
459
460        try:
461            f = open(self.state_filename, "r")
462            self.state = pickle.load(f)
463            self.log.debug("[read_state]: Read state from %s" % \
464                    self.state_filename)
465        except EnvironmentError, e:
466            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
467                    % (self.state_filename, e))
468        except pickle.UnpicklingError, e:
469            self.log.warning(("[read_state]: No saved state: " + \
470                    "Unpickling failed: %s") % e)
471       
472        for s in self.state.values():
473            try:
474
475                eid = get_experiment_id(s)
476                if eid : 
477                    # Give the owner rights to the experiment
478                    self.auth.set_attribute(s['owner'], eid)
479                    # And holders of the eid as well
480                    self.auth.set_attribute(eid, eid)
481                    # allow overrides to control experiments as well
482                    for o in self.overrides:
483                        self.auth.set_attribute(o, eid)
484                    # Set permissions to allow reading of the software repo, if
485                    # any, as well.
486                    for a in self.get_alloc_ids(s):
487                        self.auth.set_attribute(a, 'repo/%s' % eid)
488                else:
489                    raise KeyError("No experiment id")
490            except KeyError, e:
491                self.log.warning("[read_state]: State ownership or identity " +\
492                        "misformatted in %s: %s" % (self.state_filename, e))
493
494
495    def read_accessdb(self, accessdb_file):
496        """
497        Read the mapping from fedids that can create experiments to their name
498        in the 3-level access namespace.  All will be asserted from this
499        testbed and can include the local username and porject that will be
500        asserted on their behalf by this fedd.  Each fedid is also added to the
501        authorization system with the "create" attribute.
502        """
503        self.accessdb = {}
504        # These are the regexps for parsing the db
505        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
506        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
507                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
508        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
509                "\s*->\s*(" + name_expr + ")\s*$")
510        lineno = 0
511
512        # Parse the mappings and store in self.authdb, a dict of
513        # fedid -> (proj, user)
514        try:
515            f = open(accessdb_file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if len(line) == 0 or line.startswith('#'):
520                    continue
521                m = project_line.match(line)
522                if m:
523                    fid = fedid(hexstr=m.group(1))
524                    project, user = m.group(2,3)
525                    if not self.accessdb.has_key(fid):
526                        self.accessdb[fid] = []
527                    self.accessdb[fid].append((project, user))
528                    continue
529
530                m = user_line.match(line)
531                if m:
532                    fid = fedid(hexstr=m.group(1))
533                    project = None
534                    user = m.group(2)
535                    if not self.accessdb.has_key(fid):
536                        self.accessdb[fid] = []
537                    self.accessdb[fid].append((project, user))
538                    continue
539                self.log.warn("[experiment_control] Error parsing access " +\
540                        "db %s at line %d" %  (accessdb_file, lineno))
541        except EnvironmentError:
542            raise service_error(service_error.internal,
543                    ("Error opening/reading %s as experiment " +\
544                            "control accessdb") %  accessdb_file)
545        f.close()
546
547        # Initialize the authorization attributes
548        for fid in self.accessdb.keys():
549            self.auth.set_attribute(fid, 'create')
550            self.auth.set_attribute(fid, 'new')
551
552    def read_mapdb(self, file):
553        """
554        Read a simple colon separated list of mappings for the
555        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
556        """
557
558        self.tbmap = { }
559        lineno =0
560        try:
561            f = open(file, "r")
562            for line in f:
563                lineno += 1
564                line = line.strip()
565                if line.startswith('#') or len(line) == 0:
566                    continue
567                try:
568                    label, url = line.split(':', 1)
569                    self.tbmap[label] = url
570                except ValueError, e:
571                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
572                            "map db: %s %s" % (lineno, line, e))
573        except EnvironmentError, e:
574            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
575                    "open %s: %s" % (file, e))
576        f.close()
577
578    def read_store(self):
579        try:
580            self.synch_store = synch_store()
581            self.synch_store.load(self.store_filename)
582            self.log.debug("[read_store]: Read store from %s" % \
583                    self.store_filename)
584        except EnvironmentError, e:
585            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
586                    % (self.state_filename, e))
587            self.synch_store = synch_store()
588
589        # Set the initial permissions on data in the store.  XXX: This ad hoc
590        # authorization attribute initialization is getting out of hand.
591        for k in self.synch_store.all_keys():
592            try:
593                if k.startswith('fedid:'):
594                    fid = fedid(hexstr=k[6:46])
595                    if self.state.has_key(fid):
596                        for a in self.get_alloc_ids(self.state[fid]):
597                            self.auth.set_attribute(a, k)
598            except ValueError, e:
599                self.log.warn("Cannot deduce permissions for %s" % k)
600
601
602    def write_store(self):
603        """
604        Write a new copy of synch_store after writing current state
605        to a backup.  We use the internal synch_store pickle method to avoid
606        incinsistent data.
607
608        State format is a simple pickling of the store.
609        """
610        if os.access(self.store_filename, os.W_OK):
611            copy_file(self.store_filename, \
612                    "%s.bak" % self.store_filename)
613        try:
614            self.synch_store.save(self.store_filename)
615        except EnvironmentError, e:
616            self.log.error("Can't write file %s: %s" % \
617                    (self.store_filename, e))
618        except TypeError, e:
619            self.log.error("Pickling problem (TypeError): %s" % e)
620
621       
622    def generate_ssh_keys(self, dest, type="rsa" ):
623        """
624        Generate a set of keys for the gateways to use to talk.
625
626        Keys are of type type and are stored in the required dest file.
627        """
628        valid_types = ("rsa", "dsa")
629        t = type.lower();
630        if t not in valid_types: raise ValueError
631        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
632
633        try:
634            trace = open("/dev/null", "w")
635        except EnvironmentError:
636            raise service_error(service_error.internal,
637                    "Cannot open /dev/null??");
638
639        # May raise CalledProcessError
640        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
641        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
642        if rv != 0:
643            raise service_error(service_error.internal, 
644                    "Cannot generate nonce ssh keys.  %s return code %d" \
645                            % (self.ssh_keygen, rv))
646
647    def gentopo(self, str):
648        """
649        Generate the topology dtat structure from the splitter's XML
650        representation of it.
651
652        The topology XML looks like:
653            <experiment>
654                <nodes>
655                    <node><vname></vname><ips>ip1:ip2</ips></node>
656                </nodes>
657                <lans>
658                    <lan>
659                        <vname></vname><vnode></vnode><ip></ip>
660                        <bandwidth></bandwidth><member>node:port</member>
661                    </lan>
662                </lans>
663        """
664        class topo_parse:
665            """
666            Parse the topology XML and create the dats structure.
667            """
668            def __init__(self):
669                # Typing of the subelements for data conversion
670                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
671                self.int_subelements = ( 'bandwidth',)
672                self.float_subelements = ( 'delay',)
673                # The final data structure
674                self.nodes = [ ]
675                self.lans =  [ ]
676                self.topo = { \
677                        'node': self.nodes,\
678                        'lan' : self.lans,\
679                    }
680                self.element = { }  # Current element being created
681                self.chars = ""     # Last text seen
682
683            def end_element(self, name):
684                # After each sub element the contents is added to the current
685                # element or to the appropriate list.
686                if name == 'node':
687                    self.nodes.append(self.element)
688                    self.element = { }
689                elif name == 'lan':
690                    self.lans.append(self.element)
691                    self.element = { }
692                elif name in self.str_subelements:
693                    self.element[name] = self.chars
694                    self.chars = ""
695                elif name in self.int_subelements:
696                    self.element[name] = int(self.chars)
697                    self.chars = ""
698                elif name in self.float_subelements:
699                    self.element[name] = float(self.chars)
700                    self.chars = ""
701
702            def found_chars(self, data):
703                self.chars += data.rstrip()
704
705
706        tp = topo_parse();
707        parser = xml.parsers.expat.ParserCreate()
708        parser.EndElementHandler = tp.end_element
709        parser.CharacterDataHandler = tp.found_chars
710
711        parser.Parse(str)
712
713        return tp.topo
714       
715
716    def genviz(self, topo):
717        """
718        Generate the visualization the virtual topology
719        """
720
721        neato = "/usr/local/bin/neato"
722        # These are used to parse neato output and to create the visualization
723        # file.
724        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
725        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
726                "%s</type></node>"
727
728        try:
729            # Node names
730            nodes = [ n['vname'] for n in topo['node'] ]
731            topo_lans = topo['lan']
732        except KeyError, e:
733            raise service_error(service_error.internal, "Bad topology: %s" %e)
734
735        lans = { }
736        links = { }
737
738        # Walk through the virtual topology, organizing the connections into
739        # 2-node connections (links) and more-than-2-node connections (lans).
740        # When a lan is created, it's added to the list of nodes (there's a
741        # node in the visualization for the lan).
742        for l in topo_lans:
743            if links.has_key(l['vname']):
744                if len(links[l['vname']]) < 2:
745                    links[l['vname']].append(l['vnode'])
746                else:
747                    nodes.append(l['vname'])
748                    lans[l['vname']] = links[l['vname']]
749                    del links[l['vname']]
750                    lans[l['vname']].append(l['vnode'])
751            elif lans.has_key(l['vname']):
752                lans[l['vname']].append(l['vnode'])
753            else:
754                links[l['vname']] = [ l['vnode'] ]
755
756
757        # Open up a temporary file for dot to turn into a visualization
758        try:
759            df, dotname = tempfile.mkstemp()
760            dotfile = os.fdopen(df, 'w')
761        except EnvironmentError:
762            raise service_error(service_error.internal,
763                    "Failed to open file in genviz")
764
765        try:
766            dnull = open('/dev/null', 'w')
767        except EnvironmentError:
768            service_error(service_error.internal,
769                    "Failed to open /dev/null in genviz")
770
771        # Generate a dot/neato input file from the links, nodes and lans
772        try:
773            print >>dotfile, "graph G {"
774            for n in nodes:
775                print >>dotfile, '\t"%s"' % n
776            for l in links.keys():
777                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
778            for l in lans.keys():
779                for n in lans[l]:
780                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
781            print >>dotfile, "}"
782            dotfile.close()
783        except TypeError:
784            raise service_error(service_error.internal,
785                    "Single endpoint link in vtopo")
786        except EnvironmentError:
787            raise service_error(service_error.internal, "Cannot write dot file")
788
789        # Use dot to create a visualization
790        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
791                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
792                close_fds=True)
793        dnull.close()
794
795        # Translate dot to vis format
796        vis_nodes = [ ]
797        vis = { 'node': vis_nodes }
798        for line in dot.stdout:
799            m = vis_re.match(line)
800            if m:
801                vn = m.group(1)
802                vis_node = {'name': vn, \
803                        'x': float(m.group(2)),\
804                        'y' : float(m.group(3)),\
805                    }
806                if vn in links.keys() or vn in lans.keys():
807                    vis_node['type'] = 'lan'
808                else:
809                    vis_node['type'] = 'node'
810                vis_nodes.append(vis_node)
811        rv = dot.wait()
812
813        os.remove(dotname)
814        if rv == 0 : return vis
815        else: return None
816
817    def get_access(self, tb, nodes, tbparam, access_user, masters):
818        """
819        Get access to testbed through fedd and set the parameters for that tb
820        """
821        def get_export_project(svcs):
822            """
823            Look through for the list of federated_service for this testbed
824            objects for a project_export service, and extract the project
825            parameter.
826            """
827
828            pe = [s for s in svcs if s.name=='project_export']
829            if len(pe) == 1:
830                return pe[0].params.get('project', None)
831            elif len(pe) == 0:
832                return None
833            else:
834                raise service_error(service_error.req,
835                        "More than one project export is not supported")
836
837        uri = self.tbmap.get(testbed_base(tb), None)
838        if not uri:
839            raise service_error(service_error.server_config, 
840                    "Unknown testbed: %s" % tb)
841
842        export_svcs = masters.get(tb,[])
843        import_svcs = [ s for m in masters.values() \
844                for s in m \
845                    if tb in s.importers ]
846
847        export_project = get_export_project(export_svcs)
848
849        # Tweak search order so that if there are entries in access_user that
850        # have a project matching the export project, we try them first
851        if export_project: 
852            access_sequence = [ (p, u) for p, u in access_user \
853                    if p == export_project] 
854            access_sequence.extend([(p, u) for p, u in access_user \
855                    if p != export_project]) 
856        else: 
857            access_sequence = access_user
858
859        for p, u in access_sequence: 
860            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
861                    "to %s") %  ((p or "None"), u, uri))
862
863            if p:
864                # Request with user and project specified
865                req = {\
866                        'destinationTestbed' : { 'uri' : uri },
867                        'credential': [ "project: %s" % p, "user: %s"  % u],
868                        'allocID' : { 'localname': 'test' },
869                    }
870            else:
871                # Request with only user specified
872                req = {\
873                        'destinationTestbed' : { 'uri' : uri },
874                        'credential': [ 'user: %s' % u ],
875                        'allocID' : { 'localname': 'test' },
876                    }
877
878            # Make the service request from the services we're importing and
879            # exporting.  Keep track of the export request ids so we can
880            # collect the resulting info from the access response.
881            e_keys = { }
882            if import_svcs or export_svcs:
883                req['service'] = [ ]
884
885                for i, s in enumerate(import_svcs):
886                    idx = 'import%d' % i
887                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
888                    if s.params:
889                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
890                                for k, v in s.params.items()]
891                    req['service'].append(sr)
892
893                for i, s in enumerate(export_svcs):
894                    idx = 'export%d' % i
895                    e_keys[idx] = s
896                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
897                    if s.params:
898                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
899                                for k, v in s.params.items()]
900                    req['service'].append(sr)
901
902            # node resources if any
903            if nodes != None and len(nodes) > 0:
904                rnodes = [ ]
905                for n in nodes:
906                    rn = { }
907                    image, hw, count = n.split(":")
908                    if image: rn['image'] = [ image ]
909                    if hw: rn['hardware'] = [ hw ]
910                    if count and int(count) >0 : rn['count'] = int(count)
911                    rnodes.append(rn)
912                req['resources']= { }
913                req['resources']['node'] = rnodes
914
915            try:
916                if self.local_access.has_key(uri):
917                    # Local access call
918                    req = { 'RequestAccessRequestBody' : req }
919                    r = self.local_access[uri].RequestAccess(req, 
920                            fedid(file=self.cert_file))
921                    r = { 'RequestAccessResponseBody' : r }
922                else:
923                    r = self.call_RequestAccess(uri, req, 
924                            self.cert_file, self.cert_pwd, self.trusted_certs)
925            except service_error, e:
926                if e.code == service_error.access:
927                    self.log.debug("[get_access] Access denied")
928                    r = None
929                    continue
930                else:
931                    raise e
932
933            if r.has_key('RequestAccessResponseBody'):
934                # Through to here we have a valid response, not a fault.
935                # Access denied is a fault, so something better or worse than
936                # access denied has happened.
937                r = r['RequestAccessResponseBody']
938                self.log.debug("[get_access] Access granted")
939                break
940            else:
941                raise service_error(service_error.protocol,
942                        "Bad proxy response")
943       
944        if not r:
945            raise service_error(service_error.access, 
946                    "Access denied by %s (%s)" % (tb, uri))
947
948        tbparam[tb] = { 
949                "allocID" : r['allocID'],
950                "uri": uri,
951                }
952
953        # Collect the responses corresponding to the services this testbed
954        # exports.  These will be the service requests that we will include in
955        # the start segment requests (with appropriate visibility values) to
956        # import and export the segments.
957        for s in r.get('service', []):
958            id = s.get('id', None)
959            if id and id in e_keys:
960                e_keys[id].reqs.append(s)
961
962        # Add attributes to parameter space.  We don't allow attributes to
963        # overlay any parameters already installed.
964        for a in r.get('fedAttr', []):
965            try:
966                if a['attribute'] and \
967                        isinstance(a['attribute'], basestring)\
968                        and not tbparam[tb].has_key(a['attribute'].lower()):
969                    tbparam[tb][a['attribute'].lower()] = a['value']
970            except KeyError:
971                self.log.error("Bad attribute in response: %s" % a)
972
973    def release_access(self, tb, aid, uri=None):
974        """
975        Release access to testbed through fedd
976        """
977
978        if not uri:
979            uri = self.tbmap.get(tb, None)
980        if not uri:
981            raise service_error(service_error.server_config, 
982                    "Unknown testbed: %s" % tb)
983
984        if self.local_access.has_key(uri):
985            resp = self.local_access[uri].ReleaseAccess(\
986                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
987                    fedid(file=self.cert_file))
988            resp = { 'ReleaseAccessResponseBody': resp } 
989        else:
990            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
991                    self.cert_file, self.cert_pwd, self.trusted_certs)
992
993        # better error coding
994
995    def remote_ns2topdl(self, uri, desc):
996
997        req = {
998                'description' : { 'ns2description': desc },
999            }
1000
1001        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1002                self.trusted_certs)
1003
1004        if r.has_key('Ns2TopdlResponseBody'):
1005            r = r['Ns2TopdlResponseBody']
1006            ed = r.get('experimentdescription', None)
1007            if ed.has_key('topdldescription'):
1008                return topdl.Topology(**ed['topdldescription'])
1009            else:
1010                raise service_error(service_error.protocol, 
1011                        "Bad splitter response (no output)")
1012        else:
1013            raise service_error(service_error.protocol, "Bad splitter response")
1014
1015    class start_segment:
1016        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1017                cert_pwd=None, trusted_certs=None, caller=None,
1018                log_collector=None):
1019            self.log = log
1020            self.debug = debug
1021            self.cert_file = cert_file
1022            self.cert_pwd = cert_pwd
1023            self.trusted_certs = None
1024            self.caller = caller
1025            self.testbed = testbed
1026            self.log_collector = log_collector
1027            self.response = None
1028            self.node = { }
1029
1030        def make_map(self, resp):
1031            for e in resp.get('embedding', []):
1032                if 'toponame' in e and 'physname' in e:
1033                    self.node[e['toponame']] = e['physname'][0]
1034
1035        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1036            req = {
1037                    'allocID': { 'fedid' : aid }, 
1038                    'segmentdescription': { 
1039                        'topdldescription': topo.to_dict(),
1040                    },
1041                }
1042
1043            if connInfo:
1044                req['connection'] = connInfo
1045
1046            import_svcs = [ s for m in masters.values() \
1047                    for s in m if self.testbed in s.importers]
1048
1049            if import_svcs or self.testbed in masters:
1050                req['service'] = []
1051
1052            for s in import_svcs:
1053                for r in s.reqs:
1054                    sr = copy.deepcopy(r)
1055                    sr['visibility'] = 'import';
1056                    req['service'].append(sr)
1057
1058            for s in masters.get(self.testbed, []):
1059                for r in s.reqs:
1060                    sr = copy.deepcopy(r)
1061                    sr['visibility'] = 'export';
1062                    req['service'].append(sr)
1063
1064            if attrs:
1065                req['fedAttr'] = attrs
1066
1067            try:
1068                self.log.debug("Calling StartSegment at %s " % uri)
1069                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1070                        self.trusted_certs)
1071                if r.has_key('StartSegmentResponseBody'):
1072                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1073                            None)
1074                    if lval and self.log_collector:
1075                        for line in  lval.splitlines(True):
1076                            self.log_collector.write(line)
1077                    self.make_map(r['StartSegmentResponseBody'])
1078                    self.response = r
1079                else:
1080                    raise service_error(service_error.internal, 
1081                            "Bad response!?: %s" %r)
1082                return True
1083            except service_error, e:
1084                self.log.error("Start segment failed on %s: %s" % \
1085                        (self.testbed, e))
1086                return False
1087
1088
1089
1090    class terminate_segment:
1091        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1092                cert_pwd=None, trusted_certs=None, caller=None):
1093            self.log = log
1094            self.debug = debug
1095            self.cert_file = cert_file
1096            self.cert_pwd = cert_pwd
1097            self.trusted_certs = None
1098            self.caller = caller
1099            self.testbed = testbed
1100
1101        def __call__(self, uri, aid ):
1102            req = {
1103                    'allocID': aid , 
1104                }
1105            try:
1106                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1107                        self.trusted_certs)
1108                return True
1109            except service_error, e:
1110                self.log.error("Terminate segment failed on %s: %s" % \
1111                        (self.testbed, e))
1112                return False
1113   
1114
1115    def allocate_resources(self, allocated, masters, eid, expid, 
1116            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1117            attrs=None, connInfo={}):
1118
1119        started = { }           # Testbeds where a sub-experiment started
1120                                # successfully
1121
1122        # XXX
1123        fail_soft = False
1124
1125        log = alloc_log or self.log
1126
1127        thread_pool = self.thread_pool(self.nthreads)
1128        threads = [ ]
1129        starters = [ ]
1130
1131        for tb in allocated.keys():
1132            # Create and start a thread to start the segment, and save it
1133            # to get the return value later
1134            tb_attrs = copy.copy(attrs)
1135            thread_pool.wait_for_slot()
1136            uri = tbparams[tb].get('uri', \
1137                    self.tbmap.get(testbed_base(tb), None))
1138            base, suffix = split_testbed(tb)
1139            if suffix:
1140                tb_attrs.append({'attribute': 'experiment_name', 
1141                    'value': "%s-%s" % (eid, suffix)})
1142            else:
1143                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1144            if not uri:
1145                raise service_error(service_error.internal, 
1146                        "Unknown testbed %s !?" % tb)
1147
1148            if tbparams[tb].has_key('allocID') and \
1149                    tbparams[tb]['allocID'].has_key('fedid'):
1150                aid = tbparams[tb]['allocID']['fedid']
1151            else:
1152                raise service_error(service_error.internal, 
1153                        "No alloc id for testbed %s !?" % tb)
1154
1155            s = self.start_segment(log=log, debug=self.debug,
1156                    testbed=tb, cert_file=self.cert_file,
1157                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1158                    caller=self.call_StartSegment,
1159                    log_collector=log_collector)
1160            starters.append(s)
1161            t  = self.pooled_thread(\
1162                    target=s, name=tb,
1163                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1164                    pdata=thread_pool, trace_file=self.trace_file)
1165            threads.append(t)
1166            t.start()
1167
1168        # Wait until all finish (keep pinging the log, though)
1169        mins = 0
1170        revoked = False
1171        while not thread_pool.wait_for_all_done(60.0):
1172            mins += 1
1173            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1174                    % mins)
1175            if not revoked and \
1176                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1177                # a testbed has failed.  Revoke this experiment's
1178                # synchronizarion values so that sub experiments will not
1179                # deadlock waiting for synchronization that will never happen
1180                self.log.info("A subexperiment has failed to swap in, " + \
1181                        "revoking synch keys")
1182                var_key = "fedid:%s" % expid
1183                for k in self.synch_store.all_keys():
1184                    if len(k) > 45 and k[0:46] == var_key:
1185                        self.synch_store.revoke_key(k)
1186                revoked = True
1187
1188        failed = [ t.getName() for t in threads if not t.rv ]
1189        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1190
1191        # If one failed clean up, unless fail_soft is set
1192        if failed:
1193            if not fail_soft:
1194                thread_pool.clear()
1195                for tb in succeeded:
1196                    # Create and start a thread to stop the segment
1197                    thread_pool.wait_for_slot()
1198                    uri = tbparams[tb]['uri']
1199                    t  = self.pooled_thread(\
1200                            target=self.terminate_segment(log=log,
1201                                testbed=tb,
1202                                cert_file=self.cert_file, 
1203                                cert_pwd=self.cert_pwd,
1204                                trusted_certs=self.trusted_certs,
1205                                caller=self.call_TerminateSegment),
1206                            args=(uri, tbparams[tb]['federant']['allocID']),
1207                            name=tb,
1208                            pdata=thread_pool, trace_file=self.trace_file)
1209                    t.start()
1210                # Wait until all finish (if any are being stopped)
1211                if succeeded:
1212                    thread_pool.wait_for_all_done()
1213
1214                # release the allocations
1215                for tb in tbparams.keys():
1216                    self.release_access(tb, tbparams[tb]['allocID'],
1217                            tbparams[tb].get('uri', None))
1218                # Remove the placeholder
1219                self.state_lock.acquire()
1220                self.state[eid]['experimentStatus'] = 'failed'
1221                if self.state_filename: self.write_state()
1222                self.state_lock.release()
1223
1224                log.error("Swap in failed on %s" % ",".join(failed))
1225                return
1226        else:
1227            # Walk through the successes and gather the virtual to physical
1228            # mapping.
1229            embedding = [ ]
1230            for s in starters:
1231                for k, v in s.node.items():
1232                    embedding.append({
1233                        'toponame': k, 
1234                        'physname': [ v],
1235                        'testbed': s.testbed
1236                        })
1237            log.info("[start_segment]: Experiment %s active" % eid)
1238
1239
1240        # Walk up tmpdir, deleting as we go
1241        if self.cleanup:
1242            log.debug("[start_experiment]: removing %s" % tmpdir)
1243            for path, dirs, files in os.walk(tmpdir, topdown=False):
1244                for f in files:
1245                    os.remove(os.path.join(path, f))
1246                for d in dirs:
1247                    os.rmdir(os.path.join(path, d))
1248            os.rmdir(tmpdir)
1249        else:
1250            log.debug("[start_experiment]: not removing %s" % tmpdir)
1251
1252        # Insert the experiment into our state and update the disk copy.
1253        self.state_lock.acquire()
1254        self.state[expid]['experimentStatus'] = 'active'
1255        self.state[eid] = self.state[expid]
1256        self.state[eid]['experimentdescription']['topdldescription'] = \
1257                top.to_dict()
1258        self.state[eid]['embedding'] = embedding
1259        if self.state_filename: self.write_state()
1260        self.state_lock.release()
1261        return
1262
1263
1264    def add_kit(self, e, kit):
1265        """
1266        Add a Software object created from the list of (install, location)
1267        tuples passed as kit  to the software attribute of an object e.  We
1268        do this enough to break out the code, but it's kind of a hack to
1269        avoid changing the old tuple rep.
1270        """
1271
1272        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1273
1274        if isinstance(e.software, list): e.software.extend(s)
1275        else: e.software = s
1276
1277
1278    def create_experiment_state(self, fid, req, expid, expcert,
1279            state='starting'):
1280        """
1281        Create the initial entry in the experiment's state.  The expid and
1282        expcert are the experiment's fedid and certifacte that represents that
1283        ID, which are installed in the experiment state.  If the request
1284        includes a suggested local name that is used if possible.  If the local
1285        name is already taken by an experiment owned by this user that has
1286        failed, it is overwritten.  Otherwise new letters are added until a
1287        valid localname is found.  The generated local name is returned.
1288        """
1289
1290        if req.has_key('experimentID') and \
1291                req['experimentID'].has_key('localname'):
1292            overwrite = False
1293            eid = req['experimentID']['localname']
1294            # If there's an old failed experiment here with the same local name
1295            # and accessible by this user, we'll overwrite it, otherwise we'll
1296            # fall through and do the collision avoidance.
1297            old_expid = self.get_experiment_fedid(eid)
1298            if old_expid and self.check_experiment_access(fid, old_expid):
1299                self.state_lock.acquire()
1300                status = self.state[eid].get('experimentStatus', None)
1301                if status and status == 'failed':
1302                    # remove the old access attribute
1303                    self.auth.unset_attribute(fid, old_expid)
1304                    overwrite = True
1305                    del self.state[eid]
1306                    del self.state[old_expid]
1307                self.state_lock.release()
1308            self.state_lock.acquire()
1309            while (self.state.has_key(eid) and not overwrite):
1310                eid += random.choice(string.ascii_letters)
1311            # Initial state
1312            self.state[eid] = {
1313                    'experimentID' : \
1314                            [ { 'localname' : eid }, {'fedid': expid } ],
1315                    'experimentStatus': state,
1316                    'experimentAccess': { 'X509' : expcert },
1317                    'owner': fid,
1318                    'log' : [],
1319                }
1320            self.state[expid] = self.state[eid]
1321            if self.state_filename: self.write_state()
1322            self.state_lock.release()
1323        else:
1324            eid = self.exp_stem
1325            for i in range(0,5):
1326                eid += random.choice(string.ascii_letters)
1327            self.state_lock.acquire()
1328            while (self.state.has_key(eid)):
1329                eid = self.exp_stem
1330                for i in range(0,5):
1331                    eid += random.choice(string.ascii_letters)
1332            # Initial state
1333            self.state[eid] = {
1334                    'experimentID' : \
1335                            [ { 'localname' : eid }, {'fedid': expid } ],
1336                    'experimentStatus': state,
1337                    'experimentAccess': { 'X509' : expcert },
1338                    'owner': fid,
1339                    'log' : [],
1340                }
1341            self.state[expid] = self.state[eid]
1342            if self.state_filename: self.write_state()
1343            self.state_lock.release()
1344
1345        return eid
1346
1347
1348    def allocate_ips_to_topo(self, top):
1349        """
1350        Add an ip4_address attribute to all the hosts in the topology, based on
1351        the shared substrates on which they sit.  An /etc/hosts file is also
1352        created and returned as a list of hostfiles entries.  We also return
1353        the allocator, because we may need to allocate IPs to portals
1354        (specifically DRAGON portals).
1355        """
1356        subs = sorted(top.substrates, 
1357                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1358                reverse=True)
1359        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1360        ifs = { }
1361        hosts = [ ]
1362
1363        for idx, s in enumerate(subs):
1364            net_size = len(s.interfaces)+2
1365
1366            a = ips.allocate(net_size)
1367            if a :
1368                base, num = a
1369                if num < net_size: 
1370                    raise service_error(service_error.internal,
1371                            "Allocator returned wrong number of IPs??")
1372            else:
1373                raise service_error(service_error.req, 
1374                        "Cannot allocate IP addresses")
1375            mask = ips.min_alloc
1376            while mask < net_size:
1377                mask *= 2
1378
1379            netmask = ((2**32-1) ^ (mask-1))
1380
1381            base += 1
1382            for i in s.interfaces:
1383                i.attribute.append(
1384                        topdl.Attribute('ip4_address', 
1385                            "%s" % ip_addr(base)))
1386                i.attribute.append(
1387                        topdl.Attribute('ip4_netmask', 
1388                            "%s" % ip_addr(int(netmask))))
1389
1390                hname = i.element.name
1391                if ifs.has_key(hname):
1392                    hosts.append("%s\t%s-%s %s-%d" % \
1393                            (ip_addr(base), hname, s.name, hname,
1394                                ifs[hname]))
1395                else:
1396                    ifs[hname] = 0
1397                    hosts.append("%s\t%s-%s %s-%d %s" % \
1398                            (ip_addr(base), hname, s.name, hname,
1399                                ifs[hname], hname))
1400
1401                ifs[hname] += 1
1402                base += 1
1403        return hosts, ips
1404
1405    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1406            tbparams, masters):
1407        """
1408        Request access to the various testbeds required for this instantiation
1409        (passed in as testbeds).  User, access_user, expoert_project and master
1410        are used to construct the correct requests.  Per-testbed parameters are
1411        returned in tbparams.
1412        """
1413        for tb in testbeds:
1414            self.get_access(tb, None, tbparams, access_user, masters)
1415            allocated[tb] = 1
1416
1417    def split_topology(self, top, topo, testbeds):
1418        """
1419        Create the sub-topologies that are needed for experiment instantiation.
1420        """
1421        for tb in testbeds:
1422            topo[tb] = top.clone()
1423            # copy in for loop allows deletions from the original
1424            for e in [ e for e in topo[tb].elements]:
1425                etb = e.get_attribute('testbed')
1426                # NB: elements without a testbed attribute won't appear in any
1427                # sub topologies. 
1428                if not etb or etb != tb:
1429                    for i in e.interface:
1430                        for s in i.subs:
1431                            try:
1432                                s.interfaces.remove(i)
1433                            except ValueError:
1434                                raise service_error(service_error.internal,
1435                                        "Can't remove interface??")
1436                    topo[tb].elements.remove(e)
1437            topo[tb].make_indices()
1438
1439    def wrangle_software(self, expid, top, topo, tbparams):
1440        """
1441        Copy software out to the repository directory, allocate permissions and
1442        rewrite the segment topologies to look for the software in local
1443        places.
1444        """
1445
1446        # Copy the rpms and tarfiles to a distribution directory from
1447        # which the federants can retrieve them
1448        linkpath = "%s/software" %  expid
1449        softdir ="%s/%s" % ( self.repodir, linkpath)
1450        softmap = { }
1451        # These are in a list of tuples format (each kit).  This comprehension
1452        # unwraps them into a single list of tuples that initilaizes the set of
1453        # tuples.
1454        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1455                for p, t in l ])
1456        pkgs.update([x.location for e in top.elements \
1457                for x in e.software])
1458        try:
1459            os.makedirs(softdir)
1460        except EnvironmentError, e:
1461            raise service_error(
1462                    "Cannot create software directory: %s" % e)
1463        # The actual copying.  Everything's converted into a url for copying.
1464        for pkg in pkgs:
1465            loc = pkg
1466
1467            scheme, host, path = urlparse(loc)[0:3]
1468            dest = os.path.basename(path)
1469            if not scheme:
1470                if not loc.startswith('/'):
1471                    loc = "/%s" % loc
1472                loc = "file://%s" %loc
1473            try:
1474                u = urlopen(loc)
1475            except Exception, e:
1476                raise service_error(service_error.req, 
1477                        "Cannot open %s: %s" % (loc, e))
1478            try:
1479                f = open("%s/%s" % (softdir, dest) , "w")
1480                self.log.debug("Writing %s/%s" % (softdir,dest) )
1481                data = u.read(4096)
1482                while data:
1483                    f.write(data)
1484                    data = u.read(4096)
1485                f.close()
1486                u.close()
1487            except Exception, e:
1488                raise service_error(service_error.internal,
1489                        "Could not copy %s: %s" % (loc, e))
1490            path = re.sub("/tmp", "", linkpath)
1491            # XXX
1492            softmap[pkg] = \
1493                    "%s/%s/%s" %\
1494                    ( self.repo_url, path, dest)
1495
1496            # Allow the individual segments to access the software.
1497            for tb in tbparams.keys():
1498                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1499                        "/%s/%s" % ( path, dest))
1500
1501        # Convert the software locations in the segments into the local
1502        # copies on this host
1503        for soft in [ s for tb in topo.values() \
1504                for e in tb.elements \
1505                    if getattr(e, 'software', False) \
1506                        for s in e.software ]:
1507            if softmap.has_key(soft.location):
1508                soft.location = softmap[soft.location]
1509
1510
1511    def new_experiment(self, req, fid):
1512        """
1513        The external interface to empty initial experiment creation called from
1514        the dispatcher.
1515
1516        Creates a working directory, splits the incoming description using the
1517        splitter script and parses out the avrious subsections using the
1518        lcasses above.  Once each sub-experiment is created, use pooled threads
1519        to instantiate them and start it all up.
1520        """
1521        if not self.auth.check_attribute(fid, 'new'):
1522            raise service_error(service_error.access, "New access denied")
1523
1524        try:
1525            tmpdir = tempfile.mkdtemp(prefix="split-")
1526        except EnvironmentError:
1527            raise service_error(service_error.internal, "Cannot create tmp dir")
1528
1529        try:
1530            access_user = self.accessdb[fid]
1531        except KeyError:
1532            raise service_error(service_error.internal,
1533                    "Access map and authorizer out of sync in " + \
1534                            "new_experiment for fedid %s"  % fid)
1535
1536        pid = "dummy"
1537        gid = "dummy"
1538
1539        req = req.get('NewRequestBody', None)
1540        if not req:
1541            raise service_error(service_error.req,
1542                    "Bad request format (no NewRequestBody)")
1543
1544        # Generate an ID for the experiment (slice) and a certificate that the
1545        # allocator can use to prove they own it.  We'll ship it back through
1546        # the encrypted connection.
1547        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1548
1549        #now we're done with the tmpdir, and it should be empty
1550        if self.cleanup:
1551            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1552            os.rmdir(tmpdir)
1553        else:
1554            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1555
1556        eid = self.create_experiment_state(fid, req, expid, expcert, 
1557                state='empty')
1558
1559        # Let users touch the state
1560        self.auth.set_attribute(fid, expid)
1561        self.auth.set_attribute(expid, expid)
1562        # Override fedids can manipulate state as well
1563        for o in self.overrides:
1564            self.auth.set_attribute(o, expid)
1565
1566        rv = {
1567                'experimentID': [
1568                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1569                ],
1570                'experimentStatus': 'empty',
1571                'experimentAccess': { 'X509' : expcert }
1572            }
1573
1574        return rv
1575
1576    def create_experiment(self, req, fid):
1577        """
1578        The external interface to experiment creation called from the
1579        dispatcher.
1580
1581        Creates a working directory, splits the incoming description using the
1582        splitter script and parses out the various subsections using the
1583        classes above.  Once each sub-experiment is created, use pooled threads
1584        to instantiate them and start it all up.
1585        """
1586
1587        req = req.get('CreateRequestBody', None)
1588        if not req:
1589            raise service_error(service_error.req,
1590                    "Bad request format (no CreateRequestBody)")
1591
1592        # Get the experiment access
1593        exp = req.get('experimentID', None)
1594        if exp:
1595            if exp.has_key('fedid'):
1596                key = exp['fedid']
1597                expid = key
1598                eid = None
1599            elif exp.has_key('localname'):
1600                key = exp['localname']
1601                eid = key
1602                expid = None
1603            else:
1604                raise service_error(service_error.req, "Unknown lookup type")
1605        else:
1606            raise service_error(service_error.req, "No request?")
1607
1608        self.check_experiment_access(fid, key)
1609
1610        try:
1611            tmpdir = tempfile.mkdtemp(prefix="split-")
1612            os.mkdir(tmpdir+"/keys")
1613        except EnvironmentError:
1614            raise service_error(service_error.internal, "Cannot create tmp dir")
1615
1616        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1617        gw_secretkey_base = "fed.%s" % self.ssh_type
1618        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1619        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1620        tclfile = tmpdir + "/experiment.tcl"
1621        tbparams = { }
1622        try:
1623            access_user = self.accessdb[fid]
1624        except KeyError:
1625            raise service_error(service_error.internal,
1626                    "Access map and authorizer out of sync in " + \
1627                            "create_experiment for fedid %s"  % fid)
1628
1629        pid = "dummy"
1630        gid = "dummy"
1631
1632        # The tcl parser needs to read a file so put the content into that file
1633        descr=req.get('experimentdescription', None)
1634        if descr:
1635            file_content=descr.get('ns2description', None)
1636            if file_content:
1637                try:
1638                    f = open(tclfile, 'w')
1639                    f.write(file_content)
1640                    f.close()
1641                except EnvironmentError:
1642                    raise service_error(service_error.internal,
1643                            "Cannot write temp experiment description")
1644            else:
1645                raise service_error(service_error.req, 
1646                        "Only ns2descriptions supported")
1647        else:
1648            raise service_error(service_error.req, "No experiment description")
1649
1650        self.state_lock.acquire()
1651        if self.state.has_key(key):
1652            self.state[key]['experimentStatus'] = "starting"
1653            for e in self.state[key].get('experimentID',[]):
1654                if not expid and e.has_key('fedid'):
1655                    expid = e['fedid']
1656                elif not eid and e.has_key('localname'):
1657                    eid = e['localname']
1658        self.state_lock.release()
1659
1660        if not (eid and expid):
1661            raise service_error(service_error.internal, 
1662                    "Cannot find local experiment info!?")
1663
1664        try: 
1665            # This catches exceptions to clear the placeholder if necessary
1666            try:
1667                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1668            except ValueError:
1669                raise service_error(service_error.server_config, 
1670                        "Bad key type (%s)" % self.ssh_type)
1671
1672            # Copy the service request
1673            tb_services = [ s for s in req.get('service',[]) ]
1674            # Translate to topdl
1675            if self.splitter_url:
1676                self.log.debug("Calling remote topdl translator at %s" % \
1677                        self.splitter_url)
1678                top = self.remote_ns2topdl(self.splitter_url, file_content)
1679            else:
1680                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1681                    str(self.muxmax), '-m', 'dummy']
1682
1683                tclcmd.extend([pid, gid, eid, tclfile])
1684
1685                self.log.debug("running local splitter %s", " ".join(tclcmd))
1686                # This is just fantastic.  As a side effect the parser copies
1687                # tb_compat.tcl into the current directory, so that directory
1688                # must be writable by the fedd user.  Doing this in the
1689                # temporary subdir ensures this is the case.
1690                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1691                        cwd=tmpdir)
1692                split_data = tclparser.stdout
1693
1694                top = topdl.topology_from_xml(file=split_data, top="experiment")
1695
1696            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1697            # Find the testbeds to look up
1698            testbeds = set([ a.value for e in top.elements \
1699                    for a in e.attribute \
1700                        if a.attribute == 'testbed'])
1701
1702            tb_hosts = { }
1703            for tb in testbeds:
1704                tb_hosts[tb] = [ e.name for e in top.elements \
1705                        if isinstance(e, topdl.Computer) and \
1706                            e.get_attribute('testbed') and \
1707                            e.get_attribute('testbed') == tb]
1708
1709            masters = { }           # testbeds exporting services
1710            pmasters = { }          # Testbeds exporting services that
1711                                    # need portals
1712            for s in tb_services:
1713                # If this is a service request with the importall field
1714                # set, fill it out.
1715
1716                if s.get('importall', False):
1717                    s['import'] = [ tb for tb in testbeds \
1718                            if tb not in s.get('export',[])]
1719                    del s['importall']
1720
1721                # Add the service to masters
1722                for tb in s.get('export', []):
1723                    if s.get('name', None):
1724                        if tb not in masters:
1725                            masters[tb] = [ ]
1726
1727                        params = { }
1728                        if 'fedAttr' in s:
1729                            for a in s['fedAttr']:
1730                                params[a.get('attribute', '')] = \
1731                                        a.get('value','')
1732
1733                        fser = federated_service(name=s['name'],
1734                                exporter=tb, importers=s.get('import',[]),
1735                                params=params)
1736                        if fser.name == 'hide_hosts' \
1737                                and 'hosts' not in fser.params:
1738                            fser.params['hosts'] = \
1739                                    ",".join(tb_hosts.get(fser.exporter, []))
1740                        masters[tb].append(fser)
1741
1742                        if fser.portal:
1743                            if tb not in pmasters: pmasters[tb] = [ fser ]
1744                            else: pmasters[tb].append(fser)
1745                    else:
1746                        self.log.error('Testbed service does not have name " + \
1747                                "and importers')
1748
1749
1750            allocated = { }         # Testbeds we can access
1751            topo ={ }               # Sub topologies
1752            connInfo = { }          # Connection information
1753
1754            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1755                    tbparams, masters)
1756
1757            self.split_topology(top, topo, testbeds)
1758
1759            # Copy configuration files into the remote file store
1760            # The config urlpath
1761            configpath = "/%s/config" % expid
1762            # The config file system location
1763            configdir ="%s%s" % ( self.repodir, configpath)
1764            try:
1765                os.makedirs(configdir)
1766            except EnvironmentError, e:
1767                raise service_error(service_error.internal,
1768                        "Cannot create config directory: %s" % e)
1769            try:
1770                f = open("%s/hosts" % configdir, "w")
1771                f.write('\n'.join(hosts))
1772                f.close()
1773            except EnvironmentError, e:
1774                raise service_error(service_error.internal, 
1775                        "Cannot write hosts file: %s" % e)
1776            try:
1777                copy_file("%s" % gw_pubkey, "%s/%s" % \
1778                        (configdir, gw_pubkey_base))
1779                copy_file("%s" % gw_secretkey, "%s/%s" % \
1780                        (configdir, gw_secretkey_base))
1781            except EnvironmentError, e:
1782                raise service_error(service_error.internal, 
1783                        "Cannot copy keyfiles: %s" % e)
1784
1785            # Allow the individual testbeds to access the configuration files.
1786            for tb in tbparams.keys():
1787                asignee = tbparams[tb]['allocID']['fedid']
1788                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1789                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1790
1791            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1792                    self.muxmax, self.direct_transit)
1793            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1794                    connInfo, expid)
1795            # Now get access to the dynamic testbeds (those added above)
1796            for tb in [ t for t in topo if t not in allocated]:
1797                self.get_access(tb, None, tbparams, access_user, masters)
1798                allocated[tb] = 1
1799                store_keys = topo[tb].get_attribute('store_keys')
1800                # Give the testbed access to keys it exports or imports
1801                if store_keys:
1802                    for sk in store_keys.split(" "):
1803                        self.auth.set_attribute(\
1804                                tbparams[tb]['allocID']['fedid'], sk)
1805
1806            self.wrangle_software(expid, top, topo, tbparams)
1807
1808            vtopo = topdl.topology_to_vtopo(top)
1809            vis = self.genviz(vtopo)
1810
1811            # save federant information
1812            for k in allocated.keys():
1813                tbparams[k]['federant'] = {
1814                        'name': [ { 'localname' : eid} ],
1815                        'allocID' : tbparams[k]['allocID'],
1816                        'uri': tbparams[k]['uri'],
1817                    }
1818
1819            self.state_lock.acquire()
1820            self.state[eid]['vtopo'] = vtopo
1821            self.state[eid]['vis'] = vis
1822            self.state[eid]['experimentdescription'] = \
1823                    { 'topdldescription': top.to_dict() }
1824            self.state[eid]['federant'] = \
1825                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1826                        if tbparams[tb].has_key('federant') ]
1827            if self.state_filename: 
1828                self.write_state()
1829            self.state_lock.release()
1830        except service_error, e:
1831            # If something goes wrong in the parse (usually an access error)
1832            # clear the placeholder state.  From here on out the code delays
1833            # exceptions.  Failing at this point returns a fault to the remote
1834            # caller.
1835
1836            self.state_lock.acquire()
1837            del self.state[eid]
1838            del self.state[expid]
1839            if self.state_filename: self.write_state()
1840            self.state_lock.release()
1841            raise e
1842
1843
1844        # Start the background swapper and return the starting state.  From
1845        # here on out, the state will stick around a while.
1846
1847        # Let users touch the state
1848        self.auth.set_attribute(fid, expid)
1849        self.auth.set_attribute(expid, expid)
1850        # Override fedids can manipulate state as well
1851        for o in self.overrides:
1852            self.auth.set_attribute(o, expid)
1853
1854        # Create a logger that logs to the experiment's state object as well as
1855        # to the main log file.
1856        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1857        alloc_collector = self.list_log(self.state[eid]['log'])
1858        h = logging.StreamHandler(alloc_collector)
1859        # XXX: there should be a global one of these rather than repeating the
1860        # code.
1861        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1862                    '%d %b %y %H:%M:%S'))
1863        alloc_log.addHandler(h)
1864       
1865        attrs = [ 
1866                {
1867                    'attribute': 'ssh_pubkey', 
1868                    'value': '%s/%s/config/%s' % \
1869                            (self.repo_url, expid, gw_pubkey_base)
1870                },
1871                {
1872                    'attribute': 'ssh_secretkey', 
1873                    'value': '%s/%s/config/%s' % \
1874                            (self.repo_url, expid, gw_secretkey_base)
1875                },
1876                {
1877                    'attribute': 'hosts', 
1878                    'value': '%s/%s/config/hosts' % \
1879                            (self.repo_url, expid)
1880                },
1881            ]
1882
1883        # transit and disconnected testbeds may not have a connInfo entry.
1884        # Fill in the blanks.
1885        for t in allocated.keys():
1886            if not connInfo.has_key(t):
1887                connInfo[t] = { }
1888
1889        # Start a thread to do the resource allocation
1890        t  = Thread(target=self.allocate_resources,
1891                args=(allocated, masters, eid, expid, tbparams, 
1892                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1893                    connInfo),
1894                name=eid)
1895        t.start()
1896
1897        rv = {
1898                'experimentID': [
1899                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1900                ],
1901                'experimentStatus': 'starting',
1902            }
1903
1904        return rv
1905   
1906    def get_experiment_fedid(self, key):
1907        """
1908        find the fedid associated with the localname key in the state database.
1909        """
1910
1911        rv = None
1912        self.state_lock.acquire()
1913        if self.state.has_key(key):
1914            if isinstance(self.state[key], dict):
1915                try:
1916                    kl = [ f['fedid'] for f in \
1917                            self.state[key]['experimentID']\
1918                                if f.has_key('fedid') ]
1919                except KeyError:
1920                    self.state_lock.release()
1921                    raise service_error(service_error.internal, 
1922                            "No fedid for experiment %s when getting "+\
1923                                    "fedid(!?)" % key)
1924                if len(kl) == 1:
1925                    rv = kl[0]
1926                else:
1927                    self.state_lock.release()
1928                    raise service_error(service_error.internal, 
1929                            "multiple fedids for experiment %s when " +\
1930                                    "getting fedid(!?)" % key)
1931            else:
1932                self.state_lock.release()
1933                raise service_error(service_error.internal, 
1934                        "Unexpected state for %s" % key)
1935        self.state_lock.release()
1936        return rv
1937
1938    def check_experiment_access(self, fid, key):
1939        """
1940        Confirm that the fid has access to the experiment.  Though a request
1941        may be made in terms of a local name, the access attribute is always
1942        the experiment's fedid.
1943        """
1944        if not isinstance(key, fedid):
1945            key = self.get_experiment_fedid(key)
1946
1947        if self.auth.check_attribute(fid, key):
1948            return True
1949        else:
1950            raise service_error(service_error.access, "Access Denied")
1951
1952
1953    def get_handler(self, path, fid):
1954        self.log.info("Get handler %s %s" % (path, fid))
1955        if self.auth.check_attribute(fid, path):
1956            return ("%s/%s" % (self.repodir, path), "application/binary")
1957        else:
1958            return (None, None)
1959
1960    def get_vtopo(self, req, fid):
1961        """
1962        Return the stored virtual topology for this experiment
1963        """
1964        rv = None
1965        state = None
1966
1967        req = req.get('VtopoRequestBody', None)
1968        if not req:
1969            raise service_error(service_error.req,
1970                    "Bad request format (no VtopoRequestBody)")
1971        exp = req.get('experiment', None)
1972        if exp:
1973            if exp.has_key('fedid'):
1974                key = exp['fedid']
1975                keytype = "fedid"
1976            elif exp.has_key('localname'):
1977                key = exp['localname']
1978                keytype = "localname"
1979            else:
1980                raise service_error(service_error.req, "Unknown lookup type")
1981        else:
1982            raise service_error(service_error.req, "No request?")
1983
1984        self.check_experiment_access(fid, key)
1985
1986        self.state_lock.acquire()
1987        if self.state.has_key(key):
1988            if self.state[key].has_key('vtopo'):
1989                rv = { 'experiment' : {keytype: key },\
1990                        'vtopo': self.state[key]['vtopo'],\
1991                    }
1992            else:
1993                state = self.state[key]['experimentStatus']
1994        self.state_lock.release()
1995
1996        if rv: return rv
1997        else: 
1998            if state:
1999                raise service_error(service_error.partial, 
2000                        "Not ready: %s" % state)
2001            else:
2002                raise service_error(service_error.req, "No such experiment")
2003
2004    def get_vis(self, req, fid):
2005        """
2006        Return the stored visualization for this experiment
2007        """
2008        rv = None
2009        state = None
2010
2011        req = req.get('VisRequestBody', None)
2012        if not req:
2013            raise service_error(service_error.req,
2014                    "Bad request format (no VisRequestBody)")
2015        exp = req.get('experiment', None)
2016        if exp:
2017            if exp.has_key('fedid'):
2018                key = exp['fedid']
2019                keytype = "fedid"
2020            elif exp.has_key('localname'):
2021                key = exp['localname']
2022                keytype = "localname"
2023            else:
2024                raise service_error(service_error.req, "Unknown lookup type")
2025        else:
2026            raise service_error(service_error.req, "No request?")
2027
2028        self.check_experiment_access(fid, key)
2029
2030        self.state_lock.acquire()
2031        if self.state.has_key(key):
2032            if self.state[key].has_key('vis'):
2033                rv =  { 'experiment' : {keytype: key },\
2034                        'vis': self.state[key]['vis'],\
2035                        }
2036            else:
2037                state = self.state[key]['experimentStatus']
2038        self.state_lock.release()
2039
2040        if rv: return rv
2041        else:
2042            if state:
2043                raise service_error(service_error.partial, 
2044                        "Not ready: %s" % state)
2045            else:
2046                raise service_error(service_error.req, "No such experiment")
2047
2048    def clean_info_response(self, rv):
2049        """
2050        Remove the information in the experiment's state object that is not in
2051        the info response.
2052        """
2053        # Remove the owner info (should always be there, but...)
2054        if rv.has_key('owner'): del rv['owner']
2055
2056        # Convert the log into the allocationLog parameter and remove the
2057        # log entry (with defensive programming)
2058        if rv.has_key('log'):
2059            rv['allocationLog'] = "".join(rv['log'])
2060            del rv['log']
2061        else:
2062            rv['allocationLog'] = ""
2063
2064        if rv['experimentStatus'] != 'active':
2065            if rv.has_key('federant'): del rv['federant']
2066        else:
2067            # remove the allocationID and uri info from each federant
2068            for f in rv.get('federant', []):
2069                if f.has_key('allocID'): del f['allocID']
2070                if f.has_key('uri'): del f['uri']
2071
2072        return rv
2073
2074    def get_info(self, req, fid):
2075        """
2076        Return all the stored info about this experiment
2077        """
2078        rv = None
2079
2080        req = req.get('InfoRequestBody', None)
2081        if not req:
2082            raise service_error(service_error.req,
2083                    "Bad request format (no InfoRequestBody)")
2084        exp = req.get('experiment', None)
2085        if exp:
2086            if exp.has_key('fedid'):
2087                key = exp['fedid']
2088                keytype = "fedid"
2089            elif exp.has_key('localname'):
2090                key = exp['localname']
2091                keytype = "localname"
2092            else:
2093                raise service_error(service_error.req, "Unknown lookup type")
2094        else:
2095            raise service_error(service_error.req, "No request?")
2096
2097        self.check_experiment_access(fid, key)
2098
2099        # The state may be massaged by the service function that called
2100        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2101        # state.
2102        self.state_lock.acquire()
2103        if self.state.has_key(key):
2104            rv = copy.deepcopy(self.state[key])
2105        self.state_lock.release()
2106
2107        if rv:
2108            return self.clean_info_response(rv)
2109        else:
2110            raise service_error(service_error.req, "No such experiment")
2111
2112    def get_multi_info(self, req, fid):
2113        """
2114        Return all the stored info that this fedid can access
2115        """
2116        rv = { 'info': [ ] }
2117
2118        self.state_lock.acquire()
2119        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2120            try:
2121                self.check_experiment_access(fid, key)
2122            except service_error, e:
2123                if e.code == service_error.access:
2124                    continue
2125                else:
2126                    self.state_lock.release()
2127                    raise e
2128
2129            if self.state.has_key(key):
2130                e = copy.deepcopy(self.state[key])
2131                e = self.clean_info_response(e)
2132                rv['info'].append(e)
2133        self.state_lock.release()
2134        return rv
2135
2136    def terminate_experiment(self, req, fid):
2137        """
2138        Swap this experiment out on the federants and delete the shared
2139        information
2140        """
2141        tbparams = { }
2142        req = req.get('TerminateRequestBody', None)
2143        if not req:
2144            raise service_error(service_error.req,
2145                    "Bad request format (no TerminateRequestBody)")
2146        force = req.get('force', False)
2147        exp = req.get('experiment', None)
2148        if exp:
2149            if exp.has_key('fedid'):
2150                key = exp['fedid']
2151                keytype = "fedid"
2152            elif exp.has_key('localname'):
2153                key = exp['localname']
2154                keytype = "localname"
2155            else:
2156                raise service_error(service_error.req, "Unknown lookup type")
2157        else:
2158            raise service_error(service_error.req, "No request?")
2159
2160        self.check_experiment_access(fid, key)
2161
2162        dealloc_list = [ ]
2163
2164
2165        # Create a logger that logs to the dealloc_list as well as to the main
2166        # log file.
2167        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2168        h = logging.StreamHandler(self.list_log(dealloc_list))
2169        # XXX: there should be a global one of these rather than repeating the
2170        # code.
2171        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2172                    '%d %b %y %H:%M:%S'))
2173        dealloc_log.addHandler(h)
2174
2175        self.state_lock.acquire()
2176        fed_exp = self.state.get(key, None)
2177
2178        if fed_exp:
2179            # This branch of the conditional holds the lock to generate a
2180            # consistent temporary tbparams variable to deallocate experiments.
2181            # It releases the lock to do the deallocations and reacquires it to
2182            # remove the experiment state when the termination is complete.
2183
2184            # First make sure that the experiment creation is complete.
2185            status = fed_exp.get('experimentStatus', None)
2186
2187            if status:
2188                if status in ('starting', 'terminating'):
2189                    if not force:
2190                        self.state_lock.release()
2191                        raise service_error(service_error.partial, 
2192                                'Experiment still being created or destroyed')
2193                    else:
2194                        self.log.warning('Experiment in %s state ' % status + \
2195                                'being terminated by force.')
2196            else:
2197                # No status??? trouble
2198                self.state_lock.release()
2199                raise service_error(service_error.internal,
2200                        "Experiment has no status!?")
2201
2202            ids = []
2203            #  experimentID is a list of dicts that are self-describing
2204            #  identifiers.  This finds all the fedids and localnames - the
2205            #  keys of self.state - and puts them into ids.
2206            for id in fed_exp.get('experimentID', []):
2207                if id.has_key('fedid'): ids.append(id['fedid'])
2208                if id.has_key('localname'): ids.append(id['localname'])
2209
2210            # Collect the allocation/segment ids into a dict keyed by the fedid
2211            # of the allocation (or a monotonically increasing integer) that
2212            # contains a tuple of uri, aid (which is a dict...)
2213            for i, fed in enumerate(fed_exp.get('federant', [])):
2214                try:
2215                    uri = fed['uri']
2216                    aid = fed['allocID']
2217                    k = fed['allocID'].get('fedid', i)
2218                except KeyError, e:
2219                    continue
2220                tbparams[k] = (uri, aid)
2221            fed_exp['experimentStatus'] = 'terminating'
2222            if self.state_filename: self.write_state()
2223            self.state_lock.release()
2224
2225            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2226            # then completes, so we can't wait if nothing starts.  So, no
2227            # tbparams, no start.
2228            if len(tbparams) > 0:
2229                thread_pool = self.thread_pool(self.nthreads)
2230                for k in tbparams.keys():
2231                    # Create and start a thread to stop the segment
2232                    thread_pool.wait_for_slot()
2233                    uri, aid = tbparams[k]
2234                    t  = self.pooled_thread(\
2235                            target=self.terminate_segment(log=dealloc_log,
2236                                testbed=uri,
2237                                cert_file=self.cert_file, 
2238                                cert_pwd=self.cert_pwd,
2239                                trusted_certs=self.trusted_certs,
2240                                caller=self.call_TerminateSegment),
2241                            args=(uri, aid), name=k,
2242                            pdata=thread_pool, trace_file=self.trace_file)
2243                    t.start()
2244                # Wait for completions
2245                thread_pool.wait_for_all_done()
2246
2247            # release the allocations (failed experiments have done this
2248            # already, and starting experiments may be in odd states, so we
2249            # ignore errors releasing those allocations
2250            try: 
2251                for k in tbparams.keys():
2252                    # This releases access by uri
2253                    uri, aid = tbparams[k]
2254                    self.release_access(None, aid, uri=uri)
2255            except service_error, e:
2256                if status != 'failed' and not force:
2257                    raise e
2258
2259            # Remove the terminated experiment
2260            self.state_lock.acquire()
2261            for id in ids:
2262                if self.state.has_key(id): del self.state[id]
2263
2264            if self.state_filename: self.write_state()
2265            self.state_lock.release()
2266
2267            # Delete any synch points associated with this experiment.  All
2268            # synch points begin with the fedid of the experiment.
2269            fedid_keys = set(["fedid:%s" % f for f in ids \
2270                    if isinstance(f, fedid)])
2271            for k in self.synch_store.all_keys():
2272                try:
2273                    if len(k) > 45 and k[0:46] in fedid_keys:
2274                        self.synch_store.del_value(k)
2275                except synch_store.BadDeletionError:
2276                    pass
2277            self.write_store()
2278               
2279            return { 
2280                    'experiment': exp , 
2281                    'deallocationLog': "".join(dealloc_list),
2282                    }
2283        else:
2284            # Don't forget to release the lock
2285            self.state_lock.release()
2286            raise service_error(service_error.req, "No saved state")
2287
2288
2289    def GetValue(self, req, fid):
2290        """
2291        Get a value from the synchronized store
2292        """
2293        req = req.get('GetValueRequestBody', None)
2294        if not req:
2295            raise service_error(service_error.req,
2296                    "Bad request format (no GetValueRequestBody)")
2297       
2298        name = req['name']
2299        wait = req['wait']
2300        rv = { 'name': name }
2301
2302        if self.auth.check_attribute(fid, name):
2303            self.log.debug("[GetValue] asking for %s " % name)
2304            try:
2305                v = self.synch_store.get_value(name, wait)
2306            except synch_store.RevokedKeyError:
2307                # No more synch on this key
2308                raise service_error(service_error.federant, 
2309                        "Synch key %s revoked" % name)
2310            if v is not None:
2311                rv['value'] = v
2312            self.log.debug("[GetValue] got %s from %s" % (v, name))
2313            return rv
2314        else:
2315            raise service_error(service_error.access, "Access Denied")
2316       
2317
2318    def SetValue(self, req, fid):
2319        """
2320        Set a value in the synchronized store
2321        """
2322        req = req.get('SetValueRequestBody', None)
2323        if not req:
2324            raise service_error(service_error.req,
2325                    "Bad request format (no SetValueRequestBody)")
2326       
2327        name = req['name']
2328        v = req['value']
2329
2330        if self.auth.check_attribute(fid, name):
2331            try:
2332                self.synch_store.set_value(name, v)
2333                self.write_store()
2334                self.log.debug("[SetValue] set %s to %s" % (name, v))
2335            except synch_store.CollisionError:
2336                # Translate into a service_error
2337                raise service_error(service_error.req,
2338                        "Value already set: %s" %name)
2339            except synch_store.RevokedKeyError:
2340                # No more synch on this key
2341                raise service_error(service_error.federant, 
2342                        "Synch key %s revoked" % name)
2343            return { 'name': name, 'value': v }
2344        else:
2345            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.