source: fedd/federation/experiment_control.py @ 2bb8b35

compt_changes
Last change on this file since 2bb8b35 was 2bb8b35, checked in by Ted Faber <faber@…>, 12 years ago

Additional logging

  • Property mode set to 100644
File size: 94.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        also adds testbeds to active if they include , active after
388        their name.
389        """
390
391        self.tbmap = { }
392        self.tbactive = set()
393        lineno =0
394        try:
395            f = open(file, "r")
396            for line in f:
397                lineno += 1
398                line = line.strip()
399                if line.startswith('#') or len(line) == 0:
400                    continue
401                try:
402                    label, url = line.split(':', 1)
403                    if ',' in label:
404                        label, act = label.split(',', 1)
405                        active = (act.strip() == 'active')
406                    else:
407                        active = False
408                    self.tbmap[label] = url
409                    if active: self.tbactive.add(label)
410                except ValueError, e:
411                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
412                            "map db: %s %s" % (lineno, line, e))
413        except EnvironmentError, e:
414            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
415                    "open %s: %s" % (file, e))
416        else:
417            f.close()
418
419    def read_store(self):
420        try:
421            self.synch_store = synch_store()
422            self.synch_store.load(self.store_filename)
423            self.log.debug("[read_store]: Read store from %s" % \
424                    self.store_filename)
425        except EnvironmentError, e:
426            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
427                    % (self.state_filename, e))
428            self.synch_store = synch_store()
429
430        # Set the initial permissions on data in the store.  XXX: This ad hoc
431        # authorization attribute initialization is getting out of hand.
432        # XXX: legacy
433        if self.auth_type == 'legacy':
434            for k in self.synch_store.all_keys():
435                try:
436                    if k.startswith('fedid:'):
437                        fid = fedid(hexstr=k[6:46])
438                        if self.state.has_key(fid):
439                            for a in self.get_alloc_ids(self.state[fid]):
440                                self.auth.set_attribute(a, k)
441                except ValueError, e:
442                    self.log.warn("Cannot deduce permissions for %s" % k)
443
444
445    def write_store(self):
446        """
447        Write a new copy of synch_store after writing current state
448        to a backup.  We use the internal synch_store pickle method to avoid
449        incinsistent data.
450
451        State format is a simple pickling of the store.
452        """
453        if os.access(self.store_filename, os.W_OK):
454            copy_file(self.store_filename, \
455                    "%s.bak" % self.store_filename)
456        try:
457            self.synch_store.save(self.store_filename)
458        except EnvironmentError, e:
459            self.log.error("Can't write file %s: %s" % \
460                    (self.store_filename, e))
461        except TypeError, e:
462            self.log.error("Pickling problem (TypeError): %s" % e)
463
464
465    def remove_dirs(self, dir):
466        """
467        Remove the directory tree and all files rooted at dir.  Log any errors,
468        but continue.
469        """
470        self.log.debug("[removedirs]: removing %s" % dir)
471        try:
472            for path, dirs, files in os.walk(dir, topdown=False):
473                for f in files:
474                    os.remove(os.path.join(path, f))
475                for d in dirs:
476                    os.rmdir(os.path.join(path, d))
477            os.rmdir(dir)
478        except EnvironmentError, e:
479            self.log.error("Error deleting directory tree in %s" % e);
480
481    @staticmethod
482    def make_temp_certfile(expcert, tmpdir):
483        """
484        make a protected copy of the access certificate so the experiment
485        controller can act as the experiment principal.  mkstemp is the most
486        secure way to do that. The directory should be created by
487        mkdtemp.  Return the filename.
488        """
489        if expcert and tmpdir:
490            try:
491                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
492                f = os.fdopen(certf, 'w')
493                print >> f, expcert
494                f.close()
495            except EnvironmentError, e:
496                raise service_error(service_error.internal, 
497                        "Cannot create temp cert file?")
498            return certfn
499        else:
500            return None
501
502       
503    def generate_ssh_keys(self, dest, type="rsa" ):
504        """
505        Generate a set of keys for the gateways to use to talk.
506
507        Keys are of type type and are stored in the required dest file.
508        """
509        valid_types = ("rsa", "dsa")
510        t = type.lower();
511        if t not in valid_types: raise ValueError
512        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
513
514        try:
515            trace = open("/dev/null", "w")
516        except EnvironmentError:
517            raise service_error(service_error.internal,
518                    "Cannot open /dev/null??");
519
520        # May raise CalledProcessError
521        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
522        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
523        if rv != 0:
524            raise service_error(service_error.internal, 
525                    "Cannot generate nonce ssh keys.  %s return code %d" \
526                            % (self.ssh_keygen, rv))
527
528    def generate_seer_certs(self, destdir):
529        '''
530        Create a SEER ca cert and a node cert in destdir/ca.pem and
531        destdir/node.pem respectively.  These will be distributed throughout
532        the federated experiment.  This routine reports errors via
533        service_errors.
534        '''
535        openssl = '/usr/bin/openssl'
536        # All the filenames and parameters we need for openssl calls below
537        ca_key =os.path.join(destdir, 'ca.key') 
538        ca_pem = os.path.join(destdir, 'ca.pem')
539        node_key =os.path.join(destdir, 'node.key') 
540        node_pem = os.path.join(destdir, 'node.pem')
541        node_req = os.path.join(destdir, 'node.req')
542        node_signed = os.path.join(destdir, 'node.signed')
543        days = '%s' % (365 * 10)
544        serial = '%s' % random.randint(0, 1<<16)
545
546        try:
547            # Sequence of calls to create a CA key, create a ca cert, create a
548            # node key, node signing request, and finally a signed node
549            # certificate.
550            sequence = (
551                    (openssl, 'genrsa', '-out', ca_key, '1024'),
552                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
553                        ca_pem, '-days', days, '-subj', 
554                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
555                    (openssl, 'genrsa', '-out', node_key, '1024'),
556                    (openssl, 'req', '-new', '-key', node_key, '-out', 
557                        node_req, '-days', days, '-subj', 
558                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
559                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
560                        '-set_serial', serial, '-req', '-in', node_req, 
561                        '-out', node_signed, '-days', days),
562                )
563            # Do all that stuff; bail if there's an error, and push all the
564            # output to dev/null.
565            for cmd in sequence:
566                trace = open("/dev/null", "w")
567                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
568                if rv != 0:
569                    raise service_error(service_error.internal, 
570                            "Cannot generate SEER certs.  %s return code %d" \
571                                    % (' '.join(cmd), rv))
572            # Concatinate the node key and signed certificate into node.pem
573            f = open(node_pem, 'w')
574            for comp in (node_signed, node_key):
575                g = open(comp, 'r')
576                f.write(g.read())
577                g.close()
578            f.close()
579
580            # Throw out intermediaries.
581            for fn in (ca_key, node_key, node_req, node_signed):
582                os.unlink(fn)
583
584        except EnvironmentError, e:
585            # Any difficulties with the file system wind up here
586            raise service_error(service_error.internal,
587                    "File error on  %s while creating SEER certs: %s" % \
588                            (e.filename, e.strerror))
589
590
591
592    def gentopo(self, str):
593        """
594        Generate the topology data structure from the splitter's XML
595        representation of it.
596
597        The topology XML looks like:
598            <experiment>
599                <nodes>
600                    <node><vname></vname><ips>ip1:ip2</ips></node>
601                </nodes>
602                <lans>
603                    <lan>
604                        <vname></vname><vnode></vnode><ip></ip>
605                        <bandwidth></bandwidth><member>node:port</member>
606                    </lan>
607                </lans>
608        """
609        class topo_parse:
610            """
611            Parse the topology XML and create the dats structure.
612            """
613            def __init__(self):
614                # Typing of the subelements for data conversion
615                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
616                self.int_subelements = ( 'bandwidth',)
617                self.float_subelements = ( 'delay',)
618                # The final data structure
619                self.nodes = [ ]
620                self.lans =  [ ]
621                self.topo = { \
622                        'node': self.nodes,\
623                        'lan' : self.lans,\
624                    }
625                self.element = { }  # Current element being created
626                self.chars = ""     # Last text seen
627
628            def end_element(self, name):
629                # After each sub element the contents is added to the current
630                # element or to the appropriate list.
631                if name == 'node':
632                    self.nodes.append(self.element)
633                    self.element = { }
634                elif name == 'lan':
635                    self.lans.append(self.element)
636                    self.element = { }
637                elif name in self.str_subelements:
638                    self.element[name] = self.chars
639                    self.chars = ""
640                elif name in self.int_subelements:
641                    self.element[name] = int(self.chars)
642                    self.chars = ""
643                elif name in self.float_subelements:
644                    self.element[name] = float(self.chars)
645                    self.chars = ""
646
647            def found_chars(self, data):
648                self.chars += data.rstrip()
649
650
651        tp = topo_parse();
652        parser = xml.parsers.expat.ParserCreate()
653        parser.EndElementHandler = tp.end_element
654        parser.CharacterDataHandler = tp.found_chars
655
656        parser.Parse(str)
657
658        return tp.topo
659       
660
661    def genviz(self, topo):
662        """
663        Generate the visualization the virtual topology
664        """
665
666        neato = "/usr/local/bin/neato"
667        # These are used to parse neato output and to create the visualization
668        # file.
669        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
670        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
671                "%s</type></node>"
672
673        try:
674            # Node names
675            nodes = [ n['vname'] for n in topo['node'] ]
676            topo_lans = topo['lan']
677        except KeyError, e:
678            raise service_error(service_error.internal, "Bad topology: %s" %e)
679
680        lans = { }
681        links = { }
682
683        # Walk through the virtual topology, organizing the connections into
684        # 2-node connections (links) and more-than-2-node connections (lans).
685        # When a lan is created, it's added to the list of nodes (there's a
686        # node in the visualization for the lan).
687        for l in topo_lans:
688            if links.has_key(l['vname']):
689                if len(links[l['vname']]) < 2:
690                    links[l['vname']].append(l['vnode'])
691                else:
692                    nodes.append(l['vname'])
693                    lans[l['vname']] = links[l['vname']]
694                    del links[l['vname']]
695                    lans[l['vname']].append(l['vnode'])
696            elif lans.has_key(l['vname']):
697                lans[l['vname']].append(l['vnode'])
698            else:
699                links[l['vname']] = [ l['vnode'] ]
700
701
702        # Open up a temporary file for dot to turn into a visualization
703        try:
704            df, dotname = tempfile.mkstemp()
705            dotfile = os.fdopen(df, 'w')
706        except EnvironmentError:
707            raise service_error(service_error.internal,
708                    "Failed to open file in genviz")
709
710        try:
711            dnull = open('/dev/null', 'w')
712        except EnvironmentError:
713            service_error(service_error.internal,
714                    "Failed to open /dev/null in genviz")
715
716        # Generate a dot/neato input file from the links, nodes and lans
717        try:
718            print >>dotfile, "graph G {"
719            for n in nodes:
720                print >>dotfile, '\t"%s"' % n
721            for l in links.keys():
722                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
723            for l in lans.keys():
724                for n in lans[l]:
725                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
726            print >>dotfile, "}"
727            dotfile.close()
728        except TypeError:
729            raise service_error(service_error.internal,
730                    "Single endpoint link in vtopo")
731        except EnvironmentError:
732            raise service_error(service_error.internal, "Cannot write dot file")
733
734        # Use dot to create a visualization
735        try:
736            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
737                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
738                stderr=dnull, close_fds=True)
739        except EnvironmentError:
740            raise service_error(service_error.internal, 
741                    "Cannot generate visualization: is graphviz available?")
742        dnull.close()
743
744        # Translate dot to vis format
745        vis_nodes = [ ]
746        vis = { 'node': vis_nodes }
747        for line in dot.stdout:
748            m = vis_re.match(line)
749            if m:
750                vn = m.group(1)
751                vis_node = {'name': vn, \
752                        'x': float(m.group(2)),\
753                        'y' : float(m.group(3)),\
754                    }
755                if vn in links.keys() or vn in lans.keys():
756                    vis_node['type'] = 'lan'
757                else:
758                    vis_node['type'] = 'node'
759                vis_nodes.append(vis_node)
760        rv = dot.wait()
761
762        os.remove(dotname)
763        # XXX: graphviz seems to use low return codes for warnings, like
764        # "couldn't find font"
765        if rv < 2 : return vis
766        else: return None
767
768
769    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
770            cert_pwd=None):
771        """
772        Release access to testbed through fedd
773        """
774
775        if not uri and tbmap:
776            uri = tbmap.get(tb, None)
777        if not uri:
778            raise service_error(service_error.server_config, 
779                    "Unknown testbed: %s" % tb)
780
781        if self.local_access.has_key(uri):
782            resp = self.local_access[uri].ReleaseAccess(\
783                    { 'ReleaseAccessRequestBody' : 
784                        {'allocID': {'fedid': aid}},}, 
785                    fedid(file=cert_file))
786            resp = { 'ReleaseAccessResponseBody': resp } 
787        else:
788            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
789                    cert_file, cert_pwd, self.trusted_certs)
790
791        # better error coding
792
793    def remote_ns2topdl(self, uri, desc):
794
795        req = {
796                'description' : { 'ns2description': desc },
797            }
798
799        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
800                self.trusted_certs)
801
802        if r.has_key('Ns2TopdlResponseBody'):
803            r = r['Ns2TopdlResponseBody']
804            ed = r.get('experimentdescription', None)
805            if ed.has_key('topdldescription'):
806                return topdl.Topology(**ed['topdldescription'])
807            else:
808                raise service_error(service_error.protocol, 
809                        "Bad splitter response (no output)")
810        else:
811            raise service_error(service_error.protocol, "Bad splitter response")
812
813    class start_segment:
814        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
815                cert_pwd=None, trusted_certs=None, caller=None,
816                log_collector=None):
817            self.log = log
818            self.debug = debug
819            self.cert_file = cert_file
820            self.cert_pwd = cert_pwd
821            self.trusted_certs = None
822            self.caller = caller
823            self.testbed = testbed
824            self.log_collector = log_collector
825            self.response = None
826            self.node = { }
827            self.proof = None
828
829        def make_map(self, resp):
830            if 'segmentdescription' not in resp  or \
831                    'topdldescription' not in resp['segmentdescription']:
832                self.log.warn('No topology returned from startsegment')
833                return 
834
835            top = topdl.Topology(
836                    **resp['segmentdescription']['topdldescription'])
837
838            for e in top.elements:
839                if isinstance(e, topdl.Computer):
840                    self.node[e.name] = e
841
842        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
843            req = {
844                    'allocID': { 'fedid' : aid }, 
845                    'segmentdescription': { 
846                        'topdldescription': topo.to_dict(),
847                    },
848                }
849
850            if connInfo:
851                req['connection'] = connInfo
852
853            import_svcs = [ s for m in masters.values() \
854                    for s in m if self.testbed in s.importers]
855
856            if import_svcs or self.testbed in masters:
857                req['service'] = []
858
859            for s in import_svcs:
860                for r in s.reqs:
861                    sr = copy.deepcopy(r)
862                    sr['visibility'] = 'import';
863                    req['service'].append(sr)
864
865            for s in masters.get(self.testbed, []):
866                for r in s.reqs:
867                    sr = copy.deepcopy(r)
868                    sr['visibility'] = 'export';
869                    req['service'].append(sr)
870
871            if attrs:
872                req['fedAttr'] = attrs
873
874            try:
875                self.log.debug("Calling StartSegment at %s " % uri)
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                if r.has_key('StartSegmentResponseBody'):
879                    lval = r['StartSegmentResponseBody'].get('allocationLog',
880                            None)
881                    if lval and self.log_collector:
882                        for line in  lval.splitlines(True):
883                            self.log_collector.write(line)
884                    self.make_map(r['StartSegmentResponseBody'])
885                    if 'proof' in r: self.proof = r['proof']
886                    self.response = r
887                else:
888                    raise service_error(service_error.internal, 
889                            "Bad response!?: %s" %r)
890                return True
891            except service_error, e:
892                self.log.error("Start segment failed on %s: %s" % \
893                        (self.testbed, e))
894                return False
895
896
897
898    class terminate_segment:
899        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
900                cert_pwd=None, trusted_certs=None, caller=None):
901            self.log = log
902            self.debug = debug
903            self.cert_file = cert_file
904            self.cert_pwd = cert_pwd
905            self.trusted_certs = None
906            self.caller = caller
907            self.testbed = testbed
908
909        def __call__(self, uri, aid ):
910            req = {
911                    'allocID': {'fedid': aid }, 
912                }
913            self.log.info("Calling terminate segment")
914            try:
915                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
916                        self.trusted_certs)
917                self.log.info("Terminate segment succeeded")
918                return True
919            except service_error, e:
920                self.log.error("Terminate segment failed on %s: %s" % \
921                        (self.testbed, e))
922                return False
923
924    class info_segment(start_segment):
925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
926                cert_pwd=None, trusted_certs=None, caller=None,
927                log_collector=None):
928            experiment_control_local.start_segment.__init__(self, debug, 
929                    log, testbed, cert_file, cert_pwd, trusted_certs, 
930                    caller, log_collector)
931
932        def __call__(self, uri, aid):
933            req = { 'allocID': { 'fedid' : aid } }
934
935            try:
936                self.log.debug("Calling InfoSegment at %s " % uri)
937                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
938                        self.trusted_certs)
939                if r.has_key('InfoSegmentResponseBody'):
940                    self.make_map(r['InfoSegmentResponseBody'])
941                    if 'proof' in r: self.proof = r['proof']
942                    self.response = r
943                else:
944                    raise service_error(service_error.internal, 
945                            "Bad response!?: %s" %r)
946                return True
947            except service_error, e:
948                self.log.error("Info segment failed on %s: %s" % \
949                        (self.testbed, e))
950                return False
951
952    class operation_segment:
953        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
954                cert_pwd=None, trusted_certs=None, caller=None,
955                log_collector=None):
956            self.log = log
957            self.debug = debug
958            self.cert_file = cert_file
959            self.cert_pwd = cert_pwd
960            self.trusted_certs = None
961            self.caller = caller
962            self.testbed = testbed
963            self.status = None
964
965        def __call__(self, uri, aid, op, targets, params):
966            req = { 
967                    'allocID': { 'fedid' : aid },
968                    'operation': op,
969                    'target': targets,
970                    }
971            if params: req['parameter'] = params
972
973
974            try:
975                self.log.debug("Calling OperationSegment at %s " % uri)
976                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
977                        self.trusted_certs)
978                if 'OperationSegmentResponseBody' in r:
979                    r = r['OperationSegmentResponseBody']
980                    if 'status' in r:
981                        self.status = r['status']
982                else:
983                    raise service_error(service_error.internal, 
984                            "Bad response!?: %s" %r)
985                return True
986            except service_error, e:
987                self.log.error("Operation segment failed on %s: %s" % \
988                        (self.testbed, e))
989                return False
990
991    def annotate_topology(self, top, data):
992        # These routines do various parts of the annotation
993        def add_new_names(nl, l):
994            """ add any names in nl to the list in l """
995            for n in nl:
996                if n not in l: l.append(n)
997       
998        def merge_services(ne, e):
999            for ns in ne.service:
1000                # NB: the else is on the for
1001                for s in e.service:
1002                    if ns.name == s.name:
1003                        s.importer = ns.importer
1004                        s.param = ns.param
1005                        s.description = ns.description
1006                        s.status = ns.status
1007                        break
1008                else:
1009                    e.service.append(ns)
1010       
1011        def merge_oses(ne, e):
1012            """
1013            Merge the operating system entries of ne into e
1014            """
1015            for nos in ne.os:
1016                # NB: the else is on the for
1017                for os in e.os:
1018                    if nos.name == os.name:
1019                        os.version = nos.version
1020                        os.version = nos.distribution
1021                        os.version = nos.distributionversion
1022                        for a in nos.attribute:
1023                            if os.get_attribute(a.attribute):
1024                                os.remove_attribute(a.attribute)
1025                            os.set_attribute(a.attribute, a.value)
1026                        break
1027                else:
1028                    # If both nodes have one OS, this is a replacement
1029                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
1030                    else: e.os.append(nos)
1031
1032        # Annotate the topology with embedding info
1033        for e in top.elements:
1034            if isinstance(e, topdl.Computer):
1035                for s in data:
1036                    ne = s.node.get(e.name, None)
1037                    if ne is not None:
1038                        add_new_names(ne.localname, e.localname)
1039                        e.status = ne.status
1040                        merge_services(ne, e)
1041                        add_new_names(ne.operation, e.operation)
1042                        if ne.os: merge_oses(ne, e)
1043                        break
1044
1045
1046
1047    def allocate_resources(self, allocated, masters, eid, expid, 
1048            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1049            attrs=None, connInfo={}, tbmap=None, expcert=None):
1050
1051        started = { }           # Testbeds where a sub-experiment started
1052                                # successfully
1053
1054        # XXX
1055        fail_soft = False
1056
1057        if tbmap is None: tbmap = { }
1058
1059        log = alloc_log or self.log
1060
1061        tp = thread_pool(self.nthreads)
1062        threads = [ ]
1063        starters = [ ]
1064
1065        if expcert:
1066            cert = expcert
1067            pw = None
1068        else:
1069            cert = self.cert_file
1070            pw = self.cert_pwd
1071
1072        for tb in allocated.keys():
1073            # Create and start a thread to start the segment, and save it
1074            # to get the return value later
1075            tb_attrs = copy.copy(attrs)
1076            tp.wait_for_slot()
1077            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1078            base, suffix = split_testbed(tb)
1079            if suffix:
1080                tb_attrs.append({'attribute': 'experiment_name', 
1081                    'value': "%s-%s" % (eid, suffix)})
1082            else:
1083                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1084            if not uri:
1085                raise service_error(service_error.internal, 
1086                        "Unknown testbed %s !?" % tb)
1087
1088            aid = tbparams[tb].allocID
1089            if not aid:
1090                raise service_error(service_error.internal, 
1091                        "No alloc id for testbed %s !?" % tb)
1092
1093            s = self.start_segment(log=log, debug=self.debug,
1094                    testbed=tb, cert_file=cert,
1095                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1096                    caller=self.call_StartSegment,
1097                    log_collector=log_collector)
1098            starters.append(s)
1099            t  = pooled_thread(\
1100                    target=s, name=tb,
1101                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1102                    pdata=tp, trace_file=self.trace_file)
1103            threads.append(t)
1104            t.start()
1105
1106        # Wait until all finish (keep pinging the log, though)
1107        mins = 0
1108        revoked = False
1109        while not tp.wait_for_all_done(60.0):
1110            mins += 1
1111            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1112                    % mins)
1113            if not revoked and \
1114                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1115                # a testbed has failed.  Revoke this experiment's
1116                # synchronizarion values so that sub experiments will not
1117                # deadlock waiting for synchronization that will never happen
1118                self.log.info("A subexperiment has failed to swap in, " + \
1119                        "revoking synch keys")
1120                var_key = "fedid:%s" % expid
1121                for k in self.synch_store.all_keys():
1122                    if len(k) > 45 and k[0:46] == var_key:
1123                        self.synch_store.revoke_key(k)
1124                revoked = True
1125
1126        failed = [ t.getName() for t in threads if not t.rv ]
1127        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1128
1129        # If one failed clean up, unless fail_soft is set
1130        if failed:
1131            if not fail_soft:
1132                tp.clear()
1133                for tb in succeeded:
1134                    # Create and start a thread to stop the segment
1135                    tp.wait_for_slot()
1136                    uri = tbparams[tb].uri
1137                    t  = pooled_thread(\
1138                            target=self.terminate_segment(log=log,
1139                                testbed=tb,
1140                                cert_file=cert, 
1141                                cert_pwd=pw,
1142                                trusted_certs=self.trusted_certs,
1143                                caller=self.call_TerminateSegment),
1144                            args=(uri, tbparams[tb].allocID),
1145                            name=tb,
1146                            pdata=tp, trace_file=self.trace_file)
1147                    t.start()
1148                # Wait until all finish (if any are being stopped)
1149                if succeeded:
1150                    tp.wait_for_all_done()
1151
1152                # release the allocations
1153                for tb in tbparams.keys():
1154                    try:
1155                        self.release_access(tb, tbparams[tb].allocID, 
1156                                tbmap=tbmap, uri=tbparams[tb].uri,
1157                                cert_file=cert, cert_pwd=pw)
1158                    except service_error, e:
1159                        self.log.warn("Error releasing access: %s" % e.desc)
1160                # Remove the placeholder
1161                self.state_lock.acquire()
1162                self.state[eid].status = 'failed'
1163                self.state[eid].updated()
1164                if self.state_filename: self.write_state()
1165                self.state_lock.release()
1166                # Remove the repo dir
1167                self.remove_dirs("%s/%s" %(self.repodir, expid))
1168                # Walk up tmpdir, deleting as we go
1169                if self.cleanup:
1170                    self.remove_dirs(tmpdir)
1171                else:
1172                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1173
1174
1175                log.error("Swap in failed on %s" % ",".join(failed))
1176                return
1177        else:
1178            # Walk through the successes and gather the proofs
1179            proofs = { }
1180            for s in starters:
1181                if s.proof:
1182                    proofs[s.testbed] = s.proof
1183            self.annotate_topology(top, starters)
1184            log.info("[start_segment]: Experiment %s active" % eid)
1185
1186
1187        # Walk up tmpdir, deleting as we go
1188        if self.cleanup:
1189            self.remove_dirs(tmpdir)
1190        else:
1191            log.debug("[start_experiment]: not removing %s" % tmpdir)
1192
1193        # Insert the experiment into our state and update the disk copy.
1194        self.state_lock.acquire()
1195        self.state[expid].status = 'active'
1196        self.state[eid] = self.state[expid]
1197        self.state[eid].top = top
1198        self.state[eid].updated()
1199        # Append startup proofs
1200        for f in self.state[eid].get_all_allocations():
1201            if f.tb in proofs:
1202                f.proof.append(proofs[f.tb])
1203
1204        if self.state_filename: self.write_state()
1205        self.state_lock.release()
1206        return
1207
1208
1209    def add_kit(self, e, kit):
1210        """
1211        Add a Software object created from the list of (install, location)
1212        tuples passed as kit  to the software attribute of an object e.  We
1213        do this enough to break out the code, but it's kind of a hack to
1214        avoid changing the old tuple rep.
1215        """
1216
1217        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1218
1219        if isinstance(e.software, list): e.software.extend(s)
1220        else: e.software = s
1221
1222    def append_experiment_authorization(self, expid, attrs, 
1223            need_state_lock=True):
1224        """
1225        Append the authorization information to system state
1226        """
1227
1228        for p, a in attrs:
1229            self.auth.set_attribute(p, a)
1230        self.auth.save()
1231
1232        if need_state_lock: self.state_lock.acquire()
1233        # XXX: really a no op?
1234        #self.state[expid]['auth'].update(attrs)
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237
1238    def clear_experiment_authorization(self, expid, need_state_lock=True):
1239        """
1240        Attrs is a set of attribute principal pairs that need to be removed
1241        from the authenticator.  Remove them and save the authenticator.
1242        """
1243
1244        if need_state_lock: self.state_lock.acquire()
1245        # XXX: should be a no-op
1246        #if expid in self.state and 'auth' in self.state[expid]:
1247            #for p, a in self.state[expid]['auth']:
1248                #self.auth.unset_attribute(p, a)
1249            #self.state[expid]['auth'] = set()
1250        if self.state_filename: self.write_state()
1251        if need_state_lock: self.state_lock.release()
1252        self.auth.save()
1253
1254
1255    def create_experiment_state(self, fid, req, expid, expcert,
1256            state='starting'):
1257        """
1258        Create the initial entry in the experiment's state.  The expid and
1259        expcert are the experiment's fedid and certifacte that represents that
1260        ID, which are installed in the experiment state.  If the request
1261        includes a suggested local name that is used if possible.  If the local
1262        name is already taken by an experiment owned by this user that has
1263        failed, it is overwritten.  Otherwise new letters are added until a
1264        valid localname is found.  The generated local name is returned.
1265        """
1266
1267        if req.has_key('experimentID') and \
1268                req['experimentID'].has_key('localname'):
1269            overwrite = False
1270            eid = req['experimentID']['localname']
1271            # If there's an old failed experiment here with the same local name
1272            # and accessible by this user, we'll overwrite it, otherwise we'll
1273            # fall through and do the collision avoidance.
1274            old_expid = self.get_experiment_fedid(eid)
1275            if old_expid:
1276                users_experiment = True
1277                try:
1278                    self.check_experiment_access(fid, old_expid)
1279                except service_error, e:
1280                    if e.code == service_error.access: users_experiment = False
1281                    else: raise e
1282                if users_experiment:
1283                    self.state_lock.acquire()
1284                    status = self.state[eid].status
1285                    if status and status == 'failed':
1286                        # remove the old access attributes
1287                        self.clear_experiment_authorization(eid,
1288                                need_state_lock=False)
1289                        overwrite = True
1290                        del self.state[eid]
1291                        del self.state[old_expid]
1292                    self.state_lock.release()
1293                else:
1294                    self.log.info('Experiment %s exists, ' % eid + \
1295                            'but this user cannot access it')
1296            self.state_lock.acquire()
1297            while (self.state.has_key(eid) and not overwrite):
1298                eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305        else:
1306            eid = self.exp_stem
1307            for i in range(0,5):
1308                eid += random.choice(string.ascii_letters)
1309            self.state_lock.acquire()
1310            while (self.state.has_key(eid)):
1311                eid = self.exp_stem
1312                for i in range(0,5):
1313                    eid += random.choice(string.ascii_letters)
1314            # Initial state
1315            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1316                    identity=expcert)
1317            self.state[expid] = self.state[eid]
1318            if self.state_filename: self.write_state()
1319            self.state_lock.release()
1320
1321        # Let users touch the state.  Authorize this fid and the expid itself
1322        # to touch the experiment, as well as allowing th eoverrides.
1323        self.append_experiment_authorization(eid, 
1324                set([(fid, expid), (expid,expid)] + \
1325                        [ (o, expid) for o in self.overrides]))
1326
1327        return eid
1328
1329
1330    def allocate_ips_to_topo(self, top):
1331        """
1332        Add an ip4_address attribute to all the hosts in the topology, based on
1333        the shared substrates on which they sit.  An /etc/hosts file is also
1334        created and returned as a list of hostfiles entries.  We also return
1335        the allocator, because we may need to allocate IPs to portals
1336        (specifically DRAGON portals).
1337        """
1338        subs = sorted(top.substrates, 
1339                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1340                reverse=True)
1341        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1342        ifs = { }
1343        hosts = [ ]
1344
1345        for idx, s in enumerate(subs):
1346            net_size = len(s.interfaces)+2
1347
1348            a = ips.allocate(net_size)
1349            if a :
1350                base, num = a
1351                if num < net_size: 
1352                    raise service_error(service_error.internal,
1353                            "Allocator returned wrong number of IPs??")
1354            else:
1355                raise service_error(service_error.req, 
1356                        "Cannot allocate IP addresses")
1357            mask = ips.min_alloc
1358            while mask < net_size:
1359                mask *= 2
1360
1361            netmask = ((2**32-1) ^ (mask-1))
1362
1363            base += 1
1364            for i in s.interfaces:
1365                i.attribute.append(
1366                        topdl.Attribute('ip4_address', 
1367                            "%s" % ip_addr(base)))
1368                i.attribute.append(
1369                        topdl.Attribute('ip4_netmask', 
1370                            "%s" % ip_addr(int(netmask))))
1371
1372                hname = i.element.name
1373                if ifs.has_key(hname):
1374                    hosts.append("%s\t%s-%s %s-%d" % \
1375                            (ip_addr(base), hname, s.name, hname,
1376                                ifs[hname]))
1377                else:
1378                    ifs[hname] = 0
1379                    hosts.append("%s\t%s-%s %s-%d %s" % \
1380                            (ip_addr(base), hname, s.name, hname,
1381                                ifs[hname], hname))
1382
1383                ifs[hname] += 1
1384                base += 1
1385        return hosts, ips
1386
1387    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1388            tbparam, masters, tbmap, expid=None, expcert=None):
1389        for tb in testbeds:
1390            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1391                    expcert)
1392            allocated[tb] = 1
1393
1394    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1395            expcert=None):
1396        """
1397        Get access to testbed through fedd and set the parameters for that tb
1398        """
1399        def get_export_project(svcs):
1400            """
1401            Look through for the list of federated_service for this testbed
1402            objects for a project_export service, and extract the project
1403            parameter.
1404            """
1405
1406            pe = [s for s in svcs if s.name=='project_export']
1407            if len(pe) == 1:
1408                return pe[0].params.get('project', None)
1409            elif len(pe) == 0:
1410                return None
1411            else:
1412                raise service_error(service_error.req,
1413                        "More than one project export is not supported")
1414
1415        def add_services(svcs, type, slist, keys):
1416            """
1417            Add the given services to slist.  type is import or export.  Also
1418            add a mapping entry from the assigned id to the original service
1419            record.
1420            """
1421            for i, s in enumerate(svcs):
1422                idx = '%s%d' % (type, i)
1423                keys[idx] = s
1424                sr = {'id': idx, 'name': s.name, 'visibility': type }
1425                if s.params:
1426                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1427                            for k, v in s.params.items()]
1428                slist.append(sr)
1429
1430        uri = tbmap.get(testbed_base(tb), None)
1431        if not uri:
1432            raise service_error(service_error.server_config, 
1433                    "Unknown testbed: %s" % tb)
1434
1435        export_svcs = masters.get(tb,[])
1436        import_svcs = [ s for m in masters.values() \
1437                for s in m \
1438                    if tb in s.importers ]
1439
1440        export_project = get_export_project(export_svcs)
1441        # Compose the credential list so that IDs come before attributes
1442        creds = set()
1443        keys = set()
1444        certs = self.auth.get_creds_for_principal(fid)
1445        if expid:
1446            certs.update(self.auth.get_creds_for_principal(expid))
1447        for c in certs:
1448            keys.add(c.issuer_cert())
1449            creds.add(c.attribute_cert())
1450        creds = list(keys) + list(creds)
1451
1452        if expcert: cert, pw = expcert, None
1453        else: cert, pw = self.cert_file, self.cert_pw
1454
1455        # Request credentials
1456        req = {
1457                'abac_credential': creds,
1458            }
1459        # Make the service request from the services we're importing and
1460        # exporting.  Keep track of the export request ids so we can
1461        # collect the resulting info from the access response.
1462        e_keys = { }
1463        if import_svcs or export_svcs:
1464            slist = []
1465            add_services(import_svcs, 'import', slist, e_keys)
1466            add_services(export_svcs, 'export', slist, e_keys)
1467            req['service'] = slist
1468
1469        if self.local_access.has_key(uri):
1470            # Local access call
1471            req = { 'RequestAccessRequestBody' : req }
1472            r = self.local_access[uri].RequestAccess(req, 
1473                    fedid(file=self.cert_file))
1474            r = { 'RequestAccessResponseBody' : r }
1475        else:
1476            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1477
1478        if r.has_key('RequestAccessResponseBody'):
1479            # Through to here we have a valid response, not a fault.
1480            # Access denied is a fault, so something better or worse than
1481            # access denied has happened.
1482            r = r['RequestAccessResponseBody']
1483            self.log.debug("[get_access] Access granted")
1484        else:
1485            raise service_error(service_error.protocol,
1486                        "Bad proxy response")
1487        if 'proof' not in r:
1488            raise service_error(service_error.protocol,
1489                        "Bad access response (no access proof)")
1490
1491        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1492                tb=tb, uri=uri, proof=[r['proof']], 
1493                services=masters.get(tb, None))
1494
1495        # Collect the responses corresponding to the services this testbed
1496        # exports.  These will be the service requests that we will include in
1497        # the start segment requests (with appropriate visibility values) to
1498        # import and export the segments.
1499        for s in r.get('service', []):
1500            id = s.get('id', None)
1501            # Note that this attaches the response to the object in the masters
1502            # data structure.  (The e_keys index disappears when this fcn
1503            # returns)
1504            if id and id in e_keys:
1505                e_keys[id].reqs.append(s)
1506
1507        # Add attributes to parameter space.  We don't allow attributes to
1508        # overlay any parameters already installed.
1509        for a in r.get('fedAttr', []):
1510            try:
1511                if a['attribute']:
1512                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1513            except KeyError:
1514                self.log.error("Bad attribute in response: %s" % a)
1515
1516
1517    def split_topology(self, top, topo, testbeds):
1518        """
1519        Create the sub-topologies that are needed for experiment instantiation.
1520        """
1521        for tb in testbeds:
1522            topo[tb] = top.clone()
1523            # copy in for loop allows deletions from the original
1524            for e in [ e for e in topo[tb].elements]:
1525                etb = e.get_attribute('testbed')
1526                # NB: elements without a testbed attribute won't appear in any
1527                # sub topologies. 
1528                if not etb or etb != tb:
1529                    for i in e.interface:
1530                        for s in i.subs:
1531                            try:
1532                                s.interfaces.remove(i)
1533                            except ValueError:
1534                                raise service_error(service_error.internal,
1535                                        "Can't remove interface??")
1536                    topo[tb].elements.remove(e)
1537            topo[tb].make_indices()
1538
1539    def confirm_software(self, top):
1540        """
1541        Make sure that the software to be loaded in the topo is all available
1542        before we begin making access requests, etc.  This is a subset of
1543        wrangle_software.
1544        """
1545        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1546        pkgs.update([x.location for e in top.elements for x in e.software])
1547
1548        for pkg in pkgs:
1549            loc = pkg
1550
1551            scheme, host, path = urlparse(loc)[0:3]
1552            dest = os.path.basename(path)
1553            if not scheme:
1554                if not loc.startswith('/'):
1555                    loc = "/%s" % loc
1556                loc = "file://%s" %loc
1557            # NB: if scheme was found, loc == pkg
1558            try:
1559                u = urlopen(loc)
1560                u.close()
1561            except Exception, e:
1562                raise service_error(service_error.req, 
1563                        "Cannot open %s: %s" % (loc, e))
1564        return True
1565
1566    def wrangle_software(self, expid, top, topo, tbparams):
1567        """
1568        Copy software out to the repository directory, allocate permissions and
1569        rewrite the segment topologies to look for the software in local
1570        places.
1571        """
1572
1573        # Copy the rpms and tarfiles to a distribution directory from
1574        # which the federants can retrieve them
1575        linkpath = "%s/software" %  expid
1576        softdir ="%s/%s" % ( self.repodir, linkpath)
1577        softmap = { }
1578
1579        # self.fedkit and self.gateway kit are lists of tuples of
1580        # (install_location, download_location) this extracts the download
1581        # locations.
1582        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1583        pkgs.update([x.location for e in top.elements for x in e.software])
1584        try:
1585            os.makedirs(softdir)
1586        except EnvironmentError, e:
1587            raise service_error(
1588                    "Cannot create software directory: %s" % e)
1589        # The actual copying.  Everything's converted into a url for copying.
1590        auth_attrs = set()
1591        for pkg in pkgs:
1592            loc = pkg
1593
1594            scheme, host, path = urlparse(loc)[0:3]
1595            dest = os.path.basename(path)
1596            if not scheme:
1597                if not loc.startswith('/'):
1598                    loc = "/%s" % loc
1599                loc = "file://%s" %loc
1600            # NB: if scheme was found, loc == pkg
1601            try:
1602                u = urlopen(loc)
1603            except Exception, e:
1604                raise service_error(service_error.req, 
1605                        "Cannot open %s: %s" % (loc, e))
1606            try:
1607                f = open("%s/%s" % (softdir, dest) , "w")
1608                self.log.debug("Writing %s/%s" % (softdir,dest) )
1609                data = u.read(4096)
1610                while data:
1611                    f.write(data)
1612                    data = u.read(4096)
1613                f.close()
1614                u.close()
1615            except Exception, e:
1616                raise service_error(service_error.internal,
1617                        "Could not copy %s: %s" % (loc, e))
1618            path = re.sub("/tmp", "", linkpath)
1619            # XXX
1620            softmap[pkg] = \
1621                    "%s/%s/%s" %\
1622                    ( self.repo_url, path, dest)
1623
1624            # Allow the individual segments to access the software by assigning
1625            # an attribute to each testbed allocation that encodes the data to
1626            # be released.  This expression collects the data for each run of
1627            # the loop.
1628            auth_attrs.update([
1629                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1630                        for tb in tbparams.keys()])
1631
1632        self.append_experiment_authorization(expid, auth_attrs)
1633
1634        # Convert the software locations in the segments into the local
1635        # copies on this host
1636        for soft in [ s for tb in topo.values() \
1637                for e in tb.elements \
1638                    if getattr(e, 'software', False) \
1639                        for s in e.software ]:
1640            if softmap.has_key(soft.location):
1641                soft.location = softmap[soft.location]
1642
1643
1644    def new_experiment(self, req, fid):
1645        """
1646        The external interface to empty initial experiment creation called from
1647        the dispatcher.
1648
1649        Creates a working directory, splits the incoming description using the
1650        splitter script and parses out the avrious subsections using the
1651        lcasses above.  Once each sub-experiment is created, use pooled threads
1652        to instantiate them and start it all up.
1653        """
1654        self.log.info("New experiment call started for %s" % fid)
1655        req = req.get('NewRequestBody', None)
1656        if not req:
1657            raise service_error(service_error.req,
1658                    "Bad request format (no NewRequestBody)")
1659
1660        if self.auth.import_credentials(data_list=req.get('credential', [])):
1661            self.auth.save()
1662
1663        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1664                with_proof=True)
1665
1666        if not access_ok:
1667            self.log.info("New experiment call for %s: Access denied" % fid)
1668            raise service_error(service_error.access, "New access denied",
1669                    proof=[proof])
1670
1671        try:
1672            tmpdir = tempfile.mkdtemp(prefix="split-")
1673        except EnvironmentError:
1674            raise service_error(service_error.internal, "Cannot create tmp dir")
1675
1676        try:
1677            access_user = self.accessdb[fid]
1678        except KeyError:
1679            raise service_error(service_error.internal,
1680                    "Access map and authorizer out of sync in " + \
1681                            "new_experiment for fedid %s"  % fid)
1682
1683        # Generate an ID for the experiment (slice) and a certificate that the
1684        # allocator can use to prove they own it.  We'll ship it back through
1685        # the encrypted connection.  If the requester supplied one, use it.
1686        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1687            expcert = req['experimentAccess']['X509']
1688            expid = fedid(certstr=expcert)
1689            self.state_lock.acquire()
1690            if expid in self.state:
1691                self.state_lock.release()
1692                raise service_error(service_error.req, 
1693                        'fedid %s identifies an existing experiment' % expid)
1694            self.state_lock.release()
1695        else:
1696            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1697
1698        #now we're done with the tmpdir, and it should be empty
1699        if self.cleanup:
1700            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1701            os.rmdir(tmpdir)
1702        else:
1703            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1704
1705        eid = self.create_experiment_state(fid, req, expid, expcert, 
1706                state='empty')
1707
1708        rv = {
1709                'experimentID': [
1710                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1711                ],
1712                'experimentStatus': 'empty',
1713                'experimentAccess': { 'X509' : expcert },
1714                'proof': proof.to_dict(),
1715            }
1716
1717        self.log.info("New experiment call succeeded for %s" % fid)
1718        return rv
1719
1720    # create_experiment sub-functions
1721
1722    @staticmethod
1723    def get_experiment_key(req, field='experimentID'):
1724        """
1725        Parse the experiment identifiers out of the request (the request body
1726        tag has been removed).  Specifically this pulls either the fedid or the
1727        localname out of the experimentID field.  A fedid is preferred.  If
1728        neither is present or the request does not contain the fields,
1729        service_errors are raised.
1730        """
1731        # Get the experiment access
1732        exp = req.get(field, None)
1733        if exp:
1734            if exp.has_key('fedid'):
1735                key = exp['fedid']
1736            elif exp.has_key('localname'):
1737                key = exp['localname']
1738            else:
1739                raise service_error(service_error.req, "Unknown lookup type")
1740        else:
1741            raise service_error(service_error.req, "No request?")
1742
1743        return key
1744
1745    def get_experiment_ids_and_start(self, key, tmpdir):
1746        """
1747        Get the experiment name, id and access certificate from the state, and
1748        set the experiment state to 'starting'.  returns a triple (fedid,
1749        localname, access_cert_file). The access_cert_file is a copy of the
1750        contents of the access certificate, created in the tempdir with
1751        restricted permissions.  If things are confused, raise an exception.
1752        """
1753
1754        expid = eid = None
1755        self.state_lock.acquire()
1756        if key in self.state:
1757            exp = self.state[key]
1758            exp.status = "starting"
1759            exp.updated()
1760            expid = exp.fedid
1761            eid = exp.localname
1762            expcert = exp.identity
1763        self.state_lock.release()
1764
1765        # make a protected copy of the access certificate so the experiment
1766        # controller can act as the experiment principal.
1767        if expcert:
1768            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1769            if not expcert_file:
1770                raise service_error(service_error.internal, 
1771                        "Cannot create temp cert file?")
1772        else:
1773            expcert_file = None
1774
1775        return (eid, expid, expcert_file)
1776
1777    def get_topology(self, req, tmpdir):
1778        """
1779        Get the ns2 content and put it into a file for parsing.  Call the local
1780        or remote parser and return the topdl.Topology.  Errors result in
1781        exceptions.  req is the request and tmpdir is a work directory.
1782        """
1783
1784        # The tcl parser needs to read a file so put the content into that file
1785        descr=req.get('experimentdescription', None)
1786        if descr:
1787            if 'ns2description' in descr:
1788                file_content=descr['ns2description']
1789            elif 'topdldescription' in descr:
1790                return topdl.Topology(**descr['topdldescription'])
1791            else:
1792                raise service_error(service_error.req, 
1793                        'Unknown experiment description type')
1794        else:
1795            raise service_error(service_error.req, "No experiment description")
1796
1797
1798        if self.splitter_url:
1799            self.log.debug("Calling remote topdl translator at %s" % \
1800                    self.splitter_url)
1801            top = self.remote_ns2topdl(self.splitter_url, file_content)
1802        else:
1803            tclfile = os.path.join(tmpdir, "experiment.tcl")
1804            if file_content:
1805                try:
1806                    f = open(tclfile, 'w')
1807                    f.write(file_content)
1808                    f.close()
1809                except EnvironmentError:
1810                    raise service_error(service_error.internal,
1811                            "Cannot write temp experiment description")
1812            else:
1813                raise service_error(service_error.req, 
1814                        "Only ns2descriptions supported")
1815            pid = "dummy"
1816            gid = "dummy"
1817            eid = "dummy"
1818
1819            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1820                str(self.muxmax), '-m', 'dummy']
1821
1822            tclcmd.extend([pid, gid, eid, tclfile])
1823
1824            self.log.debug("running local splitter %s", " ".join(tclcmd))
1825            # This is just fantastic.  As a side effect the parser copies
1826            # tb_compat.tcl into the current directory, so that directory
1827            # must be writable by the fedd user.  Doing this in the
1828            # temporary subdir ensures this is the case.
1829            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1830                    cwd=tmpdir)
1831            split_data = tclparser.stdout
1832
1833            top = topdl.topology_from_xml(file=split_data, top="experiment")
1834            os.remove(tclfile)
1835
1836        return top
1837
1838    def get_testbed_services(self, req, testbeds):
1839        """
1840        Parse the services section of the request into two dicts mapping
1841        testbed to lists of federated_service objects.  The first dict maps all
1842        exporters of services to those service objects, the second maps
1843        testbeds to service objects only for services requiring portals.
1844        """
1845        # We construct both dicts here because deriving the second is more
1846        # comples than it looks - both the keys and lists can differ, and it's
1847        # much easier to generate both in one pass.
1848        masters = { }
1849        pmasters = { }
1850        for s in req.get('service', []):
1851            # If this is a service request with the importall field
1852            # set, fill it out.
1853
1854            if s.get('importall', False):
1855                s['import'] = [ tb for tb in testbeds \
1856                        if tb not in s.get('export',[])]
1857                del s['importall']
1858
1859            # Add the service to masters
1860            for tb in s.get('export', []):
1861                if s.get('name', None):
1862
1863                    params = { }
1864                    for a in s.get('fedAttr', []):
1865                        params[a.get('attribute', '')] = a.get('value','')
1866
1867                    fser = federated_service(name=s['name'],
1868                            exporter=tb, importers=s.get('import',[]),
1869                            params=params)
1870                    if fser.name == 'hide_hosts' \
1871                            and 'hosts' not in fser.params:
1872                        fser.params['hosts'] = \
1873                                ",".join(tb_hosts.get(fser.exporter, []))
1874                    if tb in masters: masters[tb].append(fser)
1875                    else: masters[tb] = [fser]
1876
1877                    if fser.portal:
1878                        if tb in pmasters: pmasters[tb].append(fser)
1879                        else: pmasters[tb] = [fser]
1880                else:
1881                    self.log.error('Testbed service does not have name " + \
1882                            "and importers')
1883        return masters, pmasters
1884
1885    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1886        """
1887        Create the ssh keys necessary for interconnecting the portal nodes and
1888        the global hosts file for letting each segment know about the IP
1889        addresses in play.  Save these into the repo.  Add attributes to the
1890        autorizer allowing access controllers to download them and return a set
1891        of attributes that inform the segments where to find this stuff.  May
1892        raise service_errors in if there are problems.
1893        """
1894        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1895        gw_secretkey_base = "fed.%s" % self.ssh_type
1896        keydir = os.path.join(tmpdir, 'keys')
1897        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1898        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1899
1900        try:
1901            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1902        except ValueError:
1903            raise service_error(service_error.server_config, 
1904                    "Bad key type (%s)" % self.ssh_type)
1905
1906        self.generate_seer_certs(keydir)
1907
1908        # Copy configuration files into the remote file store
1909        # The config urlpath
1910        configpath = "/%s/config" % expid
1911        # The config file system location
1912        configdir ="%s%s" % ( self.repodir, configpath)
1913        try:
1914            os.makedirs(configdir)
1915        except EnvironmentError, e:
1916            raise service_error(service_error.internal,
1917                    "Cannot create config directory: %s" % e)
1918        try:
1919            f = open("%s/hosts" % configdir, "w")
1920            print >> f, string.join(hosts, '\n')
1921            f.close()
1922        except EnvironmentError, e:
1923            raise service_error(service_error.internal, 
1924                    "Cannot write hosts file: %s" % e)
1925        try:
1926            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1927            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1928            copy_file(os.path.join(keydir, 'ca.pem'), 
1929                    os.path.join(configdir, 'ca.pem'))
1930            copy_file(os.path.join(keydir, 'node.pem'), 
1931                    os.path.join(configdir, 'node.pem'))
1932        except EnvironmentError, e:
1933            raise service_error(service_error.internal, 
1934                    "Cannot copy keyfiles: %s" % e)
1935
1936        # Allow the individual testbeds to access the configuration files,
1937        # again by setting an attribute for the relevant pathnames on each
1938        # allocation principal.  Yeah, that's a long list comprehension.
1939        self.append_experiment_authorization(expid, set([
1940            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1941                    for tb in tbparams.keys() \
1942                        for f in ("hosts", 'ca.pem', 'node.pem', 
1943                            gw_secretkey_base, gw_pubkey_base)]))
1944
1945        attrs = [ 
1946                {
1947                    'attribute': 'ssh_pubkey', 
1948                    'value': '%s/%s/config/%s' % \
1949                            (self.repo_url, expid, gw_pubkey_base)
1950                },
1951                {
1952                    'attribute': 'ssh_secretkey', 
1953                    'value': '%s/%s/config/%s' % \
1954                            (self.repo_url, expid, gw_secretkey_base)
1955                },
1956                {
1957                    'attribute': 'hosts', 
1958                    'value': '%s/%s/config/hosts' % \
1959                            (self.repo_url, expid)
1960                },
1961                {
1962                    'attribute': 'seer_ca_pem', 
1963                    'value': '%s/%s/config/%s' % \
1964                            (self.repo_url, expid, 'ca.pem')
1965                },
1966                {
1967                    'attribute': 'seer_node_pem', 
1968                    'value': '%s/%s/config/%s' % \
1969                            (self.repo_url, expid, 'node.pem')
1970                },
1971            ]
1972        return attrs
1973
1974
1975    def get_vtopo(self, req, fid):
1976        """
1977        Return the stored virtual topology for this experiment
1978        """
1979        rv = None
1980        state = None
1981        self.log.info("vtopo call started for %s" %  fid)
1982
1983        req = req.get('VtopoRequestBody', None)
1984        if not req:
1985            raise service_error(service_error.req,
1986                    "Bad request format (no VtopoRequestBody)")
1987        exp = req.get('experiment', None)
1988        if exp:
1989            if exp.has_key('fedid'):
1990                key = exp['fedid']
1991                keytype = "fedid"
1992            elif exp.has_key('localname'):
1993                key = exp['localname']
1994                keytype = "localname"
1995            else:
1996                raise service_error(service_error.req, "Unknown lookup type")
1997        else:
1998            raise service_error(service_error.req, "No request?")
1999
2000        proof = self.check_experiment_access(fid, key)
2001
2002        self.state_lock.acquire()
2003        # XXX: this needs to be recalculated
2004        if key in self.state:
2005            if self.state[key].top is not None:
2006                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2007                rv = { 'experiment' : {keytype: key },
2008                        'vtopo': vtopo,
2009                        'proof': proof.to_dict(), 
2010                    }
2011            else:
2012                state = self.state[key].status
2013        self.state_lock.release()
2014
2015        if rv: 
2016            self.log.info("vtopo call completed for %s %s " % \
2017                (key, fid))
2018            return rv
2019        else: 
2020            if state:
2021                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2022                    (key, fid))
2023                raise service_error(service_error.partial, 
2024                        "Not ready: %s" % state)
2025            else:
2026                self.log.info("vtopo call completed for %s %s (No experiment)"\
2027                        % (key, fid))
2028                raise service_error(service_error.req, "No such experiment")
2029
2030    def get_vis(self, req, fid):
2031        """
2032        Return the stored visualization for this experiment
2033        """
2034        rv = None
2035        state = None
2036
2037        self.log.info("vis call started for %s" %  fid)
2038        req = req.get('VisRequestBody', None)
2039        if not req:
2040            raise service_error(service_error.req,
2041                    "Bad request format (no VisRequestBody)")
2042        exp = req.get('experiment', None)
2043        if exp:
2044            if exp.has_key('fedid'):
2045                key = exp['fedid']
2046                keytype = "fedid"
2047            elif exp.has_key('localname'):
2048                key = exp['localname']
2049                keytype = "localname"
2050            else:
2051                raise service_error(service_error.req, "Unknown lookup type")
2052        else:
2053            raise service_error(service_error.req, "No request?")
2054
2055        proof = self.check_experiment_access(fid, key)
2056
2057        self.state_lock.acquire()
2058        # Generate the visualization
2059        if key in self.state:
2060            if self.state[key].top is not None:
2061                try:
2062                    vis = self.genviz(
2063                            topdl.topology_to_vtopo(self.state[key].top))
2064                except service_error, e:
2065                    self.state_lock.release()
2066                    raise e
2067                rv =  { 'experiment' : {keytype: key },
2068                        'vis': vis,
2069                        'proof': proof.to_dict(), 
2070                        }
2071            else:
2072                state = self.state[key].status
2073        self.state_lock.release()
2074
2075        if rv: 
2076            self.log.info("vis call completed for %s %s " % \
2077                (key, fid))
2078            return rv
2079        else:
2080            if state:
2081                self.log.info("vis call completed for %s %s (not ready)" % \
2082                    (key, fid))
2083                raise service_error(service_error.partial, 
2084                        "Not ready: %s" % state)
2085            else:
2086                self.log.info("vis call completed for %s %s (no experiment)" % \
2087                    (key, fid))
2088                raise service_error(service_error.req, "No such experiment")
2089
2090   
2091    def save_federant_information(self, allocated, tbparams, eid, top):
2092        """
2093        Store the various data that have changed in the experiment state
2094        between when it was started and the beginning of resource allocation.
2095        This is basically the information about each local allocation.  This
2096        fills in the values of the placeholder allocation in the state.  It
2097        also collects the access proofs and returns them as dicts for a
2098        response message.
2099        """
2100        self.state_lock.acquire()
2101        exp = self.state[eid]
2102        exp.top = top.clone()
2103        # save federant information
2104        for k in allocated.keys():
2105            exp.add_allocation(tbparams[k])
2106            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2107                type="testbed", localname=[k], 
2108                service=[ s.to_topdl() for s in tbparams[k].services]))
2109
2110        # Access proofs for the response message
2111        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2112                    for p in tbparams[k].proof]
2113        exp.updated()
2114        if self.state_filename: 
2115            self.write_state()
2116        self.state_lock.release()
2117        return proofs
2118
2119    def clear_placeholder(self, eid, expid, tmpdir):
2120        """
2121        Clear the placeholder and remove any allocated temporary dir.
2122        """
2123
2124        self.state_lock.acquire()
2125        del self.state[eid]
2126        del self.state[expid]
2127        if self.state_filename: self.write_state()
2128        self.state_lock.release()
2129        if tmpdir and self.cleanup:
2130            self.remove_dirs(tmpdir)
2131
2132    # end of create_experiment sub-functions
2133
2134    def create_experiment(self, req, fid):
2135        """
2136        The external interface to experiment creation called from the
2137        dispatcher.
2138
2139        Creates a working directory, splits the incoming description using the
2140        splitter script and parses out the various subsections using the
2141        classes above.  Once each sub-experiment is created, use pooled threads
2142        to instantiate them and start it all up.
2143        """
2144
2145        self.log.info("Create experiment call started for %s" % fid)
2146        req = req.get('CreateRequestBody', None)
2147        if req:
2148            key = self.get_experiment_key(req)
2149        else:
2150            raise service_error(service_error.req,
2151                    "Bad request format (no CreateRequestBody)")
2152
2153        # Import information from the requester
2154        if self.auth.import_credentials(data_list=req.get('credential', [])):
2155            self.auth.save()
2156
2157        # Make sure that the caller can talk to us
2158        proof = self.check_experiment_access(fid, key)
2159
2160        # Install the testbed map entries supplied with the request into a copy
2161        # of the testbed map.
2162        tbmap = dict(self.tbmap)
2163        tbactive = set(self.tbactive)
2164        for m in req.get('testbedmap', []):
2165            if 'testbed' in m and 'uri' in m:
2166                tbmap[m['testbed']] = m['uri']
2167                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2168
2169        # a place to work
2170        try:
2171            tmpdir = tempfile.mkdtemp(prefix="split-")
2172            os.mkdir(tmpdir+"/keys")
2173        except EnvironmentError:
2174            raise service_error(service_error.internal, "Cannot create tmp dir")
2175
2176        tbparams = { }
2177
2178        eid, expid, expcert_file = \
2179                self.get_experiment_ids_and_start(key, tmpdir)
2180
2181        # This catches exceptions to clear the placeholder if necessary
2182        try: 
2183            if not (eid and expid):
2184                raise service_error(service_error.internal, 
2185                        "Cannot find local experiment info!?")
2186
2187            top = self.get_topology(req, tmpdir)
2188            self.confirm_software(top)
2189            # Assign the IPs
2190            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2191            # Find the testbeds to look up
2192            tb_hosts = { }
2193            testbeds = [ ]
2194            for e in top.elements:
2195                if isinstance(e, topdl.Computer):
2196                    tb = e.get_attribute('testbed') or 'default'
2197                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2198                    else: 
2199                        tb_hosts[tb] = [ e.name ]
2200                        testbeds.append(tb)
2201
2202            masters, pmasters = self.get_testbed_services(req, testbeds)
2203            allocated = { }         # Testbeds we can access
2204            topo ={ }               # Sub topologies
2205            connInfo = { }          # Connection information
2206
2207            self.split_topology(top, topo, testbeds)
2208
2209            self.get_access_to_testbeds(testbeds, fid, allocated, 
2210                    tbparams, masters, tbmap, expid, expcert_file)
2211
2212            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2213
2214            part = experiment_partition(self.auth, self.store_url, tbmap,
2215                    self.muxmax, self.direct_transit, tbactive)
2216            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2217                    connInfo, expid)
2218
2219            auth_attrs = set()
2220            # Now get access to the dynamic testbeds (those added above)
2221            for tb in [ t for t in topo if t not in allocated]:
2222                self.get_access(tb, tbparams, fid, masters, tbmap, 
2223                        expid, expcert_file)
2224                allocated[tb] = 1
2225                store_keys = topo[tb].get_attribute('store_keys')
2226                # Give the testbed access to keys it exports or imports
2227                if store_keys:
2228                    auth_attrs.update(set([
2229                        (tbparams[tb].allocID, sk) \
2230                                for sk in store_keys.split(" ")]))
2231
2232            if auth_attrs:
2233                self.append_experiment_authorization(expid, auth_attrs)
2234
2235            # transit and disconnected testbeds may not have a connInfo entry.
2236            # Fill in the blanks.
2237            for t in allocated.keys():
2238                if not connInfo.has_key(t):
2239                    connInfo[t] = { }
2240
2241            self.wrangle_software(expid, top, topo, tbparams)
2242
2243            proofs = self.save_federant_information(allocated, tbparams, 
2244                    eid, top)
2245        except service_error, e:
2246            # If something goes wrong in the parse (usually an access error)
2247            # clear the placeholder state.  From here on out the code delays
2248            # exceptions.  Failing at this point returns a fault to the remote
2249            # caller.
2250
2251            self.log.info("Create experiment call failed for %s %s: %s" % 
2252                    (eid, fid, e))
2253            self.clear_placeholder(eid, expid, tmpdir)
2254            raise e
2255
2256        # Start the background swapper and return the starting state.  From
2257        # here on out, the state will stick around a while.
2258
2259        # Create a logger that logs to the experiment's state object as well as
2260        # to the main log file.
2261        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2262        alloc_collector = self.list_log(self.state[eid].log)
2263        h = logging.StreamHandler(alloc_collector)
2264        # XXX: there should be a global one of these rather than repeating the
2265        # code.
2266        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2267                    '%d %b %y %H:%M:%S'))
2268        alloc_log.addHandler(h)
2269
2270        # Start a thread to do the resource allocation
2271        t  = Thread(target=self.allocate_resources,
2272                args=(allocated, masters, eid, expid, tbparams, 
2273                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2274                    connInfo, tbmap, expcert_file),
2275                name=eid)
2276        t.start()
2277
2278        rv = {
2279                'experimentID': [
2280                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2281                ],
2282                'experimentStatus': 'starting',
2283                'proof': [ proof.to_dict() ] + proofs,
2284            }
2285        self.log.info("Create experiment call succeeded for %s %s" % \
2286                (eid, fid))
2287
2288        return rv
2289   
2290    def get_experiment_fedid(self, key):
2291        """
2292        find the fedid associated with the localname key in the state database.
2293        """
2294
2295        rv = None
2296        self.state_lock.acquire()
2297        if key in self.state:
2298            rv = self.state[key].fedid
2299        self.state_lock.release()
2300        return rv
2301
2302    def check_experiment_access(self, fid, key):
2303        """
2304        Confirm that the fid has access to the experiment.  Though a request
2305        may be made in terms of a local name, the access attribute is always
2306        the experiment's fedid.
2307        """
2308        if not isinstance(key, fedid):
2309            key = self.get_experiment_fedid(key)
2310
2311        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2312
2313        if access_ok:
2314            return proof
2315        else:
2316            raise service_error(service_error.access, "Access Denied",
2317                proof)
2318
2319
2320    def get_handler(self, path, fid):
2321        """
2322        Perhaps surprisingly named, this function handles HTTP GET requests to
2323        this server (SOAP requests are POSTs).
2324        """
2325        self.log.info("Get handler %s %s" % (path, fid))
2326        # XXX: log proofs?
2327        if self.auth.check_attribute(fid, path):
2328            return ("%s/%s" % (self.repodir, path), "application/binary")
2329        else:
2330            return (None, None)
2331
2332    def update_info(self, key, force=False):
2333        top = None
2334        self.state_lock.acquire()
2335        if key in self.state:
2336            if force or self.state[key].older_than(self.info_cache_limit):
2337                top = self.state[key].top
2338                if top is not None: top = top.clone()
2339                d1, info_params, cert, d2 = \
2340                        self.get_segment_info(self.state[key], need_lock=False)
2341        self.state_lock.release()
2342
2343        if top is None: return
2344
2345        try:
2346            tmpdir = tempfile.mkdtemp(prefix="info-")
2347        except EnvironmentError:
2348            raise service_error(service_error.internal, 
2349                    "Cannot create tmp dir")
2350        cert_file = self.make_temp_certfile(cert, tmpdir)
2351
2352        data = []
2353        try:
2354            for k, (uri, aid) in info_params.items():
2355                info=self.info_segment(log=self.log, testbed=uri,
2356                            cert_file=cert_file, cert_pwd=None,
2357                            trusted_certs=self.trusted_certs,
2358                            caller=self.call_InfoSegment)
2359                info(uri, aid)
2360                data.append(info)
2361        # Clean up the tmpdir no matter what
2362        finally:
2363            if tmpdir: self.remove_dirs(tmpdir)
2364
2365        self.annotate_topology(top, data)
2366        self.state_lock.acquire()
2367        if key in self.state:
2368            self.state[key].top = top
2369            self.state[key].updated()
2370            if self.state_filename: self.write_state()
2371        self.state_lock.release()
2372
2373   
2374    def get_info(self, req, fid):
2375        """
2376        Return all the stored info about this experiment
2377        """
2378        rv = None
2379
2380        self.log.info("Info call started for %s" %  fid)
2381        req = req.get('InfoRequestBody', None)
2382        if not req:
2383            raise service_error(service_error.req,
2384                    "Bad request format (no InfoRequestBody)")
2385        exp = req.get('experiment', None)
2386        legacy = req.get('legacy', False)
2387        fresh = req.get('fresh', False)
2388        if exp:
2389            if exp.has_key('fedid'):
2390                key = exp['fedid']
2391                keytype = "fedid"
2392            elif exp.has_key('localname'):
2393                key = exp['localname']
2394                keytype = "localname"
2395            else:
2396                raise service_error(service_error.req, "Unknown lookup type")
2397        else:
2398            raise service_error(service_error.req, "No request?")
2399
2400        proof = self.check_experiment_access(fid, key)
2401
2402        self.update_info(key, fresh)
2403
2404        self.state_lock.acquire()
2405        if self.state.has_key(key):
2406            rv = self.state[key].get_info()
2407            # Copy the topo if we need legacy annotations
2408            if legacy:
2409                top = self.state[key].top
2410                if top is not None: top = top.clone()
2411        self.state_lock.release()
2412        self.log.info("Gathered Info for %s %s" % (key, fid))
2413
2414        # If the legacy visualization and topology representations are
2415        # requested, calculate them and add them to the return.
2416        if legacy and rv is not None:
2417            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2418            if top is not None:
2419                vtopo = topdl.topology_to_vtopo(top)
2420                if vtopo is not None:
2421                    rv['vtopo'] = vtopo
2422                    try:
2423                        vis = self.genviz(vtopo)
2424                    except service_error, e:
2425                        self.log.debug('Problem generating visualization: %s' \
2426                                % e)
2427                        vis = None
2428                    if vis is not None:
2429                        rv['vis'] = vis
2430        if rv:
2431            self.log.info("Info succeded for %s %s" % (key, fid))
2432            rv['proof'] = proof.to_dict()
2433            return rv
2434        else: 
2435            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2436            raise service_error(service_error.req, "No such experiment")
2437
2438    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2439            results):
2440        """
2441        Call OperateSegment on multiple testbeds and gather the results.
2442        op_params contains the parameters needed to contact that testbed, cert
2443        is a certificate containing the fedid to use, op is the operation,
2444        testbeds is a dict mapping testbed name to targets in that testbed,
2445        params are the parameters to include a,d results is a growing list of
2446        the results of the calls.
2447        """
2448        try:
2449            tmpdir = tempfile.mkdtemp(prefix="info-")
2450        except EnvironmentError:
2451            raise service_error(service_error.internal, 
2452                    "Cannot create tmp dir")
2453        cert_file = self.make_temp_certfile(cert, tmpdir)
2454
2455        try:
2456            for tb, targets in testbeds.items():
2457                if tb in op_params:
2458                    uri, aid = op_params[tb]
2459                    operate=self.operation_segment(log=self.log, testbed=uri,
2460                                cert_file=cert_file, cert_pwd=None,
2461                                trusted_certs=self.trusted_certs,
2462                                caller=self.call_OperationSegment)
2463                    if operate(uri, aid, op, targets, params):
2464                        if operate.status is not None:
2465                            results.extend(operate.status)
2466                            continue
2467                # Something went wrong in a weird way.  Add statuses
2468                # that reflect that to results
2469                for t in targets:
2470                    results.append(operation_status(t, 
2471                        operation_status.federant,
2472                        'Unexpected error on %s' % tb))
2473        # Clean up the tmpdir no matter what
2474        finally:
2475            if tmpdir: self.remove_dirs(tmpdir)
2476
2477    def do_operation(self, req, fid):
2478        """
2479        Find the testbeds holding each target and ask them to carry out the
2480        operation.  Return the statuses.
2481        """
2482        # Map an element to the testbed containing it
2483        def element_to_tb(e):
2484            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2485            elif isinstance(e, topdl.Testbed): return e.name
2486            else: return None
2487        # If d is an operation_status object, make it a dict
2488        def make_dict(d):
2489            if isinstance(d, dict): return d
2490            elif isinstance(d, operation_status): return d.to_dict()
2491            else: return { }
2492
2493        def element_name(e):
2494            if isinstance(e, topdl.Computer): return e.name
2495            elif isinstance(e, topdl.Testbed): 
2496                if e.localname: return e.localname[0]
2497                else: return None
2498            else: return None
2499
2500        req = req.get('OperationRequestBody', None)
2501        if not req:
2502            raise service_error(service_error.req,
2503                    "Bad request format (no OperationRequestBody)")
2504        exp = req.get('experiment', None)
2505        op = req.get('operation', None)
2506        targets = set(req.get('target', []))
2507        params = req.get('parameter', None)
2508
2509        if exp:
2510            if 'fedid' in exp:
2511                key = exp['fedid']
2512                keytype = "fedid"
2513            elif 'localname' in exp:
2514                key = exp['localname']
2515                keytype = "localname"
2516            else:
2517                raise service_error(service_error.req, "Unknown lookup type")
2518        else:
2519            raise service_error(service_error.req, "No request?")
2520
2521        if op is None or not targets:
2522            raise service_error(service_error.req, "No request?")
2523
2524        proof = self.check_experiment_access(fid, key)
2525        self.state_lock.acquire()
2526        if key in self.state:
2527            d1, op_params, cert, d2 = \
2528                    self.get_segment_info(self.state[key], need_lock=False,
2529                            key='tb')
2530            top = self.state[key].top
2531            if top is not None:
2532                top = top.clone()
2533        self.state_lock.release()
2534
2535        if top is None:
2536            raise service_error(service_error.partial, "No topology yet", 
2537                    proof=proof)
2538
2539        testbeds = { }
2540        results = []
2541        for e in top.elements:
2542            ename = element_name(e)
2543            if ename in targets:
2544                tb = element_to_tb(e)
2545                targets.remove(ename)
2546                if tb is not None:
2547                    if tb in testbeds: testbeds[tb].append(ename)
2548                    else: testbeds[tb] = [ ename ]
2549                else:
2550                    results.append(operation_status(e.name, 
2551                        code=operation_status.no_target, 
2552                        description='Cannot map target to testbed'))
2553
2554        for t in targets:
2555            results.append(operation_status(t, operation_status.no_target))
2556
2557        self.operate_on_segments(op_params, cert, op, testbeds, params,
2558                results)
2559
2560        return { 
2561                'experiment': exp, 
2562                'status': [make_dict(r) for r in results],
2563                'proof': proof.to_dict()
2564                }
2565
2566
2567    def get_multi_info(self, req, fid):
2568        """
2569        Return all the stored info that this fedid can access
2570        """
2571        rv = { 'info': [ ], 'proof': [ ] }
2572
2573        self.log.info("Multi Info call started for %s" %  fid)
2574        self.state_lock.acquire()
2575        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2576            try:
2577                proof = self.check_experiment_access(fid, key)
2578            except service_error, e:
2579                if e.code == service_error.access:
2580                    continue
2581                else:
2582                    self.log.info("Multi Info call failed for %s: %s" %  \
2583                            (e,fid))
2584                    self.state_lock.release()
2585                    raise e
2586
2587            if self.state.has_key(key):
2588                e = self.state[key].get_info()
2589                e['proof'] = proof.to_dict()
2590                rv['info'].append(e)
2591                rv['proof'].append(proof.to_dict())
2592        self.state_lock.release()
2593        self.log.info("Multi Info call succeeded for %s" %  fid)
2594        return rv
2595
2596    def check_termination_status(self, fed_exp, force):
2597        """
2598        Confirm that the experiment is sin a valid state to stop (or force it)
2599        return the state - invalid states for deletion and force settings cause
2600        exceptions.
2601        """
2602        self.state_lock.acquire()
2603        status = fed_exp.status
2604
2605        if status:
2606            if status in ('starting', 'terminating'):
2607                if not force:
2608                    self.state_lock.release()
2609                    raise service_error(service_error.partial, 
2610                            'Experiment still being created or destroyed')
2611                else:
2612                    self.log.warning('Experiment in %s state ' % status + \
2613                            'being terminated by force.')
2614            self.state_lock.release()
2615            return status
2616        else:
2617            # No status??? trouble
2618            self.state_lock.release()
2619            raise service_error(service_error.internal,
2620                    "Experiment has no status!?")
2621
2622    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2623        ids = []
2624        term_params = { }
2625        if need_lock: self.state_lock.acquire()
2626        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2627        expcert = fed_exp.identity
2628        repo = "%s" % fed_exp.fedid
2629
2630        # Collect the allocation/segment ids into a dict keyed by the fedid
2631        # of the allocation that contains a tuple of uri, aid
2632        for i, fed in enumerate(fed_exp.get_all_allocations()):
2633            uri = fed.uri
2634            aid = fed.allocID
2635            if key == 'aid': term_params[aid] = (uri, aid)
2636            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2637
2638        if need_lock: self.state_lock.release()
2639        return ids, term_params, expcert, repo
2640
2641
2642    def get_termination_info(self, fed_exp):
2643        self.state_lock.acquire()
2644        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2645        # Change the experiment state
2646        fed_exp.status = 'terminating'
2647        fed_exp.updated()
2648        if self.state_filename: self.write_state()
2649        self.state_lock.release()
2650
2651        return ids, term_params, expcert, repo
2652
2653
2654    def deallocate_resources(self, term_params, expcert, status, force, 
2655            dealloc_log):
2656        tmpdir = None
2657        # This try block makes sure the tempdir is cleared
2658        try:
2659            # If no expcert, try the deallocation as the experiment
2660            # controller instance.
2661            if expcert and self.auth_type != 'legacy': 
2662                try:
2663                    tmpdir = tempfile.mkdtemp(prefix="term-")
2664                except EnvironmentError:
2665                    raise service_error(service_error.internal, 
2666                            "Cannot create tmp dir")
2667                cert_file = self.make_temp_certfile(expcert, tmpdir)
2668                pw = None
2669            else: 
2670                cert_file = self.cert_file
2671                pw = self.cert_pwd
2672
2673            # Stop everyone.  NB, wait_for_all waits until a thread starts
2674            # and then completes, so we can't wait if nothing starts.  So,
2675            # no tbparams, no start.
2676            if len(term_params) > 0:
2677                tp = thread_pool(self.nthreads)
2678                for k, (uri, aid) in term_params.items():
2679                    # Create and start a thread to stop the segment
2680                    tp.wait_for_slot()
2681                    t  = pooled_thread(\
2682                            target=self.terminate_segment(log=dealloc_log,
2683                                testbed=uri,
2684                                cert_file=cert_file, 
2685                                cert_pwd=pw,
2686                                trusted_certs=self.trusted_certs,
2687                                caller=self.call_TerminateSegment),
2688                            args=(uri, aid), name=k,
2689                            pdata=tp, trace_file=self.trace_file)
2690                    t.start()
2691                # Wait for completions
2692                tp.wait_for_all_done()
2693
2694            # release the allocations (failed experiments have done this
2695            # already, and starting experiments may be in odd states, so we
2696            # ignore errors releasing those allocations
2697            try: 
2698                for k, (uri, aid)  in term_params.items():
2699                    self.release_access(None, aid, uri=uri,
2700                            cert_file=cert_file, cert_pwd=pw)
2701            except service_error, e:
2702                if status != 'failed' and not force:
2703                    raise e
2704
2705        # Clean up the tmpdir no matter what
2706        finally:
2707            if tmpdir: self.remove_dirs(tmpdir)
2708
2709    def terminate_experiment(self, req, fid):
2710        """
2711        Swap this experiment out on the federants and delete the shared
2712        information
2713        """
2714        self.log.info("Terminate experiment call started for %s" % fid)
2715        tbparams = { }
2716        req = req.get('TerminateRequestBody', None)
2717        if not req:
2718            raise service_error(service_error.req,
2719                    "Bad request format (no TerminateRequestBody)")
2720
2721        key = self.get_experiment_key(req, 'experiment')
2722        proof = self.check_experiment_access(fid, key)
2723        exp = req.get('experiment', False)
2724        force = req.get('force', False)
2725
2726        dealloc_list = [ ]
2727
2728
2729        # Create a logger that logs to the dealloc_list as well as to the main
2730        # log file.
2731        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2732        dealloc_log.info("Terminating %s " %key)
2733        h = logging.StreamHandler(self.list_log(dealloc_list))
2734        # XXX: there should be a global one of these rather than repeating the
2735        # code.
2736        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2737                    '%d %b %y %H:%M:%S'))
2738        dealloc_log.addHandler(h)
2739
2740        self.state_lock.acquire()
2741        fed_exp = self.state.get(key, None)
2742        self.state_lock.release()
2743        repo = None
2744
2745        if fed_exp:
2746            status = self.check_termination_status(fed_exp, force)
2747            # get_termination_info updates the experiment state
2748            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2749            self.deallocate_resources(term_params, expcert, status, force, 
2750                    dealloc_log)
2751
2752            # Remove the terminated experiment
2753            self.state_lock.acquire()
2754            for id in ids:
2755                self.clear_experiment_authorization(id, need_state_lock=False)
2756                if id in self.state: del self.state[id]
2757
2758            if self.state_filename: self.write_state()
2759            self.state_lock.release()
2760
2761            # Delete any synch points associated with this experiment.  All
2762            # synch points begin with the fedid of the experiment.
2763            fedid_keys = set(["fedid:%s" % f for f in ids \
2764                    if isinstance(f, fedid)])
2765            for k in self.synch_store.all_keys():
2766                try:
2767                    if len(k) > 45 and k[0:46] in fedid_keys:
2768                        self.synch_store.del_value(k)
2769                except synch_store.BadDeletionError:
2770                    pass
2771            self.write_store()
2772
2773            # Remove software and other cached stuff from the filesystem.
2774            if repo:
2775                self.remove_dirs("%s/%s" % (self.repodir, repo))
2776       
2777            self.log.info("Terminate experiment succeeded for %s %s" % \
2778                    (key, fid))
2779            return { 
2780                    'experiment': exp , 
2781                    'deallocationLog': string.join(dealloc_list, ''),
2782                    'proof': [proof.to_dict()],
2783                    }
2784        else:
2785            self.log.info("Terminate experiment failed for %s %s: no state" % \
2786                    (key, fid))
2787            raise service_error(service_error.req, "No saved state")
2788
2789
2790    def GetValue(self, req, fid):
2791        """
2792        Get a value from the synchronized store
2793        """
2794        req = req.get('GetValueRequestBody', None)
2795        if not req:
2796            raise service_error(service_error.req,
2797                    "Bad request format (no GetValueRequestBody)")
2798       
2799        name = req.get('name', None)
2800        wait = req.get('wait', False)
2801        rv = { 'name': name }
2802
2803        if not name:
2804            raise service_error(service_error.req, "No name?")
2805
2806        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2807
2808        if access_ok:
2809            self.log.debug("[GetValue] asking for %s " % name)
2810            try:
2811                v = self.synch_store.get_value(name, wait)
2812            except synch_store.RevokedKeyError:
2813                # No more synch on this key
2814                raise service_error(service_error.federant, 
2815                        "Synch key %s revoked" % name)
2816            if v is not None:
2817                rv['value'] = v
2818            rv['proof'] = proof.to_dict()
2819            self.log.debug("[GetValue] got %s from %s" % (v, name))
2820            return rv
2821        else:
2822            raise service_error(service_error.access, "Access Denied",
2823                    proof=proof)
2824       
2825
2826    def SetValue(self, req, fid):
2827        """
2828        Set a value in the synchronized store
2829        """
2830        req = req.get('SetValueRequestBody', None)
2831        if not req:
2832            raise service_error(service_error.req,
2833                    "Bad request format (no SetValueRequestBody)")
2834       
2835        name = req.get('name', None)
2836        v = req.get('value', '')
2837
2838        if not name:
2839            raise service_error(service_error.req, "No name?")
2840
2841        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2842
2843        if access_ok:
2844            try:
2845                self.synch_store.set_value(name, v)
2846                self.write_store()
2847                self.log.debug("[SetValue] set %s to %s" % (name, v))
2848            except synch_store.CollisionError:
2849                # Translate into a service_error
2850                raise service_error(service_error.req,
2851                        "Value already set: %s" %name)
2852            except synch_store.RevokedKeyError:
2853                # No more synch on this key
2854                raise service_error(service_error.federant, 
2855                        "Synch key %s revoked" % name)
2856                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2857        else:
2858            raise service_error(service_error.access, "Access Denied",
2859                    proof=proof)
Note: See TracBrowser for help on using the repository browser.