source: fedd/federation/experiment_control.py @ 8cab4c2

compt_changes
Last change on this file since 8cab4c2 was 8cab4c2, checked in by Ted Faber <faber@…>, 12 years ago

More improved logging

  • Property mode set to 100644
File size: 95.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        also adds testbeds to active if they include , active after
388        their name.
389        """
390
391        self.tbmap = { }
392        self.tbactive = set()
393        lineno =0
394        try:
395            f = open(file, "r")
396            for line in f:
397                lineno += 1
398                line = line.strip()
399                if line.startswith('#') or len(line) == 0:
400                    continue
401                try:
402                    label, url = line.split(':', 1)
403                    if ',' in label:
404                        label, act = label.split(',', 1)
405                        active = (act.strip() == 'active')
406                    else:
407                        active = False
408                    self.tbmap[label] = url
409                    if active: self.tbactive.add(label)
410                except ValueError, e:
411                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
412                            "map db: %s %s" % (lineno, line, e))
413        except EnvironmentError, e:
414            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
415                    "open %s: %s" % (file, e))
416        else:
417            f.close()
418
419    def read_store(self):
420        try:
421            self.synch_store = synch_store()
422            self.synch_store.load(self.store_filename)
423            self.log.debug("[read_store]: Read store from %s" % \
424                    self.store_filename)
425        except EnvironmentError, e:
426            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
427                    % (self.state_filename, e))
428            self.synch_store = synch_store()
429
430        # Set the initial permissions on data in the store.  XXX: This ad hoc
431        # authorization attribute initialization is getting out of hand.
432        # XXX: legacy
433        if self.auth_type == 'legacy':
434            for k in self.synch_store.all_keys():
435                try:
436                    if k.startswith('fedid:'):
437                        fid = fedid(hexstr=k[6:46])
438                        if self.state.has_key(fid):
439                            for a in self.get_alloc_ids(self.state[fid]):
440                                self.auth.set_attribute(a, k)
441                except ValueError, e:
442                    self.log.warn("Cannot deduce permissions for %s" % k)
443
444
445    def write_store(self):
446        """
447        Write a new copy of synch_store after writing current state
448        to a backup.  We use the internal synch_store pickle method to avoid
449        incinsistent data.
450
451        State format is a simple pickling of the store.
452        """
453        if os.access(self.store_filename, os.W_OK):
454            copy_file(self.store_filename, \
455                    "%s.bak" % self.store_filename)
456        try:
457            self.synch_store.save(self.store_filename)
458        except EnvironmentError, e:
459            self.log.error("Can't write file %s: %s" % \
460                    (self.store_filename, e))
461        except TypeError, e:
462            self.log.error("Pickling problem (TypeError): %s" % e)
463
464
465    def remove_dirs(self, dir):
466        """
467        Remove the directory tree and all files rooted at dir.  Log any errors,
468        but continue.
469        """
470        self.log.debug("[removedirs]: removing %s" % dir)
471        try:
472            for path, dirs, files in os.walk(dir, topdown=False):
473                for f in files:
474                    os.remove(os.path.join(path, f))
475                for d in dirs:
476                    os.rmdir(os.path.join(path, d))
477            os.rmdir(dir)
478        except EnvironmentError, e:
479            self.log.error("Error deleting directory tree in %s" % e);
480
481    @staticmethod
482    def make_temp_certfile(expcert, tmpdir):
483        """
484        make a protected copy of the access certificate so the experiment
485        controller can act as the experiment principal.  mkstemp is the most
486        secure way to do that. The directory should be created by
487        mkdtemp.  Return the filename.
488        """
489        if expcert and tmpdir:
490            try:
491                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
492                f = os.fdopen(certf, 'w')
493                print >> f, expcert
494                f.close()
495            except EnvironmentError, e:
496                raise service_error(service_error.internal, 
497                        "Cannot create temp cert file?")
498            return certfn
499        else:
500            return None
501
502       
503    def generate_ssh_keys(self, dest, type="rsa" ):
504        """
505        Generate a set of keys for the gateways to use to talk.
506
507        Keys are of type type and are stored in the required dest file.
508        """
509        valid_types = ("rsa", "dsa")
510        t = type.lower();
511        if t not in valid_types: raise ValueError
512        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
513
514        try:
515            trace = open("/dev/null", "w")
516        except EnvironmentError:
517            raise service_error(service_error.internal,
518                    "Cannot open /dev/null??");
519
520        # May raise CalledProcessError
521        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
522        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
523        if rv != 0:
524            raise service_error(service_error.internal, 
525                    "Cannot generate nonce ssh keys.  %s return code %d" \
526                            % (self.ssh_keygen, rv))
527
528    def generate_seer_certs(self, destdir):
529        '''
530        Create a SEER ca cert and a node cert in destdir/ca.pem and
531        destdir/node.pem respectively.  These will be distributed throughout
532        the federated experiment.  This routine reports errors via
533        service_errors.
534        '''
535        openssl = '/usr/bin/openssl'
536        # All the filenames and parameters we need for openssl calls below
537        ca_key =os.path.join(destdir, 'ca.key') 
538        ca_pem = os.path.join(destdir, 'ca.pem')
539        node_key =os.path.join(destdir, 'node.key') 
540        node_pem = os.path.join(destdir, 'node.pem')
541        node_req = os.path.join(destdir, 'node.req')
542        node_signed = os.path.join(destdir, 'node.signed')
543        days = '%s' % (365 * 10)
544        serial = '%s' % random.randint(0, 1<<16)
545
546        try:
547            # Sequence of calls to create a CA key, create a ca cert, create a
548            # node key, node signing request, and finally a signed node
549            # certificate.
550            sequence = (
551                    (openssl, 'genrsa', '-out', ca_key, '1024'),
552                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
553                        ca_pem, '-days', days, '-subj', 
554                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
555                    (openssl, 'genrsa', '-out', node_key, '1024'),
556                    (openssl, 'req', '-new', '-key', node_key, '-out', 
557                        node_req, '-days', days, '-subj', 
558                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
559                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
560                        '-set_serial', serial, '-req', '-in', node_req, 
561                        '-out', node_signed, '-days', days),
562                )
563            # Do all that stuff; bail if there's an error, and push all the
564            # output to dev/null.
565            for cmd in sequence:
566                trace = open("/dev/null", "w")
567                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
568                if rv != 0:
569                    raise service_error(service_error.internal, 
570                            "Cannot generate SEER certs.  %s return code %d" \
571                                    % (' '.join(cmd), rv))
572            # Concatinate the node key and signed certificate into node.pem
573            f = open(node_pem, 'w')
574            for comp in (node_signed, node_key):
575                g = open(comp, 'r')
576                f.write(g.read())
577                g.close()
578            f.close()
579
580            # Throw out intermediaries.
581            for fn in (ca_key, node_key, node_req, node_signed):
582                os.unlink(fn)
583
584        except EnvironmentError, e:
585            # Any difficulties with the file system wind up here
586            raise service_error(service_error.internal,
587                    "File error on  %s while creating SEER certs: %s" % \
588                            (e.filename, e.strerror))
589
590
591
592    def gentopo(self, str):
593        """
594        Generate the topology data structure from the splitter's XML
595        representation of it.
596
597        The topology XML looks like:
598            <experiment>
599                <nodes>
600                    <node><vname></vname><ips>ip1:ip2</ips></node>
601                </nodes>
602                <lans>
603                    <lan>
604                        <vname></vname><vnode></vnode><ip></ip>
605                        <bandwidth></bandwidth><member>node:port</member>
606                    </lan>
607                </lans>
608        """
609        class topo_parse:
610            """
611            Parse the topology XML and create the dats structure.
612            """
613            def __init__(self):
614                # Typing of the subelements for data conversion
615                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
616                self.int_subelements = ( 'bandwidth',)
617                self.float_subelements = ( 'delay',)
618                # The final data structure
619                self.nodes = [ ]
620                self.lans =  [ ]
621                self.topo = { \
622                        'node': self.nodes,\
623                        'lan' : self.lans,\
624                    }
625                self.element = { }  # Current element being created
626                self.chars = ""     # Last text seen
627
628            def end_element(self, name):
629                # After each sub element the contents is added to the current
630                # element or to the appropriate list.
631                if name == 'node':
632                    self.nodes.append(self.element)
633                    self.element = { }
634                elif name == 'lan':
635                    self.lans.append(self.element)
636                    self.element = { }
637                elif name in self.str_subelements:
638                    self.element[name] = self.chars
639                    self.chars = ""
640                elif name in self.int_subelements:
641                    self.element[name] = int(self.chars)
642                    self.chars = ""
643                elif name in self.float_subelements:
644                    self.element[name] = float(self.chars)
645                    self.chars = ""
646
647            def found_chars(self, data):
648                self.chars += data.rstrip()
649
650
651        tp = topo_parse();
652        parser = xml.parsers.expat.ParserCreate()
653        parser.EndElementHandler = tp.end_element
654        parser.CharacterDataHandler = tp.found_chars
655
656        parser.Parse(str)
657
658        return tp.topo
659       
660
661    def genviz(self, topo):
662        """
663        Generate the visualization the virtual topology
664        """
665
666        neato = "/usr/local/bin/neato"
667        # These are used to parse neato output and to create the visualization
668        # file.
669        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
670        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
671                "%s</type></node>"
672
673        try:
674            # Node names
675            nodes = [ n['vname'] for n in topo['node'] ]
676            topo_lans = topo['lan']
677        except KeyError, e:
678            raise service_error(service_error.internal, "Bad topology: %s" %e)
679
680        lans = { }
681        links = { }
682
683        # Walk through the virtual topology, organizing the connections into
684        # 2-node connections (links) and more-than-2-node connections (lans).
685        # When a lan is created, it's added to the list of nodes (there's a
686        # node in the visualization for the lan).
687        for l in topo_lans:
688            if links.has_key(l['vname']):
689                if len(links[l['vname']]) < 2:
690                    links[l['vname']].append(l['vnode'])
691                else:
692                    nodes.append(l['vname'])
693                    lans[l['vname']] = links[l['vname']]
694                    del links[l['vname']]
695                    lans[l['vname']].append(l['vnode'])
696            elif lans.has_key(l['vname']):
697                lans[l['vname']].append(l['vnode'])
698            else:
699                links[l['vname']] = [ l['vnode'] ]
700
701
702        # Open up a temporary file for dot to turn into a visualization
703        try:
704            df, dotname = tempfile.mkstemp()
705            dotfile = os.fdopen(df, 'w')
706        except EnvironmentError:
707            raise service_error(service_error.internal,
708                    "Failed to open file in genviz")
709
710        try:
711            dnull = open('/dev/null', 'w')
712        except EnvironmentError:
713            service_error(service_error.internal,
714                    "Failed to open /dev/null in genviz")
715
716        # Generate a dot/neato input file from the links, nodes and lans
717        try:
718            print >>dotfile, "graph G {"
719            for n in nodes:
720                print >>dotfile, '\t"%s"' % n
721            for l in links.keys():
722                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
723            for l in lans.keys():
724                for n in lans[l]:
725                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
726            print >>dotfile, "}"
727            dotfile.close()
728        except TypeError:
729            raise service_error(service_error.internal,
730                    "Single endpoint link in vtopo")
731        except EnvironmentError:
732            raise service_error(service_error.internal, "Cannot write dot file")
733
734        # Use dot to create a visualization
735        try:
736            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
737                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
738                stderr=dnull, close_fds=True)
739        except EnvironmentError:
740            raise service_error(service_error.internal, 
741                    "Cannot generate visualization: is graphviz available?")
742        dnull.close()
743
744        # Translate dot to vis format
745        vis_nodes = [ ]
746        vis = { 'node': vis_nodes }
747        for line in dot.stdout:
748            m = vis_re.match(line)
749            if m:
750                vn = m.group(1)
751                vis_node = {'name': vn, \
752                        'x': float(m.group(2)),\
753                        'y' : float(m.group(3)),\
754                    }
755                if vn in links.keys() or vn in lans.keys():
756                    vis_node['type'] = 'lan'
757                else:
758                    vis_node['type'] = 'node'
759                vis_nodes.append(vis_node)
760        rv = dot.wait()
761
762        os.remove(dotname)
763        # XXX: graphviz seems to use low return codes for warnings, like
764        # "couldn't find font"
765        if rv < 2 : return vis
766        else: return None
767
768
769    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
770            cert_pwd=None):
771        """
772        Release access to testbed through fedd
773        """
774
775        if not uri and tbmap:
776            uri = tbmap.get(tb, None)
777        if not uri:
778            raise service_error(service_error.server_config, 
779                    "Unknown testbed: %s" % tb)
780
781        if self.local_access.has_key(uri):
782            resp = self.local_access[uri].ReleaseAccess(\
783                    { 'ReleaseAccessRequestBody' : 
784                        {'allocID': {'fedid': aid}},}, 
785                    fedid(file=cert_file))
786            resp = { 'ReleaseAccessResponseBody': resp } 
787        else:
788            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
789                    cert_file, cert_pwd, self.trusted_certs)
790
791        # better error coding
792
793    def remote_ns2topdl(self, uri, desc):
794
795        req = {
796                'description' : { 'ns2description': desc },
797            }
798
799        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
800                self.trusted_certs)
801
802        if r.has_key('Ns2TopdlResponseBody'):
803            r = r['Ns2TopdlResponseBody']
804            ed = r.get('experimentdescription', None)
805            if ed.has_key('topdldescription'):
806                return topdl.Topology(**ed['topdldescription'])
807            else:
808                raise service_error(service_error.protocol, 
809                        "Bad splitter response (no output)")
810        else:
811            raise service_error(service_error.protocol, "Bad splitter response")
812
813    class start_segment:
814        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
815                cert_pwd=None, trusted_certs=None, caller=None,
816                log_collector=None):
817            self.log = log
818            self.debug = debug
819            self.cert_file = cert_file
820            self.cert_pwd = cert_pwd
821            self.trusted_certs = None
822            self.caller = caller
823            self.testbed = testbed
824            self.log_collector = log_collector
825            self.response = None
826            self.node = { }
827            self.proof = None
828
829        def make_map(self, resp):
830            if 'segmentdescription' not in resp  or \
831                    'topdldescription' not in resp['segmentdescription']:
832                self.log.warn('No topology returned from startsegment')
833                return 
834
835            top = topdl.Topology(
836                    **resp['segmentdescription']['topdldescription'])
837
838            for e in top.elements:
839                if isinstance(e, topdl.Computer):
840                    self.node[e.name] = e
841
842        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
843            req = {
844                    'allocID': { 'fedid' : aid }, 
845                    'segmentdescription': { 
846                        'topdldescription': topo.to_dict(),
847                    },
848                }
849
850            if connInfo:
851                req['connection'] = connInfo
852
853            import_svcs = [ s for m in masters.values() \
854                    for s in m if self.testbed in s.importers]
855
856            if import_svcs or self.testbed in masters:
857                req['service'] = []
858
859            for s in import_svcs:
860                for r in s.reqs:
861                    sr = copy.deepcopy(r)
862                    sr['visibility'] = 'import';
863                    req['service'].append(sr)
864
865            for s in masters.get(self.testbed, []):
866                for r in s.reqs:
867                    sr = copy.deepcopy(r)
868                    sr['visibility'] = 'export';
869                    req['service'].append(sr)
870
871            if attrs:
872                req['fedAttr'] = attrs
873
874            try:
875                self.log.debug("Calling StartSegment at %s " % uri)
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                if r.has_key('StartSegmentResponseBody'):
879                    lval = r['StartSegmentResponseBody'].get('allocationLog',
880                            None)
881                    if lval and self.log_collector:
882                        for line in  lval.splitlines(True):
883                            self.log_collector.write(line)
884                    self.make_map(r['StartSegmentResponseBody'])
885                    if 'proof' in r: self.proof = r['proof']
886                    self.response = r
887                else:
888                    raise service_error(service_error.internal, 
889                            "Bad response!?: %s" %r)
890                return True
891            except service_error, e:
892                self.log.error("Start segment failed on %s: %s" % \
893                        (self.testbed, e))
894                return False
895
896
897
898    class terminate_segment:
899        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
900                cert_pwd=None, trusted_certs=None, caller=None):
901            self.log = log
902            self.debug = debug
903            self.cert_file = cert_file
904            self.cert_pwd = cert_pwd
905            self.trusted_certs = None
906            self.caller = caller
907            self.testbed = testbed
908
909        def __call__(self, uri, aid ):
910            req = {
911                    'allocID': {'fedid': aid }, 
912                }
913            self.log.info("Calling terminate segment")
914            try:
915                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
916                        self.trusted_certs)
917                self.log.info("Terminate segment succeeded")
918                return True
919            except service_error, e:
920                self.log.error("Terminate segment failed on %s: %s" % \
921                        (self.testbed, e))
922                return False
923
924    class info_segment(start_segment):
925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
926                cert_pwd=None, trusted_certs=None, caller=None,
927                log_collector=None):
928            experiment_control_local.start_segment.__init__(self, debug, 
929                    log, testbed, cert_file, cert_pwd, trusted_certs, 
930                    caller, log_collector)
931
932        def __call__(self, uri, aid):
933            req = { 'allocID': { 'fedid' : aid } }
934
935            try:
936                self.log.debug("Calling InfoSegment at %s " % uri)
937                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
938                        self.trusted_certs)
939                if r.has_key('InfoSegmentResponseBody'):
940                    self.make_map(r['InfoSegmentResponseBody'])
941                    if 'proof' in r: self.proof = r['proof']
942                    self.response = r
943                else:
944                    raise service_error(service_error.internal, 
945                            "Bad response!?: %s" %r)
946                return True
947            except service_error, e:
948                self.log.error("Info segment failed on %s: %s" % \
949                        (self.testbed, e))
950                return False
951
952    class operation_segment:
953        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
954                cert_pwd=None, trusted_certs=None, caller=None,
955                log_collector=None):
956            self.log = log
957            self.debug = debug
958            self.cert_file = cert_file
959            self.cert_pwd = cert_pwd
960            self.trusted_certs = None
961            self.caller = caller
962            self.testbed = testbed
963            self.status = None
964
965        def __call__(self, uri, aid, op, targets, params):
966            req = { 
967                    'allocID': { 'fedid' : aid },
968                    'operation': op,
969                    'target': targets,
970                    }
971            if params: req['parameter'] = params
972
973
974            try:
975                self.log.debug("Calling OperationSegment at %s " % uri)
976                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
977                        self.trusted_certs)
978                if 'OperationSegmentResponseBody' in r:
979                    r = r['OperationSegmentResponseBody']
980                    if 'status' in r:
981                        self.status = r['status']
982                else:
983                    raise service_error(service_error.internal, 
984                            "Bad response!?: %s" %r)
985                return True
986            except service_error, e:
987                self.log.error("Operation segment failed on %s: %s" % \
988                        (self.testbed, e))
989                return False
990
991    def annotate_topology(self, top, data):
992        # These routines do various parts of the annotation
993        def add_new_names(nl, l):
994            """ add any names in nl to the list in l """
995            for n in nl:
996                if n not in l: l.append(n)
997       
998        def merge_services(ne, e):
999            for ns in ne.service:
1000                # NB: the else is on the for
1001                for s in e.service:
1002                    if ns.name == s.name:
1003                        s.importer = ns.importer
1004                        s.param = ns.param
1005                        s.description = ns.description
1006                        s.status = ns.status
1007                        break
1008                else:
1009                    e.service.append(ns)
1010       
1011        def merge_oses(ne, e):
1012            """
1013            Merge the operating system entries of ne into e
1014            """
1015            for nos in ne.os:
1016                # NB: the else is on the for
1017                for os in e.os:
1018                    if nos.name == os.name:
1019                        os.version = nos.version
1020                        os.version = nos.distribution
1021                        os.version = nos.distributionversion
1022                        for a in nos.attribute:
1023                            if os.get_attribute(a.attribute):
1024                                os.remove_attribute(a.attribute)
1025                            os.set_attribute(a.attribute, a.value)
1026                        break
1027                else:
1028                    # If both nodes have one OS, this is a replacement
1029                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
1030                    else: e.os.append(nos)
1031
1032        # Annotate the topology with embedding info
1033        for e in top.elements:
1034            if isinstance(e, topdl.Computer):
1035                for s in data:
1036                    ne = s.node.get(e.name, None)
1037                    if ne is not None:
1038                        add_new_names(ne.localname, e.localname)
1039                        e.status = ne.status
1040                        merge_services(ne, e)
1041                        add_new_names(ne.operation, e.operation)
1042                        if ne.os: merge_oses(ne, e)
1043                        break
1044
1045
1046
1047    def allocate_resources(self, allocated, masters, eid, expid, 
1048            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1049            attrs=None, connInfo={}, tbmap=None, expcert=None):
1050
1051        started = { }           # Testbeds where a sub-experiment started
1052                                # successfully
1053
1054        # XXX
1055        fail_soft = False
1056
1057        if tbmap is None: tbmap = { }
1058
1059        log = alloc_log or self.log
1060
1061        tp = thread_pool(self.nthreads)
1062        threads = [ ]
1063        starters = [ ]
1064
1065        if expcert:
1066            cert = expcert
1067            pw = None
1068        else:
1069            cert = self.cert_file
1070            pw = self.cert_pwd
1071
1072        for tb in allocated.keys():
1073            # Create and start a thread to start the segment, and save it
1074            # to get the return value later
1075            tb_attrs = copy.copy(attrs)
1076            tp.wait_for_slot()
1077            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1078            base, suffix = split_testbed(tb)
1079            if suffix:
1080                tb_attrs.append({'attribute': 'experiment_name', 
1081                    'value': "%s-%s" % (eid, suffix)})
1082            else:
1083                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1084            if not uri:
1085                raise service_error(service_error.internal, 
1086                        "Unknown testbed %s !?" % tb)
1087
1088            aid = tbparams[tb].allocID
1089            if not aid:
1090                raise service_error(service_error.internal, 
1091                        "No alloc id for testbed %s !?" % tb)
1092
1093            s = self.start_segment(log=log, debug=self.debug,
1094                    testbed=tb, cert_file=cert,
1095                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1096                    caller=self.call_StartSegment,
1097                    log_collector=log_collector)
1098            starters.append(s)
1099            t  = pooled_thread(\
1100                    target=s, name=tb,
1101                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1102                    pdata=tp, trace_file=self.trace_file)
1103            threads.append(t)
1104            t.start()
1105
1106        # Wait until all finish (keep pinging the log, though)
1107        mins = 0
1108        revoked = False
1109        while not tp.wait_for_all_done(60.0):
1110            mins += 1
1111            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1112                    % mins)
1113            if not revoked and \
1114                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1115                # a testbed has failed.  Revoke this experiment's
1116                # synchronizarion values so that sub experiments will not
1117                # deadlock waiting for synchronization that will never happen
1118                self.log.info("A subexperiment has failed to swap in, " + \
1119                        "revoking synch keys")
1120                var_key = "fedid:%s" % expid
1121                for k in self.synch_store.all_keys():
1122                    if len(k) > 45 and k[0:46] == var_key:
1123                        self.synch_store.revoke_key(k)
1124                revoked = True
1125
1126        failed = [ t.getName() for t in threads if not t.rv ]
1127        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1128
1129        # If one failed clean up, unless fail_soft is set
1130        if failed:
1131            if not fail_soft:
1132                tp.clear()
1133                for tb in succeeded:
1134                    # Create and start a thread to stop the segment
1135                    tp.wait_for_slot()
1136                    uri = tbparams[tb].uri
1137                    t  = pooled_thread(\
1138                            target=self.terminate_segment(log=log,
1139                                testbed=tb,
1140                                cert_file=cert, 
1141                                cert_pwd=pw,
1142                                trusted_certs=self.trusted_certs,
1143                                caller=self.call_TerminateSegment),
1144                            args=(uri, tbparams[tb].allocID),
1145                            name=tb,
1146                            pdata=tp, trace_file=self.trace_file)
1147                    t.start()
1148                # Wait until all finish (if any are being stopped)
1149                if succeeded:
1150                    tp.wait_for_all_done()
1151
1152                # release the allocations
1153                for tb in tbparams.keys():
1154                    try:
1155                        self.release_access(tb, tbparams[tb].allocID, 
1156                                tbmap=tbmap, uri=tbparams[tb].uri,
1157                                cert_file=cert, cert_pwd=pw)
1158                    except service_error, e:
1159                        self.log.warn("Error releasing access: %s" % e.desc)
1160                # Remove the placeholder
1161                self.state_lock.acquire()
1162                self.state[eid].status = 'failed'
1163                self.state[eid].updated()
1164                if self.state_filename: self.write_state()
1165                self.state_lock.release()
1166                # Remove the repo dir
1167                self.remove_dirs("%s/%s" %(self.repodir, expid))
1168                # Walk up tmpdir, deleting as we go
1169                if self.cleanup:
1170                    self.remove_dirs(tmpdir)
1171                else:
1172                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1173
1174
1175                log.error("Swap in failed on %s" % ",".join(failed))
1176                return
1177        else:
1178            # Walk through the successes and gather the proofs
1179            proofs = { }
1180            for s in starters:
1181                if s.proof:
1182                    proofs[s.testbed] = s.proof
1183            self.annotate_topology(top, starters)
1184            log.info("[start_segment]: Experiment %s active" % eid)
1185
1186
1187        # Walk up tmpdir, deleting as we go
1188        if self.cleanup:
1189            self.remove_dirs(tmpdir)
1190        else:
1191            log.debug("[start_experiment]: not removing %s" % tmpdir)
1192
1193        # Insert the experiment into our state and update the disk copy.
1194        self.state_lock.acquire()
1195        self.state[expid].status = 'active'
1196        self.state[eid] = self.state[expid]
1197        self.state[eid].top = top
1198        self.state[eid].updated()
1199        # Append startup proofs
1200        for f in self.state[eid].get_all_allocations():
1201            if f.tb in proofs:
1202                f.proof.append(proofs[f.tb])
1203
1204        if self.state_filename: self.write_state()
1205        self.state_lock.release()
1206        return
1207
1208
1209    def add_kit(self, e, kit):
1210        """
1211        Add a Software object created from the list of (install, location)
1212        tuples passed as kit  to the software attribute of an object e.  We
1213        do this enough to break out the code, but it's kind of a hack to
1214        avoid changing the old tuple rep.
1215        """
1216
1217        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1218
1219        if isinstance(e.software, list): e.software.extend(s)
1220        else: e.software = s
1221
1222    def append_experiment_authorization(self, expid, attrs, 
1223            need_state_lock=True):
1224        """
1225        Append the authorization information to system state
1226        """
1227
1228        for p, a in attrs:
1229            self.auth.set_attribute(p, a)
1230        self.auth.save()
1231
1232        if need_state_lock: self.state_lock.acquire()
1233        # XXX: really a no op?
1234        #self.state[expid]['auth'].update(attrs)
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237
1238    def clear_experiment_authorization(self, expid, need_state_lock=True):
1239        """
1240        Attrs is a set of attribute principal pairs that need to be removed
1241        from the authenticator.  Remove them and save the authenticator.
1242        """
1243
1244        if need_state_lock: self.state_lock.acquire()
1245        # XXX: should be a no-op
1246        #if expid in self.state and 'auth' in self.state[expid]:
1247            #for p, a in self.state[expid]['auth']:
1248                #self.auth.unset_attribute(p, a)
1249            #self.state[expid]['auth'] = set()
1250        if self.state_filename: self.write_state()
1251        if need_state_lock: self.state_lock.release()
1252        self.auth.save()
1253
1254
1255    def create_experiment_state(self, fid, req, expid, expcert,
1256            state='starting'):
1257        """
1258        Create the initial entry in the experiment's state.  The expid and
1259        expcert are the experiment's fedid and certifacte that represents that
1260        ID, which are installed in the experiment state.  If the request
1261        includes a suggested local name that is used if possible.  If the local
1262        name is already taken by an experiment owned by this user that has
1263        failed, it is overwritten.  Otherwise new letters are added until a
1264        valid localname is found.  The generated local name is returned.
1265        """
1266
1267        if req.has_key('experimentID') and \
1268                req['experimentID'].has_key('localname'):
1269            overwrite = False
1270            eid = req['experimentID']['localname']
1271            # If there's an old failed experiment here with the same local name
1272            # and accessible by this user, we'll overwrite it, otherwise we'll
1273            # fall through and do the collision avoidance.
1274            old_expid = self.get_experiment_fedid(eid)
1275            if old_expid:
1276                users_experiment = True
1277                try:
1278                    self.check_experiment_access(fid, old_expid)
1279                except service_error, e:
1280                    if e.code == service_error.access: users_experiment = False
1281                    else: raise e
1282                if users_experiment:
1283                    self.state_lock.acquire()
1284                    status = self.state[eid].status
1285                    if status and status == 'failed':
1286                        # remove the old access attributes
1287                        self.clear_experiment_authorization(eid,
1288                                need_state_lock=False)
1289                        overwrite = True
1290                        del self.state[eid]
1291                        del self.state[old_expid]
1292                    self.state_lock.release()
1293                else:
1294                    self.log.info('Experiment %s exists, ' % eid + \
1295                            'but this user cannot access it')
1296            self.state_lock.acquire()
1297            while (self.state.has_key(eid) and not overwrite):
1298                eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305        else:
1306            eid = self.exp_stem
1307            for i in range(0,5):
1308                eid += random.choice(string.ascii_letters)
1309            self.state_lock.acquire()
1310            while (self.state.has_key(eid)):
1311                eid = self.exp_stem
1312                for i in range(0,5):
1313                    eid += random.choice(string.ascii_letters)
1314            # Initial state
1315            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1316                    identity=expcert)
1317            self.state[expid] = self.state[eid]
1318            if self.state_filename: self.write_state()
1319            self.state_lock.release()
1320
1321        # Let users touch the state.  Authorize this fid and the expid itself
1322        # to touch the experiment, as well as allowing th eoverrides.
1323        self.append_experiment_authorization(eid, 
1324                set([(fid, expid), (expid,expid)] + \
1325                        [ (o, expid) for o in self.overrides]))
1326
1327        return eid
1328
1329
1330    def allocate_ips_to_topo(self, top):
1331        """
1332        Add an ip4_address attribute to all the hosts in the topology, based on
1333        the shared substrates on which they sit.  An /etc/hosts file is also
1334        created and returned as a list of hostfiles entries.  We also return
1335        the allocator, because we may need to allocate IPs to portals
1336        (specifically DRAGON portals).
1337        """
1338        subs = sorted(top.substrates, 
1339                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1340                reverse=True)
1341        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1342        ifs = { }
1343        hosts = [ ]
1344
1345        for idx, s in enumerate(subs):
1346            net_size = len(s.interfaces)+2
1347
1348            a = ips.allocate(net_size)
1349            if a :
1350                base, num = a
1351                if num < net_size: 
1352                    raise service_error(service_error.internal,
1353                            "Allocator returned wrong number of IPs??")
1354            else:
1355                raise service_error(service_error.req, 
1356                        "Cannot allocate IP addresses")
1357            mask = ips.min_alloc
1358            while mask < net_size:
1359                mask *= 2
1360
1361            netmask = ((2**32-1) ^ (mask-1))
1362
1363            base += 1
1364            for i in s.interfaces:
1365                i.attribute.append(
1366                        topdl.Attribute('ip4_address', 
1367                            "%s" % ip_addr(base)))
1368                i.attribute.append(
1369                        topdl.Attribute('ip4_netmask', 
1370                            "%s" % ip_addr(int(netmask))))
1371
1372                hname = i.element.name
1373                if ifs.has_key(hname):
1374                    hosts.append("%s\t%s-%s %s-%d" % \
1375                            (ip_addr(base), hname, s.name, hname,
1376                                ifs[hname]))
1377                else:
1378                    ifs[hname] = 0
1379                    hosts.append("%s\t%s-%s %s-%d %s" % \
1380                            (ip_addr(base), hname, s.name, hname,
1381                                ifs[hname], hname))
1382
1383                ifs[hname] += 1
1384                base += 1
1385        return hosts, ips
1386
1387    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1388            tbparam, masters, tbmap, expid=None, expcert=None):
1389        for tb in testbeds:
1390            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1391                    expcert)
1392            allocated[tb] = 1
1393
1394    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1395            expcert=None):
1396        """
1397        Get access to testbed through fedd and set the parameters for that tb
1398        """
1399        def get_export_project(svcs):
1400            """
1401            Look through for the list of federated_service for this testbed
1402            objects for a project_export service, and extract the project
1403            parameter.
1404            """
1405
1406            pe = [s for s in svcs if s.name=='project_export']
1407            if len(pe) == 1:
1408                return pe[0].params.get('project', None)
1409            elif len(pe) == 0:
1410                return None
1411            else:
1412                raise service_error(service_error.req,
1413                        "More than one project export is not supported")
1414
1415        def add_services(svcs, type, slist, keys):
1416            """
1417            Add the given services to slist.  type is import or export.  Also
1418            add a mapping entry from the assigned id to the original service
1419            record.
1420            """
1421            for i, s in enumerate(svcs):
1422                idx = '%s%d' % (type, i)
1423                keys[idx] = s
1424                sr = {'id': idx, 'name': s.name, 'visibility': type }
1425                if s.params:
1426                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1427                            for k, v in s.params.items()]
1428                slist.append(sr)
1429
1430        uri = tbmap.get(testbed_base(tb), None)
1431        if not uri:
1432            raise service_error(service_error.server_config, 
1433                    "Unknown testbed: %s" % tb)
1434
1435        export_svcs = masters.get(tb,[])
1436        import_svcs = [ s for m in masters.values() \
1437                for s in m \
1438                    if tb in s.importers ]
1439
1440        export_project = get_export_project(export_svcs)
1441        # Compose the credential list so that IDs come before attributes
1442        creds = set()
1443        keys = set()
1444        certs = self.auth.get_creds_for_principal(fid)
1445        if expid:
1446            certs.update(self.auth.get_creds_for_principal(expid))
1447        for c in certs:
1448            keys.add(c.issuer_cert())
1449            creds.add(c.attribute_cert())
1450        creds = list(keys) + list(creds)
1451
1452        if expcert: cert, pw = expcert, None
1453        else: cert, pw = self.cert_file, self.cert_pw
1454
1455        # Request credentials
1456        req = {
1457                'abac_credential': creds,
1458            }
1459        # Make the service request from the services we're importing and
1460        # exporting.  Keep track of the export request ids so we can
1461        # collect the resulting info from the access response.
1462        e_keys = { }
1463        if import_svcs or export_svcs:
1464            slist = []
1465            add_services(import_svcs, 'import', slist, e_keys)
1466            add_services(export_svcs, 'export', slist, e_keys)
1467            req['service'] = slist
1468
1469        if self.local_access.has_key(uri):
1470            # Local access call
1471            req = { 'RequestAccessRequestBody' : req }
1472            r = self.local_access[uri].RequestAccess(req, 
1473                    fedid(file=self.cert_file))
1474            r = { 'RequestAccessResponseBody' : r }
1475        else:
1476            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1477
1478        if r.has_key('RequestAccessResponseBody'):
1479            # Through to here we have a valid response, not a fault.
1480            # Access denied is a fault, so something better or worse than
1481            # access denied has happened.
1482            r = r['RequestAccessResponseBody']
1483            self.log.debug("[get_access] Access granted")
1484        else:
1485            raise service_error(service_error.protocol,
1486                        "Bad proxy response")
1487        if 'proof' not in r:
1488            raise service_error(service_error.protocol,
1489                        "Bad access response (no access proof)")
1490
1491        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1492                tb=tb, uri=uri, proof=[r['proof']], 
1493                services=masters.get(tb, None))
1494
1495        # Collect the responses corresponding to the services this testbed
1496        # exports.  These will be the service requests that we will include in
1497        # the start segment requests (with appropriate visibility values) to
1498        # import and export the segments.
1499        for s in r.get('service', []):
1500            id = s.get('id', None)
1501            # Note that this attaches the response to the object in the masters
1502            # data structure.  (The e_keys index disappears when this fcn
1503            # returns)
1504            if id and id in e_keys:
1505                e_keys[id].reqs.append(s)
1506
1507        # Add attributes to parameter space.  We don't allow attributes to
1508        # overlay any parameters already installed.
1509        for a in r.get('fedAttr', []):
1510            try:
1511                if a['attribute']:
1512                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1513            except KeyError:
1514                self.log.error("Bad attribute in response: %s" % a)
1515
1516
1517    def split_topology(self, top, topo, testbeds):
1518        """
1519        Create the sub-topologies that are needed for experiment instantiation.
1520        """
1521        for tb in testbeds:
1522            topo[tb] = top.clone()
1523            # copy in for loop allows deletions from the original
1524            for e in [ e for e in topo[tb].elements]:
1525                etb = e.get_attribute('testbed')
1526                # NB: elements without a testbed attribute won't appear in any
1527                # sub topologies. 
1528                if not etb or etb != tb:
1529                    for i in e.interface:
1530                        for s in i.subs:
1531                            try:
1532                                s.interfaces.remove(i)
1533                            except ValueError:
1534                                raise service_error(service_error.internal,
1535                                        "Can't remove interface??")
1536                    topo[tb].elements.remove(e)
1537            topo[tb].make_indices()
1538
1539    def confirm_software(self, top):
1540        """
1541        Make sure that the software to be loaded in the topo is all available
1542        before we begin making access requests, etc.  This is a subset of
1543        wrangle_software.
1544        """
1545        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1546        pkgs.update([x.location for e in top.elements for x in e.software])
1547
1548        for pkg in pkgs:
1549            loc = pkg
1550
1551            scheme, host, path = urlparse(loc)[0:3]
1552            dest = os.path.basename(path)
1553            if not scheme:
1554                if not loc.startswith('/'):
1555                    loc = "/%s" % loc
1556                loc = "file://%s" %loc
1557            # NB: if scheme was found, loc == pkg
1558            try:
1559                u = urlopen(loc)
1560                u.close()
1561            except Exception, e:
1562                raise service_error(service_error.req, 
1563                        "Cannot open %s: %s" % (loc, e))
1564        return True
1565
1566    def wrangle_software(self, expid, top, topo, tbparams):
1567        """
1568        Copy software out to the repository directory, allocate permissions and
1569        rewrite the segment topologies to look for the software in local
1570        places.
1571        """
1572
1573        # Copy the rpms and tarfiles to a distribution directory from
1574        # which the federants can retrieve them
1575        linkpath = "%s/software" %  expid
1576        softdir ="%s/%s" % ( self.repodir, linkpath)
1577        softmap = { }
1578
1579        # self.fedkit and self.gateway kit are lists of tuples of
1580        # (install_location, download_location) this extracts the download
1581        # locations.
1582        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1583        pkgs.update([x.location for e in top.elements for x in e.software])
1584        try:
1585            os.makedirs(softdir)
1586        except EnvironmentError, e:
1587            raise service_error(
1588                    "Cannot create software directory: %s" % e)
1589        # The actual copying.  Everything's converted into a url for copying.
1590        auth_attrs = set()
1591        for pkg in pkgs:
1592            loc = pkg
1593
1594            scheme, host, path = urlparse(loc)[0:3]
1595            dest = os.path.basename(path)
1596            if not scheme:
1597                if not loc.startswith('/'):
1598                    loc = "/%s" % loc
1599                loc = "file://%s" %loc
1600            # NB: if scheme was found, loc == pkg
1601            try:
1602                u = urlopen(loc)
1603            except Exception, e:
1604                raise service_error(service_error.req, 
1605                        "Cannot open %s: %s" % (loc, e))
1606            try:
1607                f = open("%s/%s" % (softdir, dest) , "w")
1608                self.log.debug("Writing %s/%s" % (softdir,dest) )
1609                data = u.read(4096)
1610                while data:
1611                    f.write(data)
1612                    data = u.read(4096)
1613                f.close()
1614                u.close()
1615            except Exception, e:
1616                raise service_error(service_error.internal,
1617                        "Could not copy %s: %s" % (loc, e))
1618            path = re.sub("/tmp", "", linkpath)
1619            # XXX
1620            softmap[pkg] = \
1621                    "%s/%s/%s" %\
1622                    ( self.repo_url, path, dest)
1623
1624            # Allow the individual segments to access the software by assigning
1625            # an attribute to each testbed allocation that encodes the data to
1626            # be released.  This expression collects the data for each run of
1627            # the loop.
1628            auth_attrs.update([
1629                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1630                        for tb in tbparams.keys()])
1631
1632        self.append_experiment_authorization(expid, auth_attrs)
1633
1634        # Convert the software locations in the segments into the local
1635        # copies on this host
1636        for soft in [ s for tb in topo.values() \
1637                for e in tb.elements \
1638                    if getattr(e, 'software', False) \
1639                        for s in e.software ]:
1640            if softmap.has_key(soft.location):
1641                soft.location = softmap[soft.location]
1642
1643
1644    def new_experiment(self, req, fid):
1645        """
1646        The external interface to empty initial experiment creation called from
1647        the dispatcher.
1648
1649        Creates a working directory, splits the incoming description using the
1650        splitter script and parses out the avrious subsections using the
1651        lcasses above.  Once each sub-experiment is created, use pooled threads
1652        to instantiate them and start it all up.
1653        """
1654        self.log.info("New experiment call started for %s" % fid)
1655        req = req.get('NewRequestBody', None)
1656        if not req:
1657            raise service_error(service_error.req,
1658                    "Bad request format (no NewRequestBody)")
1659
1660        if self.auth.import_credentials(data_list=req.get('credential', [])):
1661            self.auth.save()
1662
1663        try:
1664            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1665                    with_proof=True)
1666        except service_error, e:
1667            self.log.info("New experiment call for %s: access denied" % fid)
1668            raise e
1669
1670
1671        if not access_ok:
1672            self.log.info("New experiment call for %s: Access denied" % fid)
1673            raise service_error(service_error.access, "New access denied",
1674                    proof=[proof])
1675
1676        try:
1677            tmpdir = tempfile.mkdtemp(prefix="split-")
1678        except EnvironmentError:
1679            raise service_error(service_error.internal, "Cannot create tmp dir")
1680
1681        try:
1682            access_user = self.accessdb[fid]
1683        except KeyError:
1684            raise service_error(service_error.internal,
1685                    "Access map and authorizer out of sync in " + \
1686                            "new_experiment for fedid %s"  % fid)
1687
1688        # Generate an ID for the experiment (slice) and a certificate that the
1689        # allocator can use to prove they own it.  We'll ship it back through
1690        # the encrypted connection.  If the requester supplied one, use it.
1691        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1692            expcert = req['experimentAccess']['X509']
1693            expid = fedid(certstr=expcert)
1694            self.state_lock.acquire()
1695            if expid in self.state:
1696                self.state_lock.release()
1697                raise service_error(service_error.req, 
1698                        'fedid %s identifies an existing experiment' % expid)
1699            self.state_lock.release()
1700        else:
1701            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1702
1703        #now we're done with the tmpdir, and it should be empty
1704        if self.cleanup:
1705            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1706            os.rmdir(tmpdir)
1707        else:
1708            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1709
1710        eid = self.create_experiment_state(fid, req, expid, expcert, 
1711                state='empty')
1712
1713        rv = {
1714                'experimentID': [
1715                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1716                ],
1717                'experimentStatus': 'empty',
1718                'experimentAccess': { 'X509' : expcert },
1719                'proof': proof.to_dict(),
1720            }
1721
1722        self.log.info("New experiment call succeeded for %s" % fid)
1723        return rv
1724
1725    # create_experiment sub-functions
1726
1727    @staticmethod
1728    def get_experiment_key(req, field='experimentID'):
1729        """
1730        Parse the experiment identifiers out of the request (the request body
1731        tag has been removed).  Specifically this pulls either the fedid or the
1732        localname out of the experimentID field.  A fedid is preferred.  If
1733        neither is present or the request does not contain the fields,
1734        service_errors are raised.
1735        """
1736        # Get the experiment access
1737        exp = req.get(field, None)
1738        if exp:
1739            if exp.has_key('fedid'):
1740                key = exp['fedid']
1741            elif exp.has_key('localname'):
1742                key = exp['localname']
1743            else:
1744                raise service_error(service_error.req, "Unknown lookup type")
1745        else:
1746            raise service_error(service_error.req, "No request?")
1747
1748        return key
1749
1750    def get_experiment_ids_and_start(self, key, tmpdir):
1751        """
1752        Get the experiment name, id and access certificate from the state, and
1753        set the experiment state to 'starting'.  returns a triple (fedid,
1754        localname, access_cert_file). The access_cert_file is a copy of the
1755        contents of the access certificate, created in the tempdir with
1756        restricted permissions.  If things are confused, raise an exception.
1757        """
1758
1759        expid = eid = None
1760        self.state_lock.acquire()
1761        if key in self.state:
1762            exp = self.state[key]
1763            exp.status = "starting"
1764            exp.updated()
1765            expid = exp.fedid
1766            eid = exp.localname
1767            expcert = exp.identity
1768        self.state_lock.release()
1769
1770        # make a protected copy of the access certificate so the experiment
1771        # controller can act as the experiment principal.
1772        if expcert:
1773            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1774            if not expcert_file:
1775                raise service_error(service_error.internal, 
1776                        "Cannot create temp cert file?")
1777        else:
1778            expcert_file = None
1779
1780        return (eid, expid, expcert_file)
1781
1782    def get_topology(self, req, tmpdir):
1783        """
1784        Get the ns2 content and put it into a file for parsing.  Call the local
1785        or remote parser and return the topdl.Topology.  Errors result in
1786        exceptions.  req is the request and tmpdir is a work directory.
1787        """
1788
1789        # The tcl parser needs to read a file so put the content into that file
1790        descr=req.get('experimentdescription', None)
1791        if descr:
1792            if 'ns2description' in descr:
1793                file_content=descr['ns2description']
1794            elif 'topdldescription' in descr:
1795                return topdl.Topology(**descr['topdldescription'])
1796            else:
1797                raise service_error(service_error.req, 
1798                        'Unknown experiment description type')
1799        else:
1800            raise service_error(service_error.req, "No experiment description")
1801
1802
1803        if self.splitter_url:
1804            self.log.debug("Calling remote topdl translator at %s" % \
1805                    self.splitter_url)
1806            top = self.remote_ns2topdl(self.splitter_url, file_content)
1807        else:
1808            tclfile = os.path.join(tmpdir, "experiment.tcl")
1809            if file_content:
1810                try:
1811                    f = open(tclfile, 'w')
1812                    f.write(file_content)
1813                    f.close()
1814                except EnvironmentError:
1815                    raise service_error(service_error.internal,
1816                            "Cannot write temp experiment description")
1817            else:
1818                raise service_error(service_error.req, 
1819                        "Only ns2descriptions supported")
1820            pid = "dummy"
1821            gid = "dummy"
1822            eid = "dummy"
1823
1824            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1825                str(self.muxmax), '-m', 'dummy']
1826
1827            tclcmd.extend([pid, gid, eid, tclfile])
1828
1829            self.log.debug("running local splitter %s", " ".join(tclcmd))
1830            # This is just fantastic.  As a side effect the parser copies
1831            # tb_compat.tcl into the current directory, so that directory
1832            # must be writable by the fedd user.  Doing this in the
1833            # temporary subdir ensures this is the case.
1834            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1835                    cwd=tmpdir)
1836            split_data = tclparser.stdout
1837
1838            top = topdl.topology_from_xml(file=split_data, top="experiment")
1839            os.remove(tclfile)
1840
1841        return top
1842
1843    def get_testbed_services(self, req, testbeds):
1844        """
1845        Parse the services section of the request into two dicts mapping
1846        testbed to lists of federated_service objects.  The first dict maps all
1847        exporters of services to those service objects, the second maps
1848        testbeds to service objects only for services requiring portals.
1849        """
1850        # We construct both dicts here because deriving the second is more
1851        # comples than it looks - both the keys and lists can differ, and it's
1852        # much easier to generate both in one pass.
1853        masters = { }
1854        pmasters = { }
1855        for s in req.get('service', []):
1856            # If this is a service request with the importall field
1857            # set, fill it out.
1858
1859            if s.get('importall', False):
1860                s['import'] = [ tb for tb in testbeds \
1861                        if tb not in s.get('export',[])]
1862                del s['importall']
1863
1864            # Add the service to masters
1865            for tb in s.get('export', []):
1866                if s.get('name', None):
1867
1868                    params = { }
1869                    for a in s.get('fedAttr', []):
1870                        params[a.get('attribute', '')] = a.get('value','')
1871
1872                    fser = federated_service(name=s['name'],
1873                            exporter=tb, importers=s.get('import',[]),
1874                            params=params)
1875                    if fser.name == 'hide_hosts' \
1876                            and 'hosts' not in fser.params:
1877                        fser.params['hosts'] = \
1878                                ",".join(tb_hosts.get(fser.exporter, []))
1879                    if tb in masters: masters[tb].append(fser)
1880                    else: masters[tb] = [fser]
1881
1882                    if fser.portal:
1883                        if tb in pmasters: pmasters[tb].append(fser)
1884                        else: pmasters[tb] = [fser]
1885                else:
1886                    self.log.error('Testbed service does not have name " + \
1887                            "and importers')
1888        return masters, pmasters
1889
1890    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1891        """
1892        Create the ssh keys necessary for interconnecting the portal nodes and
1893        the global hosts file for letting each segment know about the IP
1894        addresses in play.  Save these into the repo.  Add attributes to the
1895        autorizer allowing access controllers to download them and return a set
1896        of attributes that inform the segments where to find this stuff.  May
1897        raise service_errors in if there are problems.
1898        """
1899        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1900        gw_secretkey_base = "fed.%s" % self.ssh_type
1901        keydir = os.path.join(tmpdir, 'keys')
1902        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1903        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1904
1905        try:
1906            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1907        except ValueError:
1908            raise service_error(service_error.server_config, 
1909                    "Bad key type (%s)" % self.ssh_type)
1910
1911        self.generate_seer_certs(keydir)
1912
1913        # Copy configuration files into the remote file store
1914        # The config urlpath
1915        configpath = "/%s/config" % expid
1916        # The config file system location
1917        configdir ="%s%s" % ( self.repodir, configpath)
1918        try:
1919            os.makedirs(configdir)
1920        except EnvironmentError, e:
1921            raise service_error(service_error.internal,
1922                    "Cannot create config directory: %s" % e)
1923        try:
1924            f = open("%s/hosts" % configdir, "w")
1925            print >> f, string.join(hosts, '\n')
1926            f.close()
1927        except EnvironmentError, e:
1928            raise service_error(service_error.internal, 
1929                    "Cannot write hosts file: %s" % e)
1930        try:
1931            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1932            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1933            copy_file(os.path.join(keydir, 'ca.pem'), 
1934                    os.path.join(configdir, 'ca.pem'))
1935            copy_file(os.path.join(keydir, 'node.pem'), 
1936                    os.path.join(configdir, 'node.pem'))
1937        except EnvironmentError, e:
1938            raise service_error(service_error.internal, 
1939                    "Cannot copy keyfiles: %s" % e)
1940
1941        # Allow the individual testbeds to access the configuration files,
1942        # again by setting an attribute for the relevant pathnames on each
1943        # allocation principal.  Yeah, that's a long list comprehension.
1944        self.append_experiment_authorization(expid, set([
1945            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1946                    for tb in tbparams.keys() \
1947                        for f in ("hosts", 'ca.pem', 'node.pem', 
1948                            gw_secretkey_base, gw_pubkey_base)]))
1949
1950        attrs = [ 
1951                {
1952                    'attribute': 'ssh_pubkey', 
1953                    'value': '%s/%s/config/%s' % \
1954                            (self.repo_url, expid, gw_pubkey_base)
1955                },
1956                {
1957                    'attribute': 'ssh_secretkey', 
1958                    'value': '%s/%s/config/%s' % \
1959                            (self.repo_url, expid, gw_secretkey_base)
1960                },
1961                {
1962                    'attribute': 'hosts', 
1963                    'value': '%s/%s/config/hosts' % \
1964                            (self.repo_url, expid)
1965                },
1966                {
1967                    'attribute': 'seer_ca_pem', 
1968                    'value': '%s/%s/config/%s' % \
1969                            (self.repo_url, expid, 'ca.pem')
1970                },
1971                {
1972                    'attribute': 'seer_node_pem', 
1973                    'value': '%s/%s/config/%s' % \
1974                            (self.repo_url, expid, 'node.pem')
1975                },
1976            ]
1977        return attrs
1978
1979
1980    def get_vtopo(self, req, fid):
1981        """
1982        Return the stored virtual topology for this experiment
1983        """
1984        rv = None
1985        state = None
1986        self.log.info("vtopo call started for %s" %  fid)
1987
1988        req = req.get('VtopoRequestBody', None)
1989        if not req:
1990            raise service_error(service_error.req,
1991                    "Bad request format (no VtopoRequestBody)")
1992        exp = req.get('experiment', None)
1993        if exp:
1994            if exp.has_key('fedid'):
1995                key = exp['fedid']
1996                keytype = "fedid"
1997            elif exp.has_key('localname'):
1998                key = exp['localname']
1999                keytype = "localname"
2000            else:
2001                raise service_error(service_error.req, "Unknown lookup type")
2002        else:
2003            raise service_error(service_error.req, "No request?")
2004
2005        try:
2006            proof = self.check_experiment_access(fid, key)
2007        except service_error, e:
2008            self.log.info("vtopo call failed for %s: access denied" %  fid)
2009            raise e
2010
2011        self.state_lock.acquire()
2012        # XXX: this needs to be recalculated
2013        if key in self.state:
2014            if self.state[key].top is not None:
2015                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2016                rv = { 'experiment' : {keytype: key },
2017                        'vtopo': vtopo,
2018                        'proof': proof.to_dict(), 
2019                    }
2020            else:
2021                state = self.state[key].status
2022        self.state_lock.release()
2023
2024        if rv: 
2025            self.log.info("vtopo call completed for %s %s " % \
2026                (key, fid))
2027            return rv
2028        else: 
2029            if state:
2030                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2031                    (key, fid))
2032                raise service_error(service_error.partial, 
2033                        "Not ready: %s" % state)
2034            else:
2035                self.log.info("vtopo call completed for %s %s (No experiment)"\
2036                        % (key, fid))
2037                raise service_error(service_error.req, "No such experiment")
2038
2039    def get_vis(self, req, fid):
2040        """
2041        Return the stored visualization for this experiment
2042        """
2043        rv = None
2044        state = None
2045
2046        self.log.info("vis call started for %s" %  fid)
2047        req = req.get('VisRequestBody', None)
2048        if not req:
2049            raise service_error(service_error.req,
2050                    "Bad request format (no VisRequestBody)")
2051        exp = req.get('experiment', None)
2052        if exp:
2053            if exp.has_key('fedid'):
2054                key = exp['fedid']
2055                keytype = "fedid"
2056            elif exp.has_key('localname'):
2057                key = exp['localname']
2058                keytype = "localname"
2059            else:
2060                raise service_error(service_error.req, "Unknown lookup type")
2061        else:
2062            raise service_error(service_error.req, "No request?")
2063
2064        try:
2065            proof = self.check_experiment_access(fid, key)
2066        except service_error, e:
2067            self.log.info("vis call failed for %s: access denied" %  fid)
2068            raise e
2069
2070        self.state_lock.acquire()
2071        # Generate the visualization
2072        if key in self.state:
2073            if self.state[key].top is not None:
2074                try:
2075                    vis = self.genviz(
2076                            topdl.topology_to_vtopo(self.state[key].top))
2077                except service_error, e:
2078                    self.state_lock.release()
2079                    raise e
2080                rv =  { 'experiment' : {keytype: key },
2081                        'vis': vis,
2082                        'proof': proof.to_dict(), 
2083                        }
2084            else:
2085                state = self.state[key].status
2086        self.state_lock.release()
2087
2088        if rv: 
2089            self.log.info("vis call completed for %s %s " % \
2090                (key, fid))
2091            return rv
2092        else:
2093            if state:
2094                self.log.info("vis call completed for %s %s (not ready)" % \
2095                    (key, fid))
2096                raise service_error(service_error.partial, 
2097                        "Not ready: %s" % state)
2098            else:
2099                self.log.info("vis call completed for %s %s (no experiment)" % \
2100                    (key, fid))
2101                raise service_error(service_error.req, "No such experiment")
2102
2103   
2104    def save_federant_information(self, allocated, tbparams, eid, top):
2105        """
2106        Store the various data that have changed in the experiment state
2107        between when it was started and the beginning of resource allocation.
2108        This is basically the information about each local allocation.  This
2109        fills in the values of the placeholder allocation in the state.  It
2110        also collects the access proofs and returns them as dicts for a
2111        response message.
2112        """
2113        self.state_lock.acquire()
2114        exp = self.state[eid]
2115        exp.top = top.clone()
2116        # save federant information
2117        for k in allocated.keys():
2118            exp.add_allocation(tbparams[k])
2119            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2120                type="testbed", localname=[k], 
2121                service=[ s.to_topdl() for s in tbparams[k].services]))
2122
2123        # Access proofs for the response message
2124        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2125                    for p in tbparams[k].proof]
2126        exp.updated()
2127        if self.state_filename: 
2128            self.write_state()
2129        self.state_lock.release()
2130        return proofs
2131
2132    def clear_placeholder(self, eid, expid, tmpdir):
2133        """
2134        Clear the placeholder and remove any allocated temporary dir.
2135        """
2136
2137        self.state_lock.acquire()
2138        del self.state[eid]
2139        del self.state[expid]
2140        if self.state_filename: self.write_state()
2141        self.state_lock.release()
2142        if tmpdir and self.cleanup:
2143            self.remove_dirs(tmpdir)
2144
2145    # end of create_experiment sub-functions
2146
2147    def create_experiment(self, req, fid):
2148        """
2149        The external interface to experiment creation called from the
2150        dispatcher.
2151
2152        Creates a working directory, splits the incoming description using the
2153        splitter script and parses out the various subsections using the
2154        classes above.  Once each sub-experiment is created, use pooled threads
2155        to instantiate them and start it all up.
2156        """
2157
2158        self.log.info("Create experiment call started for %s" % fid)
2159        req = req.get('CreateRequestBody', None)
2160        if req:
2161            key = self.get_experiment_key(req)
2162        else:
2163            raise service_error(service_error.req,
2164                    "Bad request format (no CreateRequestBody)")
2165
2166        # Import information from the requester
2167        if self.auth.import_credentials(data_list=req.get('credential', [])):
2168            self.auth.save()
2169
2170        try:
2171            # Make sure that the caller can talk to us
2172            proof = self.check_experiment_access(fid, key)
2173        except service_error, e:
2174            self.log.info("Create experiment call failed for %s: access denied"\
2175                    % fid)
2176            raise e
2177
2178
2179        # Install the testbed map entries supplied with the request into a copy
2180        # of the testbed map.
2181        tbmap = dict(self.tbmap)
2182        tbactive = set(self.tbactive)
2183        for m in req.get('testbedmap', []):
2184            if 'testbed' in m and 'uri' in m:
2185                tbmap[m['testbed']] = m['uri']
2186                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2187
2188        # a place to work
2189        try:
2190            tmpdir = tempfile.mkdtemp(prefix="split-")
2191            os.mkdir(tmpdir+"/keys")
2192        except EnvironmentError:
2193            raise service_error(service_error.internal, "Cannot create tmp dir")
2194
2195        tbparams = { }
2196
2197        eid, expid, expcert_file = \
2198                self.get_experiment_ids_and_start(key, tmpdir)
2199
2200        # This catches exceptions to clear the placeholder if necessary
2201        try: 
2202            if not (eid and expid):
2203                raise service_error(service_error.internal, 
2204                        "Cannot find local experiment info!?")
2205
2206            top = self.get_topology(req, tmpdir)
2207            self.confirm_software(top)
2208            # Assign the IPs
2209            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2210            # Find the testbeds to look up
2211            tb_hosts = { }
2212            testbeds = [ ]
2213            for e in top.elements:
2214                if isinstance(e, topdl.Computer):
2215                    tb = e.get_attribute('testbed') or 'default'
2216                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2217                    else: 
2218                        tb_hosts[tb] = [ e.name ]
2219                        testbeds.append(tb)
2220
2221            masters, pmasters = self.get_testbed_services(req, testbeds)
2222            allocated = { }         # Testbeds we can access
2223            topo ={ }               # Sub topologies
2224            connInfo = { }          # Connection information
2225
2226            self.split_topology(top, topo, testbeds)
2227
2228            self.get_access_to_testbeds(testbeds, fid, allocated, 
2229                    tbparams, masters, tbmap, expid, expcert_file)
2230
2231            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2232
2233            part = experiment_partition(self.auth, self.store_url, tbmap,
2234                    self.muxmax, self.direct_transit, tbactive)
2235            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2236                    connInfo, expid)
2237
2238            auth_attrs = set()
2239            # Now get access to the dynamic testbeds (those added above)
2240            for tb in [ t for t in topo if t not in allocated]:
2241                self.get_access(tb, tbparams, fid, masters, tbmap, 
2242                        expid, expcert_file)
2243                allocated[tb] = 1
2244                store_keys = topo[tb].get_attribute('store_keys')
2245                # Give the testbed access to keys it exports or imports
2246                if store_keys:
2247                    auth_attrs.update(set([
2248                        (tbparams[tb].allocID, sk) \
2249                                for sk in store_keys.split(" ")]))
2250
2251            if auth_attrs:
2252                self.append_experiment_authorization(expid, auth_attrs)
2253
2254            # transit and disconnected testbeds may not have a connInfo entry.
2255            # Fill in the blanks.
2256            for t in allocated.keys():
2257                if not connInfo.has_key(t):
2258                    connInfo[t] = { }
2259
2260            self.wrangle_software(expid, top, topo, tbparams)
2261
2262            proofs = self.save_federant_information(allocated, tbparams, 
2263                    eid, top)
2264        except service_error, e:
2265            # If something goes wrong in the parse (usually an access error)
2266            # clear the placeholder state.  From here on out the code delays
2267            # exceptions.  Failing at this point returns a fault to the remote
2268            # caller.
2269
2270            self.log.info("Create experiment call failed for %s %s: %s" % 
2271                    (eid, fid, e))
2272            self.clear_placeholder(eid, expid, tmpdir)
2273            raise e
2274
2275        # Start the background swapper and return the starting state.  From
2276        # here on out, the state will stick around a while.
2277
2278        # Create a logger that logs to the experiment's state object as well as
2279        # to the main log file.
2280        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2281        alloc_collector = self.list_log(self.state[eid].log)
2282        h = logging.StreamHandler(alloc_collector)
2283        # XXX: there should be a global one of these rather than repeating the
2284        # code.
2285        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2286                    '%d %b %y %H:%M:%S'))
2287        alloc_log.addHandler(h)
2288
2289        # Start a thread to do the resource allocation
2290        t  = Thread(target=self.allocate_resources,
2291                args=(allocated, masters, eid, expid, tbparams, 
2292                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2293                    connInfo, tbmap, expcert_file),
2294                name=eid)
2295        t.start()
2296
2297        rv = {
2298                'experimentID': [
2299                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2300                ],
2301                'experimentStatus': 'starting',
2302                'proof': [ proof.to_dict() ] + proofs,
2303            }
2304        self.log.info("Create experiment call succeeded for %s %s" % \
2305                (eid, fid))
2306
2307        return rv
2308   
2309    def get_experiment_fedid(self, key):
2310        """
2311        find the fedid associated with the localname key in the state database.
2312        """
2313
2314        rv = None
2315        self.state_lock.acquire()
2316        if key in self.state:
2317            rv = self.state[key].fedid
2318        self.state_lock.release()
2319        return rv
2320
2321    def check_experiment_access(self, fid, key):
2322        """
2323        Confirm that the fid has access to the experiment.  Though a request
2324        may be made in terms of a local name, the access attribute is always
2325        the experiment's fedid.
2326        """
2327        if not isinstance(key, fedid):
2328            key = self.get_experiment_fedid(key)
2329
2330        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2331
2332        if access_ok:
2333            return proof
2334        else:
2335            raise service_error(service_error.access, "Access Denied",
2336                proof)
2337
2338
2339    def get_handler(self, path, fid):
2340        """
2341        Perhaps surprisingly named, this function handles HTTP GET requests to
2342        this server (SOAP requests are POSTs).
2343        """
2344        self.log.info("Get handler %s %s" % (path, fid))
2345        # XXX: log proofs?
2346        if self.auth.check_attribute(fid, path):
2347            return ("%s/%s" % (self.repodir, path), "application/binary")
2348        else:
2349            return (None, None)
2350
2351    def update_info(self, key, force=False):
2352        top = None
2353        self.state_lock.acquire()
2354        if key in self.state:
2355            if force or self.state[key].older_than(self.info_cache_limit):
2356                top = self.state[key].top
2357                if top is not None: top = top.clone()
2358                d1, info_params, cert, d2 = \
2359                        self.get_segment_info(self.state[key], need_lock=False)
2360        self.state_lock.release()
2361
2362        if top is None: return
2363
2364        try:
2365            tmpdir = tempfile.mkdtemp(prefix="info-")
2366        except EnvironmentError:
2367            raise service_error(service_error.internal, 
2368                    "Cannot create tmp dir")
2369        cert_file = self.make_temp_certfile(cert, tmpdir)
2370
2371        data = []
2372        try:
2373            for k, (uri, aid) in info_params.items():
2374                info=self.info_segment(log=self.log, testbed=uri,
2375                            cert_file=cert_file, cert_pwd=None,
2376                            trusted_certs=self.trusted_certs,
2377                            caller=self.call_InfoSegment)
2378                info(uri, aid)
2379                data.append(info)
2380        # Clean up the tmpdir no matter what
2381        finally:
2382            if tmpdir: self.remove_dirs(tmpdir)
2383
2384        self.annotate_topology(top, data)
2385        self.state_lock.acquire()
2386        if key in self.state:
2387            self.state[key].top = top
2388            self.state[key].updated()
2389            if self.state_filename: self.write_state()
2390        self.state_lock.release()
2391
2392   
2393    def get_info(self, req, fid):
2394        """
2395        Return all the stored info about this experiment
2396        """
2397        rv = None
2398
2399        self.log.info("Info call started for %s" %  fid)
2400        req = req.get('InfoRequestBody', None)
2401        if not req:
2402            raise service_error(service_error.req,
2403                    "Bad request format (no InfoRequestBody)")
2404        exp = req.get('experiment', None)
2405        legacy = req.get('legacy', False)
2406        fresh = req.get('fresh', False)
2407        if exp:
2408            if exp.has_key('fedid'):
2409                key = exp['fedid']
2410                keytype = "fedid"
2411            elif exp.has_key('localname'):
2412                key = exp['localname']
2413                keytype = "localname"
2414            else:
2415                raise service_error(service_error.req, "Unknown lookup type")
2416        else:
2417            raise service_error(service_error.req, "No request?")
2418
2419        try:
2420            proof = self.check_experiment_access(fid, key)
2421        except service_error, e:
2422            self.log.info("Info call failed for %s: access denied" %  fid)
2423
2424
2425        self.update_info(key, fresh)
2426
2427        self.state_lock.acquire()
2428        if self.state.has_key(key):
2429            rv = self.state[key].get_info()
2430            # Copy the topo if we need legacy annotations
2431            if legacy:
2432                top = self.state[key].top
2433                if top is not None: top = top.clone()
2434        self.state_lock.release()
2435        self.log.info("Gathered Info for %s %s" % (key, fid))
2436
2437        # If the legacy visualization and topology representations are
2438        # requested, calculate them and add them to the return.
2439        if legacy and rv is not None:
2440            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2441            if top is not None:
2442                vtopo = topdl.topology_to_vtopo(top)
2443                if vtopo is not None:
2444                    rv['vtopo'] = vtopo
2445                    try:
2446                        vis = self.genviz(vtopo)
2447                    except service_error, e:
2448                        self.log.debug('Problem generating visualization: %s' \
2449                                % e)
2450                        vis = None
2451                    if vis is not None:
2452                        rv['vis'] = vis
2453        if rv:
2454            self.log.info("Info succeded for %s %s" % (key, fid))
2455            rv['proof'] = proof.to_dict()
2456            return rv
2457        else: 
2458            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2459            raise service_error(service_error.req, "No such experiment")
2460
2461    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2462            results):
2463        """
2464        Call OperateSegment on multiple testbeds and gather the results.
2465        op_params contains the parameters needed to contact that testbed, cert
2466        is a certificate containing the fedid to use, op is the operation,
2467        testbeds is a dict mapping testbed name to targets in that testbed,
2468        params are the parameters to include a,d results is a growing list of
2469        the results of the calls.
2470        """
2471        try:
2472            tmpdir = tempfile.mkdtemp(prefix="info-")
2473        except EnvironmentError:
2474            raise service_error(service_error.internal, 
2475                    "Cannot create tmp dir")
2476        cert_file = self.make_temp_certfile(cert, tmpdir)
2477
2478        try:
2479            for tb, targets in testbeds.items():
2480                if tb in op_params:
2481                    uri, aid = op_params[tb]
2482                    operate=self.operation_segment(log=self.log, testbed=uri,
2483                                cert_file=cert_file, cert_pwd=None,
2484                                trusted_certs=self.trusted_certs,
2485                                caller=self.call_OperationSegment)
2486                    if operate(uri, aid, op, targets, params):
2487                        if operate.status is not None:
2488                            results.extend(operate.status)
2489                            continue
2490                # Something went wrong in a weird way.  Add statuses
2491                # that reflect that to results
2492                for t in targets:
2493                    results.append(operation_status(t, 
2494                        operation_status.federant,
2495                        'Unexpected error on %s' % tb))
2496        # Clean up the tmpdir no matter what
2497        finally:
2498            if tmpdir: self.remove_dirs(tmpdir)
2499
2500    def do_operation(self, req, fid):
2501        """
2502        Find the testbeds holding each target and ask them to carry out the
2503        operation.  Return the statuses.
2504        """
2505        # Map an element to the testbed containing it
2506        def element_to_tb(e):
2507            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2508            elif isinstance(e, topdl.Testbed): return e.name
2509            else: return None
2510        # If d is an operation_status object, make it a dict
2511        def make_dict(d):
2512            if isinstance(d, dict): return d
2513            elif isinstance(d, operation_status): return d.to_dict()
2514            else: return { }
2515
2516        def element_name(e):
2517            if isinstance(e, topdl.Computer): return e.name
2518            elif isinstance(e, topdl.Testbed): 
2519                if e.localname: return e.localname[0]
2520                else: return None
2521            else: return None
2522
2523        self.log.info("Operation call started for %s" %  fid)
2524        req = req.get('OperationRequestBody', None)
2525        if not req:
2526            raise service_error(service_error.req,
2527                    "Bad request format (no OperationRequestBody)")
2528        exp = req.get('experiment', None)
2529        op = req.get('operation', None)
2530        targets = set(req.get('target', []))
2531        params = req.get('parameter', None)
2532
2533        if exp:
2534            if 'fedid' in exp:
2535                key = exp['fedid']
2536                keytype = "fedid"
2537            elif 'localname' in exp:
2538                key = exp['localname']
2539                keytype = "localname"
2540            else:
2541                raise service_error(service_error.req, "Unknown lookup type")
2542        else:
2543            raise service_error(service_error.req, "No request?")
2544
2545        if op is None or not targets:
2546            raise service_error(service_error.req, "No request?")
2547
2548        try:
2549            proof = self.check_experiment_access(fid, key)
2550        except service_error, e:
2551            self.log.info("Operation call failed for %s: access denied" %  fid)
2552            raise e
2553
2554        self.state_lock.acquire()
2555        if key in self.state:
2556            d1, op_params, cert, d2 = \
2557                    self.get_segment_info(self.state[key], need_lock=False,
2558                            key='tb')
2559            top = self.state[key].top
2560            if top is not None:
2561                top = top.clone()
2562        self.state_lock.release()
2563
2564        if top is None:
2565            self.log.info("Operation call failed for %s: not active" %  fid)
2566            raise service_error(service_error.partial, "No topology yet", 
2567                    proof=proof)
2568
2569        testbeds = { }
2570        results = []
2571        for e in top.elements:
2572            ename = element_name(e)
2573            if ename in targets:
2574                tb = element_to_tb(e)
2575                targets.remove(ename)
2576                if tb is not None:
2577                    if tb in testbeds: testbeds[tb].append(ename)
2578                    else: testbeds[tb] = [ ename ]
2579                else:
2580                    results.append(operation_status(e.name, 
2581                        code=operation_status.no_target, 
2582                        description='Cannot map target to testbed'))
2583
2584        for t in targets:
2585            results.append(operation_status(t, operation_status.no_target))
2586
2587        self.operate_on_segments(op_params, cert, op, testbeds, params,
2588                results)
2589
2590        self.log.info("Operation call succeeded for %s" %  fid)
2591        return { 
2592                'experiment': exp, 
2593                'status': [make_dict(r) for r in results],
2594                'proof': proof.to_dict()
2595                }
2596
2597
2598    def get_multi_info(self, req, fid):
2599        """
2600        Return all the stored info that this fedid can access
2601        """
2602        rv = { 'info': [ ], 'proof': [ ] }
2603
2604        self.log.info("Multi Info call started for %s" %  fid)
2605        self.state_lock.acquire()
2606        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2607            try:
2608                proof = self.check_experiment_access(fid, key)
2609            except service_error, e:
2610                if e.code == service_error.access:
2611                    continue
2612                else:
2613                    self.log.info("Multi Info call failed for %s: %s" %  \
2614                            (e,fid))
2615                    self.state_lock.release()
2616                    raise e
2617
2618            if self.state.has_key(key):
2619                e = self.state[key].get_info()
2620                e['proof'] = proof.to_dict()
2621                rv['info'].append(e)
2622                rv['proof'].append(proof.to_dict())
2623        self.state_lock.release()
2624        self.log.info("Multi Info call succeeded for %s" %  fid)
2625        return rv
2626
2627    def check_termination_status(self, fed_exp, force):
2628        """
2629        Confirm that the experiment is sin a valid state to stop (or force it)
2630        return the state - invalid states for deletion and force settings cause
2631        exceptions.
2632        """
2633        self.state_lock.acquire()
2634        status = fed_exp.status
2635
2636        if status:
2637            if status in ('starting', 'terminating'):
2638                if not force:
2639                    self.state_lock.release()
2640                    raise service_error(service_error.partial, 
2641                            'Experiment still being created or destroyed')
2642                else:
2643                    self.log.warning('Experiment in %s state ' % status + \
2644                            'being terminated by force.')
2645            self.state_lock.release()
2646            return status
2647        else:
2648            # No status??? trouble
2649            self.state_lock.release()
2650            raise service_error(service_error.internal,
2651                    "Experiment has no status!?")
2652
2653    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2654        ids = []
2655        term_params = { }
2656        if need_lock: self.state_lock.acquire()
2657        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2658        expcert = fed_exp.identity
2659        repo = "%s" % fed_exp.fedid
2660
2661        # Collect the allocation/segment ids into a dict keyed by the fedid
2662        # of the allocation that contains a tuple of uri, aid
2663        for i, fed in enumerate(fed_exp.get_all_allocations()):
2664            uri = fed.uri
2665            aid = fed.allocID
2666            if key == 'aid': term_params[aid] = (uri, aid)
2667            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2668
2669        if need_lock: self.state_lock.release()
2670        return ids, term_params, expcert, repo
2671
2672
2673    def get_termination_info(self, fed_exp):
2674        self.state_lock.acquire()
2675        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2676        # Change the experiment state
2677        fed_exp.status = 'terminating'
2678        fed_exp.updated()
2679        if self.state_filename: self.write_state()
2680        self.state_lock.release()
2681
2682        return ids, term_params, expcert, repo
2683
2684
2685    def deallocate_resources(self, term_params, expcert, status, force, 
2686            dealloc_log):
2687        tmpdir = None
2688        # This try block makes sure the tempdir is cleared
2689        try:
2690            # If no expcert, try the deallocation as the experiment
2691            # controller instance.
2692            if expcert and self.auth_type != 'legacy': 
2693                try:
2694                    tmpdir = tempfile.mkdtemp(prefix="term-")
2695                except EnvironmentError:
2696                    raise service_error(service_error.internal, 
2697                            "Cannot create tmp dir")
2698                cert_file = self.make_temp_certfile(expcert, tmpdir)
2699                pw = None
2700            else: 
2701                cert_file = self.cert_file
2702                pw = self.cert_pwd
2703
2704            # Stop everyone.  NB, wait_for_all waits until a thread starts
2705            # and then completes, so we can't wait if nothing starts.  So,
2706            # no tbparams, no start.
2707            if len(term_params) > 0:
2708                tp = thread_pool(self.nthreads)
2709                for k, (uri, aid) in term_params.items():
2710                    # Create and start a thread to stop the segment
2711                    tp.wait_for_slot()
2712                    t  = pooled_thread(\
2713                            target=self.terminate_segment(log=dealloc_log,
2714                                testbed=uri,
2715                                cert_file=cert_file, 
2716                                cert_pwd=pw,
2717                                trusted_certs=self.trusted_certs,
2718                                caller=self.call_TerminateSegment),
2719                            args=(uri, aid), name=k,
2720                            pdata=tp, trace_file=self.trace_file)
2721                    t.start()
2722                # Wait for completions
2723                tp.wait_for_all_done()
2724
2725            # release the allocations (failed experiments have done this
2726            # already, and starting experiments may be in odd states, so we
2727            # ignore errors releasing those allocations
2728            try: 
2729                for k, (uri, aid)  in term_params.items():
2730                    self.release_access(None, aid, uri=uri,
2731                            cert_file=cert_file, cert_pwd=pw)
2732            except service_error, e:
2733                if status != 'failed' and not force:
2734                    raise e
2735
2736        # Clean up the tmpdir no matter what
2737        finally:
2738            if tmpdir: self.remove_dirs(tmpdir)
2739
2740    def terminate_experiment(self, req, fid):
2741        """
2742        Swap this experiment out on the federants and delete the shared
2743        information
2744        """
2745        self.log.info("Terminate experiment call started for %s" % fid)
2746        tbparams = { }
2747        req = req.get('TerminateRequestBody', None)
2748        if not req:
2749            raise service_error(service_error.req,
2750                    "Bad request format (no TerminateRequestBody)")
2751
2752        key = self.get_experiment_key(req, 'experiment')
2753        try:
2754            proof = self.check_experiment_access(fid, key)
2755        except service_error, e:
2756            self.log.info(
2757                    "Terminate experiment call failed for %s: access denied" \
2758                            % fid)
2759            raise e
2760        exp = req.get('experiment', False)
2761        force = req.get('force', False)
2762
2763        dealloc_list = [ ]
2764
2765
2766        # Create a logger that logs to the dealloc_list as well as to the main
2767        # log file.
2768        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2769        dealloc_log.info("Terminating %s " %key)
2770        h = logging.StreamHandler(self.list_log(dealloc_list))
2771        # XXX: there should be a global one of these rather than repeating the
2772        # code.
2773        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2774                    '%d %b %y %H:%M:%S'))
2775        dealloc_log.addHandler(h)
2776
2777        self.state_lock.acquire()
2778        fed_exp = self.state.get(key, None)
2779        self.state_lock.release()
2780        repo = None
2781
2782        if fed_exp:
2783            status = self.check_termination_status(fed_exp, force)
2784            # get_termination_info updates the experiment state
2785            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2786            self.deallocate_resources(term_params, expcert, status, force, 
2787                    dealloc_log)
2788
2789            # Remove the terminated experiment
2790            self.state_lock.acquire()
2791            for id in ids:
2792                self.clear_experiment_authorization(id, need_state_lock=False)
2793                if id in self.state: del self.state[id]
2794
2795            if self.state_filename: self.write_state()
2796            self.state_lock.release()
2797
2798            # Delete any synch points associated with this experiment.  All
2799            # synch points begin with the fedid of the experiment.
2800            fedid_keys = set(["fedid:%s" % f for f in ids \
2801                    if isinstance(f, fedid)])
2802            for k in self.synch_store.all_keys():
2803                try:
2804                    if len(k) > 45 and k[0:46] in fedid_keys:
2805                        self.synch_store.del_value(k)
2806                except synch_store.BadDeletionError:
2807                    pass
2808            self.write_store()
2809
2810            # Remove software and other cached stuff from the filesystem.
2811            if repo:
2812                self.remove_dirs("%s/%s" % (self.repodir, repo))
2813       
2814            self.log.info("Terminate experiment succeeded for %s %s" % \
2815                    (key, fid))
2816            return { 
2817                    'experiment': exp , 
2818                    'deallocationLog': string.join(dealloc_list, ''),
2819                    'proof': [proof.to_dict()],
2820                    }
2821        else:
2822            self.log.info("Terminate experiment failed for %s %s: no state" % \
2823                    (key, fid))
2824            raise service_error(service_error.req, "No saved state")
2825
2826
2827    def GetValue(self, req, fid):
2828        """
2829        Get a value from the synchronized store
2830        """
2831        req = req.get('GetValueRequestBody', None)
2832        if not req:
2833            raise service_error(service_error.req,
2834                    "Bad request format (no GetValueRequestBody)")
2835       
2836        name = req.get('name', None)
2837        wait = req.get('wait', False)
2838        rv = { 'name': name }
2839
2840        if not name:
2841            raise service_error(service_error.req, "No name?")
2842
2843        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2844
2845        if access_ok:
2846            self.log.debug("[GetValue] asking for %s " % name)
2847            try:
2848                v = self.synch_store.get_value(name, wait)
2849            except synch_store.RevokedKeyError:
2850                # No more synch on this key
2851                raise service_error(service_error.federant, 
2852                        "Synch key %s revoked" % name)
2853            if v is not None:
2854                rv['value'] = v
2855            rv['proof'] = proof.to_dict()
2856            self.log.debug("[GetValue] got %s from %s" % (v, name))
2857            return rv
2858        else:
2859            raise service_error(service_error.access, "Access Denied",
2860                    proof=proof)
2861       
2862
2863    def SetValue(self, req, fid):
2864        """
2865        Set a value in the synchronized store
2866        """
2867        req = req.get('SetValueRequestBody', None)
2868        if not req:
2869            raise service_error(service_error.req,
2870                    "Bad request format (no SetValueRequestBody)")
2871       
2872        name = req.get('name', None)
2873        v = req.get('value', '')
2874
2875        if not name:
2876            raise service_error(service_error.req, "No name?")
2877
2878        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2879
2880        if access_ok:
2881            try:
2882                self.synch_store.set_value(name, v)
2883                self.write_store()
2884                self.log.debug("[SetValue] set %s to %s" % (name, v))
2885            except synch_store.CollisionError:
2886                # Translate into a service_error
2887                raise service_error(service_error.req,
2888                        "Value already set: %s" %name)
2889            except synch_store.RevokedKeyError:
2890                # No more synch on this key
2891                raise service_error(service_error.federant, 
2892                        "Synch key %s revoked" % name)
2893                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2894        else:
2895            raise service_error(service_error.access, "Access Denied",
2896                    proof=proof)
Note: See TracBrowser for help on using the repository browser.