source: fedd/federation/experiment_control.py @ 9e5e251

compt_changes
Last change on this file since 9e5e251 was 9e5e251, checked in by Ted Faber <faber@…>, 12 years ago

Actually annotate substrates

  • Property mode set to 100644
File size: 95.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        also adds testbeds to active if they include , active after
388        their name.
389        """
390
391        self.tbmap = { }
392        self.tbactive = set()
393        lineno =0
394        try:
395            f = open(file, "r")
396            for line in f:
397                lineno += 1
398                line = line.strip()
399                if line.startswith('#') or len(line) == 0:
400                    continue
401                try:
402                    label, url = line.split(':', 1)
403                    if ',' in label:
404                        label, act = label.split(',', 1)
405                        active = (act.strip() == 'active')
406                    else:
407                        active = False
408                    self.tbmap[label] = url
409                    if active: self.tbactive.add(label)
410                except ValueError, e:
411                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
412                            "map db: %s %s" % (lineno, line, e))
413        except EnvironmentError, e:
414            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
415                    "open %s: %s" % (file, e))
416        else:
417            f.close()
418
419    def read_store(self):
420        try:
421            self.synch_store = synch_store()
422            self.synch_store.load(self.store_filename)
423            self.log.debug("[read_store]: Read store from %s" % \
424                    self.store_filename)
425        except EnvironmentError, e:
426            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
427                    % (self.state_filename, e))
428            self.synch_store = synch_store()
429
430        # Set the initial permissions on data in the store.  XXX: This ad hoc
431        # authorization attribute initialization is getting out of hand.
432        # XXX: legacy
433        if self.auth_type == 'legacy':
434            for k in self.synch_store.all_keys():
435                try:
436                    if k.startswith('fedid:'):
437                        fid = fedid(hexstr=k[6:46])
438                        if self.state.has_key(fid):
439                            for a in self.get_alloc_ids(self.state[fid]):
440                                self.auth.set_attribute(a, k)
441                except ValueError, e:
442                    self.log.warn("Cannot deduce permissions for %s" % k)
443
444
445    def write_store(self):
446        """
447        Write a new copy of synch_store after writing current state
448        to a backup.  We use the internal synch_store pickle method to avoid
449        incinsistent data.
450
451        State format is a simple pickling of the store.
452        """
453        if os.access(self.store_filename, os.W_OK):
454            copy_file(self.store_filename, \
455                    "%s.bak" % self.store_filename)
456        try:
457            self.synch_store.save(self.store_filename)
458        except EnvironmentError, e:
459            self.log.error("Can't write file %s: %s" % \
460                    (self.store_filename, e))
461        except TypeError, e:
462            self.log.error("Pickling problem (TypeError): %s" % e)
463
464
465    def remove_dirs(self, dir):
466        """
467        Remove the directory tree and all files rooted at dir.  Log any errors,
468        but continue.
469        """
470        self.log.debug("[removedirs]: removing %s" % dir)
471        try:
472            for path, dirs, files in os.walk(dir, topdown=False):
473                for f in files:
474                    os.remove(os.path.join(path, f))
475                for d in dirs:
476                    os.rmdir(os.path.join(path, d))
477            os.rmdir(dir)
478        except EnvironmentError, e:
479            self.log.error("Error deleting directory tree in %s" % e);
480
481    @staticmethod
482    def make_temp_certfile(expcert, tmpdir):
483        """
484        make a protected copy of the access certificate so the experiment
485        controller can act as the experiment principal.  mkstemp is the most
486        secure way to do that. The directory should be created by
487        mkdtemp.  Return the filename.
488        """
489        if expcert and tmpdir:
490            try:
491                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
492                f = os.fdopen(certf, 'w')
493                print >> f, expcert
494                f.close()
495            except EnvironmentError, e:
496                raise service_error(service_error.internal, 
497                        "Cannot create temp cert file?")
498            return certfn
499        else:
500            return None
501
502       
503    def generate_ssh_keys(self, dest, type="rsa" ):
504        """
505        Generate a set of keys for the gateways to use to talk.
506
507        Keys are of type type and are stored in the required dest file.
508        """
509        valid_types = ("rsa", "dsa")
510        t = type.lower();
511        if t not in valid_types: raise ValueError
512        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
513
514        try:
515            trace = open("/dev/null", "w")
516        except EnvironmentError:
517            raise service_error(service_error.internal,
518                    "Cannot open /dev/null??");
519
520        # May raise CalledProcessError
521        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
522        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
523        if rv != 0:
524            raise service_error(service_error.internal, 
525                    "Cannot generate nonce ssh keys.  %s return code %d" \
526                            % (self.ssh_keygen, rv))
527
528    def generate_seer_certs(self, destdir):
529        '''
530        Create a SEER ca cert and a node cert in destdir/ca.pem and
531        destdir/node.pem respectively.  These will be distributed throughout
532        the federated experiment.  This routine reports errors via
533        service_errors.
534        '''
535        openssl = '/usr/bin/openssl'
536        # All the filenames and parameters we need for openssl calls below
537        ca_key =os.path.join(destdir, 'ca.key') 
538        ca_pem = os.path.join(destdir, 'ca.pem')
539        node_key =os.path.join(destdir, 'node.key') 
540        node_pem = os.path.join(destdir, 'node.pem')
541        node_req = os.path.join(destdir, 'node.req')
542        node_signed = os.path.join(destdir, 'node.signed')
543        days = '%s' % (365 * 10)
544        serial = '%s' % random.randint(0, 1<<16)
545
546        try:
547            # Sequence of calls to create a CA key, create a ca cert, create a
548            # node key, node signing request, and finally a signed node
549            # certificate.
550            sequence = (
551                    (openssl, 'genrsa', '-out', ca_key, '1024'),
552                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
553                        ca_pem, '-days', days, '-subj', 
554                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
555                    (openssl, 'genrsa', '-out', node_key, '1024'),
556                    (openssl, 'req', '-new', '-key', node_key, '-out', 
557                        node_req, '-days', days, '-subj', 
558                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
559                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
560                        '-set_serial', serial, '-req', '-in', node_req, 
561                        '-out', node_signed, '-days', days),
562                )
563            # Do all that stuff; bail if there's an error, and push all the
564            # output to dev/null.
565            for cmd in sequence:
566                trace = open("/dev/null", "w")
567                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
568                if rv != 0:
569                    raise service_error(service_error.internal, 
570                            "Cannot generate SEER certs.  %s return code %d" \
571                                    % (' '.join(cmd), rv))
572            # Concatinate the node key and signed certificate into node.pem
573            f = open(node_pem, 'w')
574            for comp in (node_signed, node_key):
575                g = open(comp, 'r')
576                f.write(g.read())
577                g.close()
578            f.close()
579
580            # Throw out intermediaries.
581            for fn in (ca_key, node_key, node_req, node_signed):
582                os.unlink(fn)
583
584        except EnvironmentError, e:
585            # Any difficulties with the file system wind up here
586            raise service_error(service_error.internal,
587                    "File error on  %s while creating SEER certs: %s" % \
588                            (e.filename, e.strerror))
589
590
591
592    def gentopo(self, str):
593        """
594        Generate the topology data structure from the splitter's XML
595        representation of it.
596
597        The topology XML looks like:
598            <experiment>
599                <nodes>
600                    <node><vname></vname><ips>ip1:ip2</ips></node>
601                </nodes>
602                <lans>
603                    <lan>
604                        <vname></vname><vnode></vnode><ip></ip>
605                        <bandwidth></bandwidth><member>node:port</member>
606                    </lan>
607                </lans>
608        """
609        class topo_parse:
610            """
611            Parse the topology XML and create the dats structure.
612            """
613            def __init__(self):
614                # Typing of the subelements for data conversion
615                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
616                self.int_subelements = ( 'bandwidth',)
617                self.float_subelements = ( 'delay',)
618                # The final data structure
619                self.nodes = [ ]
620                self.lans =  [ ]
621                self.topo = { \
622                        'node': self.nodes,\
623                        'lan' : self.lans,\
624                    }
625                self.element = { }  # Current element being created
626                self.chars = ""     # Last text seen
627
628            def end_element(self, name):
629                # After each sub element the contents is added to the current
630                # element or to the appropriate list.
631                if name == 'node':
632                    self.nodes.append(self.element)
633                    self.element = { }
634                elif name == 'lan':
635                    self.lans.append(self.element)
636                    self.element = { }
637                elif name in self.str_subelements:
638                    self.element[name] = self.chars
639                    self.chars = ""
640                elif name in self.int_subelements:
641                    self.element[name] = int(self.chars)
642                    self.chars = ""
643                elif name in self.float_subelements:
644                    self.element[name] = float(self.chars)
645                    self.chars = ""
646
647            def found_chars(self, data):
648                self.chars += data.rstrip()
649
650
651        tp = topo_parse();
652        parser = xml.parsers.expat.ParserCreate()
653        parser.EndElementHandler = tp.end_element
654        parser.CharacterDataHandler = tp.found_chars
655
656        parser.Parse(str)
657
658        return tp.topo
659       
660
661    def genviz(self, topo):
662        """
663        Generate the visualization the virtual topology
664        """
665
666        neato = "/usr/local/bin/neato"
667        # These are used to parse neato output and to create the visualization
668        # file.
669        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
670        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
671                "%s</type></node>"
672
673        try:
674            # Node names
675            nodes = [ n['vname'] for n in topo['node'] ]
676            topo_lans = topo['lan']
677        except KeyError, e:
678            raise service_error(service_error.internal, "Bad topology: %s" %e)
679
680        lans = { }
681        links = { }
682
683        # Walk through the virtual topology, organizing the connections into
684        # 2-node connections (links) and more-than-2-node connections (lans).
685        # When a lan is created, it's added to the list of nodes (there's a
686        # node in the visualization for the lan).
687        for l in topo_lans:
688            if links.has_key(l['vname']):
689                if len(links[l['vname']]) < 2:
690                    links[l['vname']].append(l['vnode'])
691                else:
692                    nodes.append(l['vname'])
693                    lans[l['vname']] = links[l['vname']]
694                    del links[l['vname']]
695                    lans[l['vname']].append(l['vnode'])
696            elif lans.has_key(l['vname']):
697                lans[l['vname']].append(l['vnode'])
698            else:
699                links[l['vname']] = [ l['vnode'] ]
700
701
702        # Open up a temporary file for dot to turn into a visualization
703        try:
704            df, dotname = tempfile.mkstemp()
705            dotfile = os.fdopen(df, 'w')
706        except EnvironmentError:
707            raise service_error(service_error.internal,
708                    "Failed to open file in genviz")
709
710        try:
711            dnull = open('/dev/null', 'w')
712        except EnvironmentError:
713            service_error(service_error.internal,
714                    "Failed to open /dev/null in genviz")
715
716        # Generate a dot/neato input file from the links, nodes and lans
717        try:
718            print >>dotfile, "graph G {"
719            for n in nodes:
720                print >>dotfile, '\t"%s"' % n
721            for l in links.keys():
722                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
723            for l in lans.keys():
724                for n in lans[l]:
725                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
726            print >>dotfile, "}"
727            dotfile.close()
728        except TypeError:
729            raise service_error(service_error.internal,
730                    "Single endpoint link in vtopo")
731        except EnvironmentError:
732            raise service_error(service_error.internal, "Cannot write dot file")
733
734        # Use dot to create a visualization
735        try:
736            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
737                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
738                stderr=dnull, close_fds=True)
739        except EnvironmentError:
740            raise service_error(service_error.internal, 
741                    "Cannot generate visualization: is graphviz available?")
742        dnull.close()
743
744        # Translate dot to vis format
745        vis_nodes = [ ]
746        vis = { 'node': vis_nodes }
747        for line in dot.stdout:
748            m = vis_re.match(line)
749            if m:
750                vn = m.group(1)
751                vis_node = {'name': vn, \
752                        'x': float(m.group(2)),\
753                        'y' : float(m.group(3)),\
754                    }
755                if vn in links.keys() or vn in lans.keys():
756                    vis_node['type'] = 'lan'
757                else:
758                    vis_node['type'] = 'node'
759                vis_nodes.append(vis_node)
760        rv = dot.wait()
761
762        os.remove(dotname)
763        # XXX: graphviz seems to use low return codes for warnings, like
764        # "couldn't find font"
765        if rv < 2 : return vis
766        else: return None
767
768
769    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
770            cert_pwd=None):
771        """
772        Release access to testbed through fedd
773        """
774
775        if not uri and tbmap:
776            uri = tbmap.get(tb, None)
777        if not uri:
778            raise service_error(service_error.server_config, 
779                    "Unknown testbed: %s" % tb)
780
781        if self.local_access.has_key(uri):
782            resp = self.local_access[uri].ReleaseAccess(\
783                    { 'ReleaseAccessRequestBody' : 
784                        {'allocID': {'fedid': aid}},}, 
785                    fedid(file=cert_file))
786            resp = { 'ReleaseAccessResponseBody': resp } 
787        else:
788            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
789                    cert_file, cert_pwd, self.trusted_certs)
790
791        # better error coding
792
793    def remote_ns2topdl(self, uri, desc):
794
795        req = {
796                'description' : { 'ns2description': desc },
797            }
798
799        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
800                self.trusted_certs)
801
802        if r.has_key('Ns2TopdlResponseBody'):
803            r = r['Ns2TopdlResponseBody']
804            ed = r.get('experimentdescription', None)
805            if ed.has_key('topdldescription'):
806                return topdl.Topology(**ed['topdldescription'])
807            else:
808                raise service_error(service_error.protocol, 
809                        "Bad splitter response (no output)")
810        else:
811            raise service_error(service_error.protocol, "Bad splitter response")
812
813    class start_segment:
814        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
815                cert_pwd=None, trusted_certs=None, caller=None,
816                log_collector=None):
817            self.log = log
818            self.debug = debug
819            self.cert_file = cert_file
820            self.cert_pwd = cert_pwd
821            self.trusted_certs = None
822            self.caller = caller
823            self.testbed = testbed
824            self.log_collector = log_collector
825            self.response = None
826            self.node = { }
827            self.subs = { }
828            self.proof = None
829
830        def make_map(self, resp):
831            if 'segmentdescription' not in resp  or \
832                    'topdldescription' not in resp['segmentdescription']:
833                self.log.warn('No topology returned from startsegment')
834                return 
835
836            top = topdl.Topology(
837                    **resp['segmentdescription']['topdldescription'])
838
839            for e in top.elements:
840                if isinstance(e, topdl.Computer):
841                    self.node[e.name] = e
842            for s in top.substrates:
843                self.subs[s.name] = s
844
845        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
846            req = {
847                    'allocID': { 'fedid' : aid }, 
848                    'segmentdescription': { 
849                        'topdldescription': topo.to_dict(),
850                    },
851                }
852
853            if connInfo:
854                req['connection'] = connInfo
855
856            import_svcs = [ s for m in masters.values() \
857                    for s in m if self.testbed in s.importers]
858
859            if import_svcs or self.testbed in masters:
860                req['service'] = []
861
862            for s in import_svcs:
863                for r in s.reqs:
864                    sr = copy.deepcopy(r)
865                    sr['visibility'] = 'import';
866                    req['service'].append(sr)
867
868            for s in masters.get(self.testbed, []):
869                for r in s.reqs:
870                    sr = copy.deepcopy(r)
871                    sr['visibility'] = 'export';
872                    req['service'].append(sr)
873
874            if attrs:
875                req['fedAttr'] = attrs
876
877            try:
878                self.log.debug("Calling StartSegment at %s " % uri)
879                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
880                        self.trusted_certs)
881                if r.has_key('StartSegmentResponseBody'):
882                    lval = r['StartSegmentResponseBody'].get('allocationLog',
883                            None)
884                    if lval and self.log_collector:
885                        for line in  lval.splitlines(True):
886                            self.log_collector.write(line)
887                    self.make_map(r['StartSegmentResponseBody'])
888                    if 'proof' in r: self.proof = r['proof']
889                    self.response = r
890                else:
891                    raise service_error(service_error.internal, 
892                            "Bad response!?: %s" %r)
893                return True
894            except service_error, e:
895                self.log.error("Start segment failed on %s: %s" % \
896                        (self.testbed, e))
897                return False
898
899
900
901    class terminate_segment:
902        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
903                cert_pwd=None, trusted_certs=None, caller=None):
904            self.log = log
905            self.debug = debug
906            self.cert_file = cert_file
907            self.cert_pwd = cert_pwd
908            self.trusted_certs = None
909            self.caller = caller
910            self.testbed = testbed
911
912        def __call__(self, uri, aid ):
913            req = {
914                    'allocID': {'fedid': aid }, 
915                }
916            self.log.info("Calling terminate segment")
917            try:
918                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
919                        self.trusted_certs)
920                self.log.info("Terminate segment succeeded")
921                return True
922            except service_error, e:
923                self.log.error("Terminate segment failed on %s: %s" % \
924                        (self.testbed, e))
925                return False
926
927    class info_segment(start_segment):
928        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
929                cert_pwd=None, trusted_certs=None, caller=None,
930                log_collector=None):
931            experiment_control_local.start_segment.__init__(self, debug, 
932                    log, testbed, cert_file, cert_pwd, trusted_certs, 
933                    caller, log_collector)
934
935        def __call__(self, uri, aid):
936            req = { 'allocID': { 'fedid' : aid } }
937
938            try:
939                self.log.debug("Calling InfoSegment at %s " % uri)
940                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
941                        self.trusted_certs)
942                if r.has_key('InfoSegmentResponseBody'):
943                    self.make_map(r['InfoSegmentResponseBody'])
944                    if 'proof' in r: self.proof = r['proof']
945                    self.response = r
946                else:
947                    raise service_error(service_error.internal, 
948                            "Bad response!?: %s" %r)
949                return True
950            except service_error, e:
951                self.log.error("Info segment failed on %s: %s" % \
952                        (self.testbed, e))
953                return False
954
955    class operation_segment:
956        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
957                cert_pwd=None, trusted_certs=None, caller=None,
958                log_collector=None):
959            self.log = log
960            self.debug = debug
961            self.cert_file = cert_file
962            self.cert_pwd = cert_pwd
963            self.trusted_certs = None
964            self.caller = caller
965            self.testbed = testbed
966            self.status = None
967
968        def __call__(self, uri, aid, op, targets, params):
969            req = { 
970                    'allocID': { 'fedid' : aid },
971                    'operation': op,
972                    'target': targets,
973                    }
974            if params: req['parameter'] = params
975
976
977            try:
978                self.log.debug("Calling OperationSegment at %s " % uri)
979                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
980                        self.trusted_certs)
981                if 'OperationSegmentResponseBody' in r:
982                    r = r['OperationSegmentResponseBody']
983                    if 'status' in r:
984                        self.status = r['status']
985                else:
986                    raise service_error(service_error.internal, 
987                            "Bad response!?: %s" %r)
988                return True
989            except service_error, e:
990                self.log.error("Operation segment failed on %s: %s" % \
991                        (self.testbed, e))
992                return False
993
994    def annotate_topology(self, top, data):
995        # These routines do various parts of the annotation
996        def add_new_names(nl, l):
997            """ add any names in nl to the list in l """
998            for n in nl:
999                if n not in l: l.append(n)
1000       
1001        def merge_services(ne, e):
1002            for ns in ne.service:
1003                # NB: the else is on the for
1004                for s in e.service:
1005                    if ns.name == s.name:
1006                        s.importer = ns.importer
1007                        s.param = ns.param
1008                        s.description = ns.description
1009                        s.status = ns.status
1010                        break
1011                else:
1012                    e.service.append(ns)
1013       
1014        def merge_oses(ne, e):
1015            """
1016            Merge the operating system entries of ne into e
1017            """
1018            for nos in ne.os:
1019                # NB: the else is on the for
1020                for os in e.os:
1021                    if nos.name == os.name:
1022                        os.version = nos.version
1023                        os.version = nos.distribution
1024                        os.version = nos.distributionversion
1025                        for a in nos.attribute:
1026                            if os.get_attribute(a.attribute):
1027                                os.remove_attribute(a.attribute)
1028                            os.set_attribute(a.attribute, a.value)
1029                        break
1030                else:
1031                    # If both nodes have one OS, this is a replacement
1032                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
1033                    else: e.os.append(nos)
1034
1035        # Annotate the topology with embedding info
1036        for e in top.elements:
1037            if isinstance(e, topdl.Computer):
1038                for s in data:
1039                    ne = s.node.get(e.name, None)
1040                    if ne is not None:
1041                        add_new_names(ne.localname, e.localname)
1042                        e.status = ne.status
1043                        merge_services(ne, e)
1044                        add_new_names(ne.operation, e.operation)
1045                        if ne.os: merge_oses(ne, e)
1046                        break
1047        # Annotate substrates
1048        for s in top.substrates:
1049            for d in data:
1050                ss = d.subs.get(s.name, None)
1051                if ss is not None:
1052                    if ss.capacity is not None:
1053                        s.capacity = ss.capacity
1054                    if s.latency is not None:
1055                        s.latency = ss.latency
1056
1057
1058
1059    def allocate_resources(self, allocated, masters, eid, expid, 
1060            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1061            attrs=None, connInfo={}, tbmap=None, expcert=None):
1062
1063        started = { }           # Testbeds where a sub-experiment started
1064                                # successfully
1065
1066        # XXX
1067        fail_soft = False
1068
1069        if tbmap is None: tbmap = { }
1070
1071        log = alloc_log or self.log
1072
1073        tp = thread_pool(self.nthreads)
1074        threads = [ ]
1075        starters = [ ]
1076
1077        if expcert:
1078            cert = expcert
1079            pw = None
1080        else:
1081            cert = self.cert_file
1082            pw = self.cert_pwd
1083
1084        for tb in allocated.keys():
1085            # Create and start a thread to start the segment, and save it
1086            # to get the return value later
1087            tb_attrs = copy.copy(attrs)
1088            tp.wait_for_slot()
1089            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1090            base, suffix = split_testbed(tb)
1091            if suffix:
1092                tb_attrs.append({'attribute': 'experiment_name', 
1093                    'value': "%s-%s" % (eid, suffix)})
1094            else:
1095                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1096            if not uri:
1097                raise service_error(service_error.internal, 
1098                        "Unknown testbed %s !?" % tb)
1099
1100            aid = tbparams[tb].allocID
1101            if not aid:
1102                raise service_error(service_error.internal, 
1103                        "No alloc id for testbed %s !?" % tb)
1104
1105            s = self.start_segment(log=log, debug=self.debug,
1106                    testbed=tb, cert_file=cert,
1107                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1108                    caller=self.call_StartSegment,
1109                    log_collector=log_collector)
1110            starters.append(s)
1111            t  = pooled_thread(\
1112                    target=s, name=tb,
1113                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1114                    pdata=tp, trace_file=self.trace_file)
1115            threads.append(t)
1116            t.start()
1117
1118        # Wait until all finish (keep pinging the log, though)
1119        mins = 0
1120        revoked = False
1121        while not tp.wait_for_all_done(60.0):
1122            mins += 1
1123            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1124                    % mins)
1125            if not revoked and \
1126                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1127                # a testbed has failed.  Revoke this experiment's
1128                # synchronizarion values so that sub experiments will not
1129                # deadlock waiting for synchronization that will never happen
1130                self.log.info("A subexperiment has failed to swap in, " + \
1131                        "revoking synch keys")
1132                var_key = "fedid:%s" % expid
1133                for k in self.synch_store.all_keys():
1134                    if len(k) > 45 and k[0:46] == var_key:
1135                        self.synch_store.revoke_key(k)
1136                revoked = True
1137
1138        failed = [ t.getName() for t in threads if not t.rv ]
1139        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1140
1141        # If one failed clean up, unless fail_soft is set
1142        if failed:
1143            if not fail_soft:
1144                tp.clear()
1145                for tb in succeeded:
1146                    # Create and start a thread to stop the segment
1147                    tp.wait_for_slot()
1148                    uri = tbparams[tb].uri
1149                    t  = pooled_thread(\
1150                            target=self.terminate_segment(log=log,
1151                                testbed=tb,
1152                                cert_file=cert, 
1153                                cert_pwd=pw,
1154                                trusted_certs=self.trusted_certs,
1155                                caller=self.call_TerminateSegment),
1156                            args=(uri, tbparams[tb].allocID),
1157                            name=tb,
1158                            pdata=tp, trace_file=self.trace_file)
1159                    t.start()
1160                # Wait until all finish (if any are being stopped)
1161                if succeeded:
1162                    tp.wait_for_all_done()
1163
1164                # release the allocations
1165                for tb in tbparams.keys():
1166                    try:
1167                        self.release_access(tb, tbparams[tb].allocID, 
1168                                tbmap=tbmap, uri=tbparams[tb].uri,
1169                                cert_file=cert, cert_pwd=pw)
1170                    except service_error, e:
1171                        self.log.warn("Error releasing access: %s" % e.desc)
1172                # Remove the placeholder
1173                self.state_lock.acquire()
1174                self.state[eid].status = 'failed'
1175                self.state[eid].updated()
1176                if self.state_filename: self.write_state()
1177                self.state_lock.release()
1178                # Remove the repo dir
1179                self.remove_dirs("%s/%s" %(self.repodir, expid))
1180                # Walk up tmpdir, deleting as we go
1181                if self.cleanup:
1182                    self.remove_dirs(tmpdir)
1183                else:
1184                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1185
1186
1187                log.error("Swap in failed on %s" % ",".join(failed))
1188                return
1189        else:
1190            # Walk through the successes and gather the proofs
1191            proofs = { }
1192            for s in starters:
1193                if s.proof:
1194                    proofs[s.testbed] = s.proof
1195            self.annotate_topology(top, starters)
1196            log.info("[start_segment]: Experiment %s active" % eid)
1197
1198
1199        # Walk up tmpdir, deleting as we go
1200        if self.cleanup:
1201            self.remove_dirs(tmpdir)
1202        else:
1203            log.debug("[start_experiment]: not removing %s" % tmpdir)
1204
1205        # Insert the experiment into our state and update the disk copy.
1206        self.state_lock.acquire()
1207        self.state[expid].status = 'active'
1208        self.state[eid] = self.state[expid]
1209        self.state[eid].top = top
1210        self.state[eid].updated()
1211        # Append startup proofs
1212        for f in self.state[eid].get_all_allocations():
1213            if f.tb in proofs:
1214                f.proof.append(proofs[f.tb])
1215
1216        if self.state_filename: self.write_state()
1217        self.state_lock.release()
1218        return
1219
1220
1221    def add_kit(self, e, kit):
1222        """
1223        Add a Software object created from the list of (install, location)
1224        tuples passed as kit  to the software attribute of an object e.  We
1225        do this enough to break out the code, but it's kind of a hack to
1226        avoid changing the old tuple rep.
1227        """
1228
1229        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1230
1231        if isinstance(e.software, list): e.software.extend(s)
1232        else: e.software = s
1233
1234    def append_experiment_authorization(self, expid, attrs, 
1235            need_state_lock=True):
1236        """
1237        Append the authorization information to system state
1238        """
1239
1240        for p, a in attrs:
1241            self.auth.set_attribute(p, a)
1242        self.auth.save()
1243
1244        if need_state_lock: self.state_lock.acquire()
1245        # XXX: really a no op?
1246        #self.state[expid]['auth'].update(attrs)
1247        if self.state_filename: self.write_state()
1248        if need_state_lock: self.state_lock.release()
1249
1250    def clear_experiment_authorization(self, expid, need_state_lock=True):
1251        """
1252        Attrs is a set of attribute principal pairs that need to be removed
1253        from the authenticator.  Remove them and save the authenticator.
1254        """
1255
1256        if need_state_lock: self.state_lock.acquire()
1257        # XXX: should be a no-op
1258        #if expid in self.state and 'auth' in self.state[expid]:
1259            #for p, a in self.state[expid]['auth']:
1260                #self.auth.unset_attribute(p, a)
1261            #self.state[expid]['auth'] = set()
1262        if self.state_filename: self.write_state()
1263        if need_state_lock: self.state_lock.release()
1264        self.auth.save()
1265
1266
1267    def create_experiment_state(self, fid, req, expid, expcert,
1268            state='starting'):
1269        """
1270        Create the initial entry in the experiment's state.  The expid and
1271        expcert are the experiment's fedid and certifacte that represents that
1272        ID, which are installed in the experiment state.  If the request
1273        includes a suggested local name that is used if possible.  If the local
1274        name is already taken by an experiment owned by this user that has
1275        failed, it is overwritten.  Otherwise new letters are added until a
1276        valid localname is found.  The generated local name is returned.
1277        """
1278
1279        if req.has_key('experimentID') and \
1280                req['experimentID'].has_key('localname'):
1281            overwrite = False
1282            eid = req['experimentID']['localname']
1283            # If there's an old failed experiment here with the same local name
1284            # and accessible by this user, we'll overwrite it, otherwise we'll
1285            # fall through and do the collision avoidance.
1286            old_expid = self.get_experiment_fedid(eid)
1287            if old_expid:
1288                users_experiment = True
1289                try:
1290                    self.check_experiment_access(fid, old_expid)
1291                except service_error, e:
1292                    if e.code == service_error.access: users_experiment = False
1293                    else: raise e
1294                if users_experiment:
1295                    self.state_lock.acquire()
1296                    status = self.state[eid].status
1297                    if status and status == 'failed':
1298                        # remove the old access attributes
1299                        self.clear_experiment_authorization(eid,
1300                                need_state_lock=False)
1301                        overwrite = True
1302                        del self.state[eid]
1303                        del self.state[old_expid]
1304                    self.state_lock.release()
1305                else:
1306                    self.log.info('Experiment %s exists, ' % eid + \
1307                            'but this user cannot access it')
1308            self.state_lock.acquire()
1309            while (self.state.has_key(eid) and not overwrite):
1310                eid += random.choice(string.ascii_letters)
1311            # Initial state
1312            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1313                    identity=expcert)
1314            self.state[expid] = self.state[eid]
1315            if self.state_filename: self.write_state()
1316            self.state_lock.release()
1317        else:
1318            eid = self.exp_stem
1319            for i in range(0,5):
1320                eid += random.choice(string.ascii_letters)
1321            self.state_lock.acquire()
1322            while (self.state.has_key(eid)):
1323                eid = self.exp_stem
1324                for i in range(0,5):
1325                    eid += random.choice(string.ascii_letters)
1326            # Initial state
1327            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1328                    identity=expcert)
1329            self.state[expid] = self.state[eid]
1330            if self.state_filename: self.write_state()
1331            self.state_lock.release()
1332
1333        # Let users touch the state.  Authorize this fid and the expid itself
1334        # to touch the experiment, as well as allowing th eoverrides.
1335        self.append_experiment_authorization(eid, 
1336                set([(fid, expid), (expid,expid)] + \
1337                        [ (o, expid) for o in self.overrides]))
1338
1339        return eid
1340
1341
1342    def allocate_ips_to_topo(self, top):
1343        """
1344        Add an ip4_address attribute to all the hosts in the topology, based on
1345        the shared substrates on which they sit.  An /etc/hosts file is also
1346        created and returned as a list of hostfiles entries.  We also return
1347        the allocator, because we may need to allocate IPs to portals
1348        (specifically DRAGON portals).
1349        """
1350        subs = sorted(top.substrates, 
1351                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1352                reverse=True)
1353        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1354        ifs = { }
1355        hosts = [ ]
1356
1357        for idx, s in enumerate(subs):
1358            net_size = len(s.interfaces)+2
1359
1360            a = ips.allocate(net_size)
1361            if a :
1362                base, num = a
1363                if num < net_size: 
1364                    raise service_error(service_error.internal,
1365                            "Allocator returned wrong number of IPs??")
1366            else:
1367                raise service_error(service_error.req, 
1368                        "Cannot allocate IP addresses")
1369            mask = ips.min_alloc
1370            while mask < net_size:
1371                mask *= 2
1372
1373            netmask = ((2**32-1) ^ (mask-1))
1374
1375            base += 1
1376            for i in s.interfaces:
1377                i.attribute.append(
1378                        topdl.Attribute('ip4_address', 
1379                            "%s" % ip_addr(base)))
1380                i.attribute.append(
1381                        topdl.Attribute('ip4_netmask', 
1382                            "%s" % ip_addr(int(netmask))))
1383
1384                hname = i.element.name
1385                if ifs.has_key(hname):
1386                    hosts.append("%s\t%s-%s %s-%d" % \
1387                            (ip_addr(base), hname, s.name, hname,
1388                                ifs[hname]))
1389                else:
1390                    ifs[hname] = 0
1391                    hosts.append("%s\t%s-%s %s-%d %s" % \
1392                            (ip_addr(base), hname, s.name, hname,
1393                                ifs[hname], hname))
1394
1395                ifs[hname] += 1
1396                base += 1
1397        return hosts, ips
1398
1399    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1400            tbparam, masters, tbmap, expid=None, expcert=None):
1401        for tb in testbeds:
1402            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1403                    expcert)
1404            allocated[tb] = 1
1405
1406    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1407            expcert=None):
1408        """
1409        Get access to testbed through fedd and set the parameters for that tb
1410        """
1411        def get_export_project(svcs):
1412            """
1413            Look through for the list of federated_service for this testbed
1414            objects for a project_export service, and extract the project
1415            parameter.
1416            """
1417
1418            pe = [s for s in svcs if s.name=='project_export']
1419            if len(pe) == 1:
1420                return pe[0].params.get('project', None)
1421            elif len(pe) == 0:
1422                return None
1423            else:
1424                raise service_error(service_error.req,
1425                        "More than one project export is not supported")
1426
1427        def add_services(svcs, type, slist, keys):
1428            """
1429            Add the given services to slist.  type is import or export.  Also
1430            add a mapping entry from the assigned id to the original service
1431            record.
1432            """
1433            for i, s in enumerate(svcs):
1434                idx = '%s%d' % (type, i)
1435                keys[idx] = s
1436                sr = {'id': idx, 'name': s.name, 'visibility': type }
1437                if s.params:
1438                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1439                            for k, v in s.params.items()]
1440                slist.append(sr)
1441
1442        uri = tbmap.get(testbed_base(tb), None)
1443        if not uri:
1444            raise service_error(service_error.server_config, 
1445                    "Unknown testbed: %s" % tb)
1446
1447        export_svcs = masters.get(tb,[])
1448        import_svcs = [ s for m in masters.values() \
1449                for s in m \
1450                    if tb in s.importers ]
1451
1452        export_project = get_export_project(export_svcs)
1453        # Compose the credential list so that IDs come before attributes
1454        creds = set()
1455        keys = set()
1456        certs = self.auth.get_creds_for_principal(fid)
1457        if expid:
1458            certs.update(self.auth.get_creds_for_principal(expid))
1459        for c in certs:
1460            keys.add(c.issuer_cert())
1461            creds.add(c.attribute_cert())
1462        creds = list(keys) + list(creds)
1463
1464        if expcert: cert, pw = expcert, None
1465        else: cert, pw = self.cert_file, self.cert_pw
1466
1467        # Request credentials
1468        req = {
1469                'abac_credential': creds,
1470            }
1471        # Make the service request from the services we're importing and
1472        # exporting.  Keep track of the export request ids so we can
1473        # collect the resulting info from the access response.
1474        e_keys = { }
1475        if import_svcs or export_svcs:
1476            slist = []
1477            add_services(import_svcs, 'import', slist, e_keys)
1478            add_services(export_svcs, 'export', slist, e_keys)
1479            req['service'] = slist
1480
1481        if self.local_access.has_key(uri):
1482            # Local access call
1483            req = { 'RequestAccessRequestBody' : req }
1484            r = self.local_access[uri].RequestAccess(req, 
1485                    fedid(file=self.cert_file))
1486            r = { 'RequestAccessResponseBody' : r }
1487        else:
1488            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1489
1490        if r.has_key('RequestAccessResponseBody'):
1491            # Through to here we have a valid response, not a fault.
1492            # Access denied is a fault, so something better or worse than
1493            # access denied has happened.
1494            r = r['RequestAccessResponseBody']
1495            self.log.debug("[get_access] Access granted")
1496        else:
1497            raise service_error(service_error.protocol,
1498                        "Bad proxy response")
1499        if 'proof' not in r:
1500            raise service_error(service_error.protocol,
1501                        "Bad access response (no access proof)")
1502
1503        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1504                tb=tb, uri=uri, proof=[r['proof']], 
1505                services=masters.get(tb, None))
1506
1507        # Collect the responses corresponding to the services this testbed
1508        # exports.  These will be the service requests that we will include in
1509        # the start segment requests (with appropriate visibility values) to
1510        # import and export the segments.
1511        for s in r.get('service', []):
1512            id = s.get('id', None)
1513            # Note that this attaches the response to the object in the masters
1514            # data structure.  (The e_keys index disappears when this fcn
1515            # returns)
1516            if id and id in e_keys:
1517                e_keys[id].reqs.append(s)
1518
1519        # Add attributes to parameter space.  We don't allow attributes to
1520        # overlay any parameters already installed.
1521        for a in r.get('fedAttr', []):
1522            try:
1523                if a['attribute']:
1524                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1525            except KeyError:
1526                self.log.error("Bad attribute in response: %s" % a)
1527
1528
1529    def split_topology(self, top, topo, testbeds):
1530        """
1531        Create the sub-topologies that are needed for experiment instantiation.
1532        """
1533        for tb in testbeds:
1534            topo[tb] = top.clone()
1535            # copy in for loop allows deletions from the original
1536            for e in [ e for e in topo[tb].elements]:
1537                etb = e.get_attribute('testbed')
1538                # NB: elements without a testbed attribute won't appear in any
1539                # sub topologies. 
1540                if not etb or etb != tb:
1541                    for i in e.interface:
1542                        for s in i.subs:
1543                            try:
1544                                s.interfaces.remove(i)
1545                            except ValueError:
1546                                raise service_error(service_error.internal,
1547                                        "Can't remove interface??")
1548                    topo[tb].elements.remove(e)
1549            topo[tb].make_indices()
1550
1551    def confirm_software(self, top):
1552        """
1553        Make sure that the software to be loaded in the topo is all available
1554        before we begin making access requests, etc.  This is a subset of
1555        wrangle_software.
1556        """
1557        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1558        pkgs.update([x.location for e in top.elements for x in e.software])
1559
1560        for pkg in pkgs:
1561            loc = pkg
1562
1563            scheme, host, path = urlparse(loc)[0:3]
1564            dest = os.path.basename(path)
1565            if not scheme:
1566                if not loc.startswith('/'):
1567                    loc = "/%s" % loc
1568                loc = "file://%s" %loc
1569            # NB: if scheme was found, loc == pkg
1570            try:
1571                u = urlopen(loc)
1572                u.close()
1573            except Exception, e:
1574                raise service_error(service_error.req, 
1575                        "Cannot open %s: %s" % (loc, e))
1576        return True
1577
1578    def wrangle_software(self, expid, top, topo, tbparams):
1579        """
1580        Copy software out to the repository directory, allocate permissions and
1581        rewrite the segment topologies to look for the software in local
1582        places.
1583        """
1584
1585        # Copy the rpms and tarfiles to a distribution directory from
1586        # which the federants can retrieve them
1587        linkpath = "%s/software" %  expid
1588        softdir ="%s/%s" % ( self.repodir, linkpath)
1589        softmap = { }
1590
1591        # self.fedkit and self.gateway kit are lists of tuples of
1592        # (install_location, download_location) this extracts the download
1593        # locations.
1594        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1595        pkgs.update([x.location for e in top.elements for x in e.software])
1596        try:
1597            os.makedirs(softdir)
1598        except EnvironmentError, e:
1599            raise service_error(
1600                    "Cannot create software directory: %s" % e)
1601        # The actual copying.  Everything's converted into a url for copying.
1602        auth_attrs = set()
1603        for pkg in pkgs:
1604            loc = pkg
1605
1606            scheme, host, path = urlparse(loc)[0:3]
1607            dest = os.path.basename(path)
1608            if not scheme:
1609                if not loc.startswith('/'):
1610                    loc = "/%s" % loc
1611                loc = "file://%s" %loc
1612            # NB: if scheme was found, loc == pkg
1613            try:
1614                u = urlopen(loc)
1615            except Exception, e:
1616                raise service_error(service_error.req, 
1617                        "Cannot open %s: %s" % (loc, e))
1618            try:
1619                f = open("%s/%s" % (softdir, dest) , "w")
1620                self.log.debug("Writing %s/%s" % (softdir,dest) )
1621                data = u.read(4096)
1622                while data:
1623                    f.write(data)
1624                    data = u.read(4096)
1625                f.close()
1626                u.close()
1627            except Exception, e:
1628                raise service_error(service_error.internal,
1629                        "Could not copy %s: %s" % (loc, e))
1630            path = re.sub("/tmp", "", linkpath)
1631            # XXX
1632            softmap[pkg] = \
1633                    "%s/%s/%s" %\
1634                    ( self.repo_url, path, dest)
1635
1636            # Allow the individual segments to access the software by assigning
1637            # an attribute to each testbed allocation that encodes the data to
1638            # be released.  This expression collects the data for each run of
1639            # the loop.
1640            auth_attrs.update([
1641                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1642                        for tb in tbparams.keys()])
1643
1644        self.append_experiment_authorization(expid, auth_attrs)
1645
1646        # Convert the software locations in the segments into the local
1647        # copies on this host
1648        for soft in [ s for tb in topo.values() \
1649                for e in tb.elements \
1650                    if getattr(e, 'software', False) \
1651                        for s in e.software ]:
1652            if softmap.has_key(soft.location):
1653                soft.location = softmap[soft.location]
1654
1655
1656    def new_experiment(self, req, fid):
1657        """
1658        The external interface to empty initial experiment creation called from
1659        the dispatcher.
1660
1661        Creates a working directory, splits the incoming description using the
1662        splitter script and parses out the avrious subsections using the
1663        lcasses above.  Once each sub-experiment is created, use pooled threads
1664        to instantiate them and start it all up.
1665        """
1666        self.log.info("New experiment call started for %s" % fid)
1667        req = req.get('NewRequestBody', None)
1668        if not req:
1669            raise service_error(service_error.req,
1670                    "Bad request format (no NewRequestBody)")
1671
1672        if self.auth.import_credentials(data_list=req.get('credential', [])):
1673            self.auth.save()
1674
1675        try:
1676            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1677                    with_proof=True)
1678        except service_error, e:
1679            self.log.info("New experiment call for %s: access denied" % fid)
1680            raise e
1681
1682
1683        if not access_ok:
1684            self.log.info("New experiment call for %s: Access denied" % fid)
1685            raise service_error(service_error.access, "New access denied",
1686                    proof=[proof])
1687
1688        try:
1689            tmpdir = tempfile.mkdtemp(prefix="split-")
1690        except EnvironmentError:
1691            raise service_error(service_error.internal, "Cannot create tmp dir")
1692
1693        try:
1694            access_user = self.accessdb[fid]
1695        except KeyError:
1696            raise service_error(service_error.internal,
1697                    "Access map and authorizer out of sync in " + \
1698                            "new_experiment for fedid %s"  % fid)
1699
1700        # Generate an ID for the experiment (slice) and a certificate that the
1701        # allocator can use to prove they own it.  We'll ship it back through
1702        # the encrypted connection.  If the requester supplied one, use it.
1703        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1704            expcert = req['experimentAccess']['X509']
1705            expid = fedid(certstr=expcert)
1706            self.state_lock.acquire()
1707            if expid in self.state:
1708                self.state_lock.release()
1709                raise service_error(service_error.req, 
1710                        'fedid %s identifies an existing experiment' % expid)
1711            self.state_lock.release()
1712        else:
1713            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1714
1715        #now we're done with the tmpdir, and it should be empty
1716        if self.cleanup:
1717            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1718            os.rmdir(tmpdir)
1719        else:
1720            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1721
1722        eid = self.create_experiment_state(fid, req, expid, expcert, 
1723                state='empty')
1724
1725        rv = {
1726                'experimentID': [
1727                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1728                ],
1729                'experimentStatus': 'empty',
1730                'experimentAccess': { 'X509' : expcert },
1731                'proof': proof.to_dict(),
1732            }
1733
1734        self.log.info("New experiment call succeeded for %s" % fid)
1735        return rv
1736
1737    # create_experiment sub-functions
1738
1739    @staticmethod
1740    def get_experiment_key(req, field='experimentID'):
1741        """
1742        Parse the experiment identifiers out of the request (the request body
1743        tag has been removed).  Specifically this pulls either the fedid or the
1744        localname out of the experimentID field.  A fedid is preferred.  If
1745        neither is present or the request does not contain the fields,
1746        service_errors are raised.
1747        """
1748        # Get the experiment access
1749        exp = req.get(field, None)
1750        if exp:
1751            if exp.has_key('fedid'):
1752                key = exp['fedid']
1753            elif exp.has_key('localname'):
1754                key = exp['localname']
1755            else:
1756                raise service_error(service_error.req, "Unknown lookup type")
1757        else:
1758            raise service_error(service_error.req, "No request?")
1759
1760        return key
1761
1762    def get_experiment_ids_and_start(self, key, tmpdir):
1763        """
1764        Get the experiment name, id and access certificate from the state, and
1765        set the experiment state to 'starting'.  returns a triple (fedid,
1766        localname, access_cert_file). The access_cert_file is a copy of the
1767        contents of the access certificate, created in the tempdir with
1768        restricted permissions.  If things are confused, raise an exception.
1769        """
1770
1771        expid = eid = None
1772        self.state_lock.acquire()
1773        if key in self.state:
1774            exp = self.state[key]
1775            exp.status = "starting"
1776            exp.updated()
1777            expid = exp.fedid
1778            eid = exp.localname
1779            expcert = exp.identity
1780        self.state_lock.release()
1781
1782        # make a protected copy of the access certificate so the experiment
1783        # controller can act as the experiment principal.
1784        if expcert:
1785            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1786            if not expcert_file:
1787                raise service_error(service_error.internal, 
1788                        "Cannot create temp cert file?")
1789        else:
1790            expcert_file = None
1791
1792        return (eid, expid, expcert_file)
1793
1794    def get_topology(self, req, tmpdir):
1795        """
1796        Get the ns2 content and put it into a file for parsing.  Call the local
1797        or remote parser and return the topdl.Topology.  Errors result in
1798        exceptions.  req is the request and tmpdir is a work directory.
1799        """
1800
1801        # The tcl parser needs to read a file so put the content into that file
1802        descr=req.get('experimentdescription', None)
1803        if descr:
1804            if 'ns2description' in descr:
1805                file_content=descr['ns2description']
1806            elif 'topdldescription' in descr:
1807                return topdl.Topology(**descr['topdldescription'])
1808            else:
1809                raise service_error(service_error.req, 
1810                        'Unknown experiment description type')
1811        else:
1812            raise service_error(service_error.req, "No experiment description")
1813
1814
1815        if self.splitter_url:
1816            self.log.debug("Calling remote topdl translator at %s" % \
1817                    self.splitter_url)
1818            top = self.remote_ns2topdl(self.splitter_url, file_content)
1819        else:
1820            tclfile = os.path.join(tmpdir, "experiment.tcl")
1821            if file_content:
1822                try:
1823                    f = open(tclfile, 'w')
1824                    f.write(file_content)
1825                    f.close()
1826                except EnvironmentError:
1827                    raise service_error(service_error.internal,
1828                            "Cannot write temp experiment description")
1829            else:
1830                raise service_error(service_error.req, 
1831                        "Only ns2descriptions supported")
1832            pid = "dummy"
1833            gid = "dummy"
1834            eid = "dummy"
1835
1836            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1837                str(self.muxmax), '-m', 'dummy']
1838
1839            tclcmd.extend([pid, gid, eid, tclfile])
1840
1841            self.log.debug("running local splitter %s", " ".join(tclcmd))
1842            # This is just fantastic.  As a side effect the parser copies
1843            # tb_compat.tcl into the current directory, so that directory
1844            # must be writable by the fedd user.  Doing this in the
1845            # temporary subdir ensures this is the case.
1846            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1847                    cwd=tmpdir)
1848            split_data = tclparser.stdout
1849
1850            top = topdl.topology_from_xml(file=split_data, top="experiment")
1851            os.remove(tclfile)
1852
1853        return top
1854
1855    def get_testbed_services(self, req, testbeds):
1856        """
1857        Parse the services section of the request into two dicts mapping
1858        testbed to lists of federated_service objects.  The first dict maps all
1859        exporters of services to those service objects, the second maps
1860        testbeds to service objects only for services requiring portals.
1861        """
1862        # We construct both dicts here because deriving the second is more
1863        # comples than it looks - both the keys and lists can differ, and it's
1864        # much easier to generate both in one pass.
1865        masters = { }
1866        pmasters = { }
1867        for s in req.get('service', []):
1868            # If this is a service request with the importall field
1869            # set, fill it out.
1870
1871            if s.get('importall', False):
1872                s['import'] = [ tb for tb in testbeds \
1873                        if tb not in s.get('export',[])]
1874                del s['importall']
1875
1876            # Add the service to masters
1877            for tb in s.get('export', []):
1878                if s.get('name', None):
1879
1880                    params = { }
1881                    for a in s.get('fedAttr', []):
1882                        params[a.get('attribute', '')] = a.get('value','')
1883
1884                    fser = federated_service(name=s['name'],
1885                            exporter=tb, importers=s.get('import',[]),
1886                            params=params)
1887                    if fser.name == 'hide_hosts' \
1888                            and 'hosts' not in fser.params:
1889                        fser.params['hosts'] = \
1890                                ",".join(tb_hosts.get(fser.exporter, []))
1891                    if tb in masters: masters[tb].append(fser)
1892                    else: masters[tb] = [fser]
1893
1894                    if fser.portal:
1895                        if tb in pmasters: pmasters[tb].append(fser)
1896                        else: pmasters[tb] = [fser]
1897                else:
1898                    self.log.error('Testbed service does not have name " + \
1899                            "and importers')
1900        return masters, pmasters
1901
1902    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1903        """
1904        Create the ssh keys necessary for interconnecting the portal nodes and
1905        the global hosts file for letting each segment know about the IP
1906        addresses in play.  Save these into the repo.  Add attributes to the
1907        autorizer allowing access controllers to download them and return a set
1908        of attributes that inform the segments where to find this stuff.  May
1909        raise service_errors in if there are problems.
1910        """
1911        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1912        gw_secretkey_base = "fed.%s" % self.ssh_type
1913        keydir = os.path.join(tmpdir, 'keys')
1914        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1915        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1916
1917        try:
1918            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1919        except ValueError:
1920            raise service_error(service_error.server_config, 
1921                    "Bad key type (%s)" % self.ssh_type)
1922
1923        self.generate_seer_certs(keydir)
1924
1925        # Copy configuration files into the remote file store
1926        # The config urlpath
1927        configpath = "/%s/config" % expid
1928        # The config file system location
1929        configdir ="%s%s" % ( self.repodir, configpath)
1930        try:
1931            os.makedirs(configdir)
1932        except EnvironmentError, e:
1933            raise service_error(service_error.internal,
1934                    "Cannot create config directory: %s" % e)
1935        try:
1936            f = open("%s/hosts" % configdir, "w")
1937            print >> f, string.join(hosts, '\n')
1938            f.close()
1939        except EnvironmentError, e:
1940            raise service_error(service_error.internal, 
1941                    "Cannot write hosts file: %s" % e)
1942        try:
1943            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1944            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1945            copy_file(os.path.join(keydir, 'ca.pem'), 
1946                    os.path.join(configdir, 'ca.pem'))
1947            copy_file(os.path.join(keydir, 'node.pem'), 
1948                    os.path.join(configdir, 'node.pem'))
1949        except EnvironmentError, e:
1950            raise service_error(service_error.internal, 
1951                    "Cannot copy keyfiles: %s" % e)
1952
1953        # Allow the individual testbeds to access the configuration files,
1954        # again by setting an attribute for the relevant pathnames on each
1955        # allocation principal.  Yeah, that's a long list comprehension.
1956        self.append_experiment_authorization(expid, set([
1957            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1958                    for tb in tbparams.keys() \
1959                        for f in ("hosts", 'ca.pem', 'node.pem', 
1960                            gw_secretkey_base, gw_pubkey_base)]))
1961
1962        attrs = [ 
1963                {
1964                    'attribute': 'ssh_pubkey', 
1965                    'value': '%s/%s/config/%s' % \
1966                            (self.repo_url, expid, gw_pubkey_base)
1967                },
1968                {
1969                    'attribute': 'ssh_secretkey', 
1970                    'value': '%s/%s/config/%s' % \
1971                            (self.repo_url, expid, gw_secretkey_base)
1972                },
1973                {
1974                    'attribute': 'hosts', 
1975                    'value': '%s/%s/config/hosts' % \
1976                            (self.repo_url, expid)
1977                },
1978                {
1979                    'attribute': 'seer_ca_pem', 
1980                    'value': '%s/%s/config/%s' % \
1981                            (self.repo_url, expid, 'ca.pem')
1982                },
1983                {
1984                    'attribute': 'seer_node_pem', 
1985                    'value': '%s/%s/config/%s' % \
1986                            (self.repo_url, expid, 'node.pem')
1987                },
1988            ]
1989        return attrs
1990
1991
1992    def get_vtopo(self, req, fid):
1993        """
1994        Return the stored virtual topology for this experiment
1995        """
1996        rv = None
1997        state = None
1998        self.log.info("vtopo call started for %s" %  fid)
1999
2000        req = req.get('VtopoRequestBody', None)
2001        if not req:
2002            raise service_error(service_error.req,
2003                    "Bad request format (no VtopoRequestBody)")
2004        exp = req.get('experiment', None)
2005        if exp:
2006            if exp.has_key('fedid'):
2007                key = exp['fedid']
2008                keytype = "fedid"
2009            elif exp.has_key('localname'):
2010                key = exp['localname']
2011                keytype = "localname"
2012            else:
2013                raise service_error(service_error.req, "Unknown lookup type")
2014        else:
2015            raise service_error(service_error.req, "No request?")
2016
2017        try:
2018            proof = self.check_experiment_access(fid, key)
2019        except service_error, e:
2020            self.log.info("vtopo call failed for %s: access denied" %  fid)
2021            raise e
2022
2023        self.state_lock.acquire()
2024        # XXX: this needs to be recalculated
2025        if key in self.state:
2026            if self.state[key].top is not None:
2027                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2028                rv = { 'experiment' : {keytype: key },
2029                        'vtopo': vtopo,
2030                        'proof': proof.to_dict(), 
2031                    }
2032            else:
2033                state = self.state[key].status
2034        self.state_lock.release()
2035
2036        if rv: 
2037            self.log.info("vtopo call completed for %s %s " % \
2038                (key, fid))
2039            return rv
2040        else: 
2041            if state:
2042                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2043                    (key, fid))
2044                raise service_error(service_error.partial, 
2045                        "Not ready: %s" % state)
2046            else:
2047                self.log.info("vtopo call completed for %s %s (No experiment)"\
2048                        % (key, fid))
2049                raise service_error(service_error.req, "No such experiment")
2050
2051    def get_vis(self, req, fid):
2052        """
2053        Return the stored visualization for this experiment
2054        """
2055        rv = None
2056        state = None
2057
2058        self.log.info("vis call started for %s" %  fid)
2059        req = req.get('VisRequestBody', None)
2060        if not req:
2061            raise service_error(service_error.req,
2062                    "Bad request format (no VisRequestBody)")
2063        exp = req.get('experiment', None)
2064        if exp:
2065            if exp.has_key('fedid'):
2066                key = exp['fedid']
2067                keytype = "fedid"
2068            elif exp.has_key('localname'):
2069                key = exp['localname']
2070                keytype = "localname"
2071            else:
2072                raise service_error(service_error.req, "Unknown lookup type")
2073        else:
2074            raise service_error(service_error.req, "No request?")
2075
2076        try:
2077            proof = self.check_experiment_access(fid, key)
2078        except service_error, e:
2079            self.log.info("vis call failed for %s: access denied" %  fid)
2080            raise e
2081
2082        self.state_lock.acquire()
2083        # Generate the visualization
2084        if key in self.state:
2085            if self.state[key].top is not None:
2086                try:
2087                    vis = self.genviz(
2088                            topdl.topology_to_vtopo(self.state[key].top))
2089                except service_error, e:
2090                    self.state_lock.release()
2091                    raise e
2092                rv =  { 'experiment' : {keytype: key },
2093                        'vis': vis,
2094                        'proof': proof.to_dict(), 
2095                        }
2096            else:
2097                state = self.state[key].status
2098        self.state_lock.release()
2099
2100        if rv: 
2101            self.log.info("vis call completed for %s %s " % \
2102                (key, fid))
2103            return rv
2104        else:
2105            if state:
2106                self.log.info("vis call completed for %s %s (not ready)" % \
2107                    (key, fid))
2108                raise service_error(service_error.partial, 
2109                        "Not ready: %s" % state)
2110            else:
2111                self.log.info("vis call completed for %s %s (no experiment)" % \
2112                    (key, fid))
2113                raise service_error(service_error.req, "No such experiment")
2114
2115   
2116    def save_federant_information(self, allocated, tbparams, eid, top):
2117        """
2118        Store the various data that have changed in the experiment state
2119        between when it was started and the beginning of resource allocation.
2120        This is basically the information about each local allocation.  This
2121        fills in the values of the placeholder allocation in the state.  It
2122        also collects the access proofs and returns them as dicts for a
2123        response message.
2124        """
2125        self.state_lock.acquire()
2126        exp = self.state[eid]
2127        exp.top = top.clone()
2128        # save federant information
2129        for k in allocated.keys():
2130            exp.add_allocation(tbparams[k])
2131            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2132                type="testbed", localname=[k], 
2133                service=[ s.to_topdl() for s in tbparams[k].services]))
2134
2135        # Access proofs for the response message
2136        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2137                    for p in tbparams[k].proof]
2138        exp.updated()
2139        if self.state_filename: 
2140            self.write_state()
2141        self.state_lock.release()
2142        return proofs
2143
2144    def clear_placeholder(self, eid, expid, tmpdir):
2145        """
2146        Clear the placeholder and remove any allocated temporary dir.
2147        """
2148
2149        self.state_lock.acquire()
2150        del self.state[eid]
2151        del self.state[expid]
2152        if self.state_filename: self.write_state()
2153        self.state_lock.release()
2154        if tmpdir and self.cleanup:
2155            self.remove_dirs(tmpdir)
2156
2157    # end of create_experiment sub-functions
2158
2159    def create_experiment(self, req, fid):
2160        """
2161        The external interface to experiment creation called from the
2162        dispatcher.
2163
2164        Creates a working directory, splits the incoming description using the
2165        splitter script and parses out the various subsections using the
2166        classes above.  Once each sub-experiment is created, use pooled threads
2167        to instantiate them and start it all up.
2168        """
2169
2170        self.log.info("Create experiment call started for %s" % fid)
2171        req = req.get('CreateRequestBody', None)
2172        if req:
2173            key = self.get_experiment_key(req)
2174        else:
2175            raise service_error(service_error.req,
2176                    "Bad request format (no CreateRequestBody)")
2177
2178        # Import information from the requester
2179        if self.auth.import_credentials(data_list=req.get('credential', [])):
2180            self.auth.save()
2181
2182        try:
2183            # Make sure that the caller can talk to us
2184            proof = self.check_experiment_access(fid, key)
2185        except service_error, e:
2186            self.log.info("Create experiment call failed for %s: access denied"\
2187                    % fid)
2188            raise e
2189
2190
2191        # Install the testbed map entries supplied with the request into a copy
2192        # of the testbed map.
2193        tbmap = dict(self.tbmap)
2194        tbactive = set(self.tbactive)
2195        for m in req.get('testbedmap', []):
2196            if 'testbed' in m and 'uri' in m:
2197                tbmap[m['testbed']] = m['uri']
2198                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2199
2200        # a place to work
2201        try:
2202            tmpdir = tempfile.mkdtemp(prefix="split-")
2203            os.mkdir(tmpdir+"/keys")
2204        except EnvironmentError:
2205            raise service_error(service_error.internal, "Cannot create tmp dir")
2206
2207        tbparams = { }
2208
2209        eid, expid, expcert_file = \
2210                self.get_experiment_ids_and_start(key, tmpdir)
2211
2212        # This catches exceptions to clear the placeholder if necessary
2213        try: 
2214            if not (eid and expid):
2215                raise service_error(service_error.internal, 
2216                        "Cannot find local experiment info!?")
2217
2218            top = self.get_topology(req, tmpdir)
2219            self.confirm_software(top)
2220            # Assign the IPs
2221            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2222            # Find the testbeds to look up
2223            tb_hosts = { }
2224            testbeds = [ ]
2225            for e in top.elements:
2226                if isinstance(e, topdl.Computer):
2227                    tb = e.get_attribute('testbed') or 'default'
2228                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2229                    else: 
2230                        tb_hosts[tb] = [ e.name ]
2231                        testbeds.append(tb)
2232
2233            masters, pmasters = self.get_testbed_services(req, testbeds)
2234            allocated = { }         # Testbeds we can access
2235            topo ={ }               # Sub topologies
2236            connInfo = { }          # Connection information
2237
2238            self.split_topology(top, topo, testbeds)
2239
2240            self.get_access_to_testbeds(testbeds, fid, allocated, 
2241                    tbparams, masters, tbmap, expid, expcert_file)
2242
2243            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2244
2245            part = experiment_partition(self.auth, self.store_url, tbmap,
2246                    self.muxmax, self.direct_transit, tbactive)
2247            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2248                    connInfo, expid)
2249
2250            auth_attrs = set()
2251            # Now get access to the dynamic testbeds (those added above)
2252            for tb in [ t for t in topo if t not in allocated]:
2253                self.get_access(tb, tbparams, fid, masters, tbmap, 
2254                        expid, expcert_file)
2255                allocated[tb] = 1
2256                store_keys = topo[tb].get_attribute('store_keys')
2257                # Give the testbed access to keys it exports or imports
2258                if store_keys:
2259                    auth_attrs.update(set([
2260                        (tbparams[tb].allocID, sk) \
2261                                for sk in store_keys.split(" ")]))
2262
2263            if auth_attrs:
2264                self.append_experiment_authorization(expid, auth_attrs)
2265
2266            # transit and disconnected testbeds may not have a connInfo entry.
2267            # Fill in the blanks.
2268            for t in allocated.keys():
2269                if not connInfo.has_key(t):
2270                    connInfo[t] = { }
2271
2272            self.wrangle_software(expid, top, topo, tbparams)
2273
2274            proofs = self.save_federant_information(allocated, tbparams, 
2275                    eid, top)
2276        except service_error, e:
2277            # If something goes wrong in the parse (usually an access error)
2278            # clear the placeholder state.  From here on out the code delays
2279            # exceptions.  Failing at this point returns a fault to the remote
2280            # caller.
2281
2282            self.log.info("Create experiment call failed for %s %s: %s" % 
2283                    (eid, fid, e))
2284            self.clear_placeholder(eid, expid, tmpdir)
2285            raise e
2286
2287        # Start the background swapper and return the starting state.  From
2288        # here on out, the state will stick around a while.
2289
2290        # Create a logger that logs to the experiment's state object as well as
2291        # to the main log file.
2292        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2293        alloc_collector = self.list_log(self.state[eid].log)
2294        h = logging.StreamHandler(alloc_collector)
2295        # XXX: there should be a global one of these rather than repeating the
2296        # code.
2297        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2298                    '%d %b %y %H:%M:%S'))
2299        alloc_log.addHandler(h)
2300
2301        # Start a thread to do the resource allocation
2302        t  = Thread(target=self.allocate_resources,
2303                args=(allocated, masters, eid, expid, tbparams, 
2304                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2305                    connInfo, tbmap, expcert_file),
2306                name=eid)
2307        t.start()
2308
2309        rv = {
2310                'experimentID': [
2311                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2312                ],
2313                'experimentStatus': 'starting',
2314                'proof': [ proof.to_dict() ] + proofs,
2315            }
2316        self.log.info("Create experiment call succeeded for %s %s" % \
2317                (eid, fid))
2318
2319        return rv
2320   
2321    def get_experiment_fedid(self, key):
2322        """
2323        find the fedid associated with the localname key in the state database.
2324        """
2325
2326        rv = None
2327        self.state_lock.acquire()
2328        if key in self.state:
2329            rv = self.state[key].fedid
2330        self.state_lock.release()
2331        return rv
2332
2333    def check_experiment_access(self, fid, key):
2334        """
2335        Confirm that the fid has access to the experiment.  Though a request
2336        may be made in terms of a local name, the access attribute is always
2337        the experiment's fedid.
2338        """
2339        if not isinstance(key, fedid):
2340            key = self.get_experiment_fedid(key)
2341
2342        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2343
2344        if access_ok:
2345            return proof
2346        else:
2347            raise service_error(service_error.access, "Access Denied",
2348                proof)
2349
2350
2351    def get_handler(self, path, fid):
2352        """
2353        Perhaps surprisingly named, this function handles HTTP GET requests to
2354        this server (SOAP requests are POSTs).
2355        """
2356        self.log.info("Get handler %s %s" % (path, fid))
2357        # XXX: log proofs?
2358        if self.auth.check_attribute(fid, path):
2359            return ("%s/%s" % (self.repodir, path), "application/binary")
2360        else:
2361            return (None, None)
2362
2363    def update_info(self, key, force=False):
2364        top = None
2365        self.state_lock.acquire()
2366        if key in self.state:
2367            if force or self.state[key].older_than(self.info_cache_limit):
2368                top = self.state[key].top
2369                if top is not None: top = top.clone()
2370                d1, info_params, cert, d2 = \
2371                        self.get_segment_info(self.state[key], need_lock=False)
2372        self.state_lock.release()
2373
2374        if top is None: return
2375
2376        try:
2377            tmpdir = tempfile.mkdtemp(prefix="info-")
2378        except EnvironmentError:
2379            raise service_error(service_error.internal, 
2380                    "Cannot create tmp dir")
2381        cert_file = self.make_temp_certfile(cert, tmpdir)
2382
2383        data = []
2384        try:
2385            for k, (uri, aid) in info_params.items():
2386                info=self.info_segment(log=self.log, testbed=uri,
2387                            cert_file=cert_file, cert_pwd=None,
2388                            trusted_certs=self.trusted_certs,
2389                            caller=self.call_InfoSegment)
2390                info(uri, aid)
2391                data.append(info)
2392        # Clean up the tmpdir no matter what
2393        finally:
2394            if tmpdir: self.remove_dirs(tmpdir)
2395
2396        self.annotate_topology(top, data)
2397        self.state_lock.acquire()
2398        if key in self.state:
2399            self.state[key].top = top
2400            self.state[key].updated()
2401            if self.state_filename: self.write_state()
2402        self.state_lock.release()
2403
2404   
2405    def get_info(self, req, fid):
2406        """
2407        Return all the stored info about this experiment
2408        """
2409        rv = None
2410
2411        self.log.info("Info call started for %s" %  fid)
2412        req = req.get('InfoRequestBody', None)
2413        if not req:
2414            raise service_error(service_error.req,
2415                    "Bad request format (no InfoRequestBody)")
2416        exp = req.get('experiment', None)
2417        legacy = req.get('legacy', False)
2418        fresh = req.get('fresh', False)
2419        if exp:
2420            if exp.has_key('fedid'):
2421                key = exp['fedid']
2422                keytype = "fedid"
2423            elif exp.has_key('localname'):
2424                key = exp['localname']
2425                keytype = "localname"
2426            else:
2427                raise service_error(service_error.req, "Unknown lookup type")
2428        else:
2429            raise service_error(service_error.req, "No request?")
2430
2431        try:
2432            proof = self.check_experiment_access(fid, key)
2433        except service_error, e:
2434            self.log.info("Info call failed for %s: access denied" %  fid)
2435
2436
2437        self.update_info(key, fresh)
2438
2439        self.state_lock.acquire()
2440        if self.state.has_key(key):
2441            rv = self.state[key].get_info()
2442            # Copy the topo if we need legacy annotations
2443            if legacy:
2444                top = self.state[key].top
2445                if top is not None: top = top.clone()
2446        self.state_lock.release()
2447        self.log.info("Gathered Info for %s %s" % (key, fid))
2448
2449        # If the legacy visualization and topology representations are
2450        # requested, calculate them and add them to the return.
2451        if legacy and rv is not None:
2452            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2453            if top is not None:
2454                vtopo = topdl.topology_to_vtopo(top)
2455                if vtopo is not None:
2456                    rv['vtopo'] = vtopo
2457                    try:
2458                        vis = self.genviz(vtopo)
2459                    except service_error, e:
2460                        self.log.debug('Problem generating visualization: %s' \
2461                                % e)
2462                        vis = None
2463                    if vis is not None:
2464                        rv['vis'] = vis
2465        if rv:
2466            self.log.info("Info succeded for %s %s" % (key, fid))
2467            rv['proof'] = proof.to_dict()
2468            return rv
2469        else: 
2470            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2471            raise service_error(service_error.req, "No such experiment")
2472
2473    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2474            results):
2475        """
2476        Call OperateSegment on multiple testbeds and gather the results.
2477        op_params contains the parameters needed to contact that testbed, cert
2478        is a certificate containing the fedid to use, op is the operation,
2479        testbeds is a dict mapping testbed name to targets in that testbed,
2480        params are the parameters to include a,d results is a growing list of
2481        the results of the calls.
2482        """
2483        try:
2484            tmpdir = tempfile.mkdtemp(prefix="info-")
2485        except EnvironmentError:
2486            raise service_error(service_error.internal, 
2487                    "Cannot create tmp dir")
2488        cert_file = self.make_temp_certfile(cert, tmpdir)
2489
2490        try:
2491            for tb, targets in testbeds.items():
2492                if tb in op_params:
2493                    uri, aid = op_params[tb]
2494                    operate=self.operation_segment(log=self.log, testbed=uri,
2495                                cert_file=cert_file, cert_pwd=None,
2496                                trusted_certs=self.trusted_certs,
2497                                caller=self.call_OperationSegment)
2498                    if operate(uri, aid, op, targets, params):
2499                        if operate.status is not None:
2500                            results.extend(operate.status)
2501                            continue
2502                # Something went wrong in a weird way.  Add statuses
2503                # that reflect that to results
2504                for t in targets:
2505                    results.append(operation_status(t, 
2506                        operation_status.federant,
2507                        'Unexpected error on %s' % tb))
2508        # Clean up the tmpdir no matter what
2509        finally:
2510            if tmpdir: self.remove_dirs(tmpdir)
2511
2512    def do_operation(self, req, fid):
2513        """
2514        Find the testbeds holding each target and ask them to carry out the
2515        operation.  Return the statuses.
2516        """
2517        # Map an element to the testbed containing it
2518        def element_to_tb(e):
2519            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2520            elif isinstance(e, topdl.Testbed): return e.name
2521            else: return None
2522        # If d is an operation_status object, make it a dict
2523        def make_dict(d):
2524            if isinstance(d, dict): return d
2525            elif isinstance(d, operation_status): return d.to_dict()
2526            else: return { }
2527
2528        def element_name(e):
2529            if isinstance(e, topdl.Computer): return e.name
2530            elif isinstance(e, topdl.Testbed): 
2531                if e.localname: return e.localname[0]
2532                else: return None
2533            else: return None
2534
2535        self.log.info("Operation call started for %s" %  fid)
2536        req = req.get('OperationRequestBody', None)
2537        if not req:
2538            raise service_error(service_error.req,
2539                    "Bad request format (no OperationRequestBody)")
2540        exp = req.get('experiment', None)
2541        op = req.get('operation', None)
2542        targets = set(req.get('target', []))
2543        params = req.get('parameter', None)
2544
2545        if exp:
2546            if 'fedid' in exp:
2547                key = exp['fedid']
2548                keytype = "fedid"
2549            elif 'localname' in exp:
2550                key = exp['localname']
2551                keytype = "localname"
2552            else:
2553                raise service_error(service_error.req, "Unknown lookup type")
2554        else:
2555            raise service_error(service_error.req, "No request?")
2556
2557        if op is None or not targets:
2558            raise service_error(service_error.req, "No request?")
2559
2560        try:
2561            proof = self.check_experiment_access(fid, key)
2562        except service_error, e:
2563            self.log.info("Operation call failed for %s: access denied" %  fid)
2564            raise e
2565
2566        self.state_lock.acquire()
2567        if key in self.state:
2568            d1, op_params, cert, d2 = \
2569                    self.get_segment_info(self.state[key], need_lock=False,
2570                            key='tb')
2571            top = self.state[key].top
2572            if top is not None:
2573                top = top.clone()
2574        self.state_lock.release()
2575
2576        if top is None:
2577            self.log.info("Operation call failed for %s: not active" %  fid)
2578            raise service_error(service_error.partial, "No topology yet", 
2579                    proof=proof)
2580
2581        testbeds = { }
2582        results = []
2583        for e in top.elements:
2584            ename = element_name(e)
2585            if ename in targets:
2586                tb = element_to_tb(e)
2587                targets.remove(ename)
2588                if tb is not None:
2589                    if tb in testbeds: testbeds[tb].append(ename)
2590                    else: testbeds[tb] = [ ename ]
2591                else:
2592                    results.append(operation_status(e.name, 
2593                        code=operation_status.no_target, 
2594                        description='Cannot map target to testbed'))
2595
2596        for t in targets:
2597            results.append(operation_status(t, operation_status.no_target))
2598
2599        self.operate_on_segments(op_params, cert, op, testbeds, params,
2600                results)
2601
2602        self.log.info("Operation call succeeded for %s" %  fid)
2603        return { 
2604                'experiment': exp, 
2605                'status': [make_dict(r) for r in results],
2606                'proof': proof.to_dict()
2607                }
2608
2609
2610    def get_multi_info(self, req, fid):
2611        """
2612        Return all the stored info that this fedid can access
2613        """
2614        rv = { 'info': [ ], 'proof': [ ] }
2615
2616        self.log.info("Multi Info call started for %s" %  fid)
2617        self.state_lock.acquire()
2618        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2619            try:
2620                proof = self.check_experiment_access(fid, key)
2621            except service_error, e:
2622                if e.code == service_error.access:
2623                    continue
2624                else:
2625                    self.log.info("Multi Info call failed for %s: %s" %  \
2626                            (e,fid))
2627                    self.state_lock.release()
2628                    raise e
2629
2630            if self.state.has_key(key):
2631                e = self.state[key].get_info()
2632                e['proof'] = proof.to_dict()
2633                rv['info'].append(e)
2634                rv['proof'].append(proof.to_dict())
2635        self.state_lock.release()
2636        self.log.info("Multi Info call succeeded for %s" %  fid)
2637        return rv
2638
2639    def check_termination_status(self, fed_exp, force):
2640        """
2641        Confirm that the experiment is sin a valid state to stop (or force it)
2642        return the state - invalid states for deletion and force settings cause
2643        exceptions.
2644        """
2645        self.state_lock.acquire()
2646        status = fed_exp.status
2647
2648        if status:
2649            if status in ('starting', 'terminating'):
2650                if not force:
2651                    self.state_lock.release()
2652                    raise service_error(service_error.partial, 
2653                            'Experiment still being created or destroyed')
2654                else:
2655                    self.log.warning('Experiment in %s state ' % status + \
2656                            'being terminated by force.')
2657            self.state_lock.release()
2658            return status
2659        else:
2660            # No status??? trouble
2661            self.state_lock.release()
2662            raise service_error(service_error.internal,
2663                    "Experiment has no status!?")
2664
2665    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2666        ids = []
2667        term_params = { }
2668        if need_lock: self.state_lock.acquire()
2669        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2670        expcert = fed_exp.identity
2671        repo = "%s" % fed_exp.fedid
2672
2673        # Collect the allocation/segment ids into a dict keyed by the fedid
2674        # of the allocation that contains a tuple of uri, aid
2675        for i, fed in enumerate(fed_exp.get_all_allocations()):
2676            uri = fed.uri
2677            aid = fed.allocID
2678            if key == 'aid': term_params[aid] = (uri, aid)
2679            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2680
2681        if need_lock: self.state_lock.release()
2682        return ids, term_params, expcert, repo
2683
2684
2685    def get_termination_info(self, fed_exp):
2686        self.state_lock.acquire()
2687        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2688        # Change the experiment state
2689        fed_exp.status = 'terminating'
2690        fed_exp.updated()
2691        if self.state_filename: self.write_state()
2692        self.state_lock.release()
2693
2694        return ids, term_params, expcert, repo
2695
2696
2697    def deallocate_resources(self, term_params, expcert, status, force, 
2698            dealloc_log):
2699        tmpdir = None
2700        # This try block makes sure the tempdir is cleared
2701        try:
2702            # If no expcert, try the deallocation as the experiment
2703            # controller instance.
2704            if expcert and self.auth_type != 'legacy': 
2705                try:
2706                    tmpdir = tempfile.mkdtemp(prefix="term-")
2707                except EnvironmentError:
2708                    raise service_error(service_error.internal, 
2709                            "Cannot create tmp dir")
2710                cert_file = self.make_temp_certfile(expcert, tmpdir)
2711                pw = None
2712            else: 
2713                cert_file = self.cert_file
2714                pw = self.cert_pwd
2715
2716            # Stop everyone.  NB, wait_for_all waits until a thread starts
2717            # and then completes, so we can't wait if nothing starts.  So,
2718            # no tbparams, no start.
2719            if len(term_params) > 0:
2720                tp = thread_pool(self.nthreads)
2721                for k, (uri, aid) in term_params.items():
2722                    # Create and start a thread to stop the segment
2723                    tp.wait_for_slot()
2724                    t  = pooled_thread(\
2725                            target=self.terminate_segment(log=dealloc_log,
2726                                testbed=uri,
2727                                cert_file=cert_file, 
2728                                cert_pwd=pw,
2729                                trusted_certs=self.trusted_certs,
2730                                caller=self.call_TerminateSegment),
2731                            args=(uri, aid), name=k,
2732                            pdata=tp, trace_file=self.trace_file)
2733                    t.start()
2734                # Wait for completions
2735                tp.wait_for_all_done()
2736
2737            # release the allocations (failed experiments have done this
2738            # already, and starting experiments may be in odd states, so we
2739            # ignore errors releasing those allocations
2740            try: 
2741                for k, (uri, aid)  in term_params.items():
2742                    self.release_access(None, aid, uri=uri,
2743                            cert_file=cert_file, cert_pwd=pw)
2744            except service_error, e:
2745                if status != 'failed' and not force:
2746                    raise e
2747
2748        # Clean up the tmpdir no matter what
2749        finally:
2750            if tmpdir: self.remove_dirs(tmpdir)
2751
2752    def terminate_experiment(self, req, fid):
2753        """
2754        Swap this experiment out on the federants and delete the shared
2755        information
2756        """
2757        self.log.info("Terminate experiment call started for %s" % fid)
2758        tbparams = { }
2759        req = req.get('TerminateRequestBody', None)
2760        if not req:
2761            raise service_error(service_error.req,
2762                    "Bad request format (no TerminateRequestBody)")
2763
2764        key = self.get_experiment_key(req, 'experiment')
2765        try:
2766            proof = self.check_experiment_access(fid, key)
2767        except service_error, e:
2768            self.log.info(
2769                    "Terminate experiment call failed for %s: access denied" \
2770                            % fid)
2771            raise e
2772        exp = req.get('experiment', False)
2773        force = req.get('force', False)
2774
2775        dealloc_list = [ ]
2776
2777
2778        # Create a logger that logs to the dealloc_list as well as to the main
2779        # log file.
2780        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2781        dealloc_log.info("Terminating %s " %key)
2782        h = logging.StreamHandler(self.list_log(dealloc_list))
2783        # XXX: there should be a global one of these rather than repeating the
2784        # code.
2785        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2786                    '%d %b %y %H:%M:%S'))
2787        dealloc_log.addHandler(h)
2788
2789        self.state_lock.acquire()
2790        fed_exp = self.state.get(key, None)
2791        self.state_lock.release()
2792        repo = None
2793
2794        if fed_exp:
2795            status = self.check_termination_status(fed_exp, force)
2796            # get_termination_info updates the experiment state
2797            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2798            self.deallocate_resources(term_params, expcert, status, force, 
2799                    dealloc_log)
2800
2801            # Remove the terminated experiment
2802            self.state_lock.acquire()
2803            for id in ids:
2804                self.clear_experiment_authorization(id, need_state_lock=False)
2805                if id in self.state: del self.state[id]
2806
2807            if self.state_filename: self.write_state()
2808            self.state_lock.release()
2809
2810            # Delete any synch points associated with this experiment.  All
2811            # synch points begin with the fedid of the experiment.
2812            fedid_keys = set(["fedid:%s" % f for f in ids \
2813                    if isinstance(f, fedid)])
2814            for k in self.synch_store.all_keys():
2815                try:
2816                    if len(k) > 45 and k[0:46] in fedid_keys:
2817                        self.synch_store.del_value(k)
2818                except synch_store.BadDeletionError:
2819                    pass
2820            self.write_store()
2821
2822            # Remove software and other cached stuff from the filesystem.
2823            if repo:
2824                self.remove_dirs("%s/%s" % (self.repodir, repo))
2825       
2826            self.log.info("Terminate experiment succeeded for %s %s" % \
2827                    (key, fid))
2828            return { 
2829                    'experiment': exp , 
2830                    'deallocationLog': string.join(dealloc_list, ''),
2831                    'proof': [proof.to_dict()],
2832                    }
2833        else:
2834            self.log.info("Terminate experiment failed for %s %s: no state" % \
2835                    (key, fid))
2836            raise service_error(service_error.req, "No saved state")
2837
2838
2839    def GetValue(self, req, fid):
2840        """
2841        Get a value from the synchronized store
2842        """
2843        req = req.get('GetValueRequestBody', None)
2844        if not req:
2845            raise service_error(service_error.req,
2846                    "Bad request format (no GetValueRequestBody)")
2847       
2848        name = req.get('name', None)
2849        wait = req.get('wait', False)
2850        rv = { 'name': name }
2851
2852        if not name:
2853            raise service_error(service_error.req, "No name?")
2854
2855        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2856
2857        if access_ok:
2858            self.log.debug("[GetValue] asking for %s " % name)
2859            try:
2860                v = self.synch_store.get_value(name, wait)
2861            except synch_store.RevokedKeyError:
2862                # No more synch on this key
2863                raise service_error(service_error.federant, 
2864                        "Synch key %s revoked" % name)
2865            if v is not None:
2866                rv['value'] = v
2867            rv['proof'] = proof.to_dict()
2868            self.log.debug("[GetValue] got %s from %s" % (v, name))
2869            return rv
2870        else:
2871            raise service_error(service_error.access, "Access Denied",
2872                    proof=proof)
2873       
2874
2875    def SetValue(self, req, fid):
2876        """
2877        Set a value in the synchronized store
2878        """
2879        req = req.get('SetValueRequestBody', None)
2880        if not req:
2881            raise service_error(service_error.req,
2882                    "Bad request format (no SetValueRequestBody)")
2883       
2884        name = req.get('name', None)
2885        v = req.get('value', '')
2886
2887        if not name:
2888            raise service_error(service_error.req, "No name?")
2889
2890        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2891
2892        if access_ok:
2893            try:
2894                self.synch_store.set_value(name, v)
2895                self.write_store()
2896                self.log.debug("[SetValue] set %s to %s" % (name, v))
2897            except synch_store.CollisionError:
2898                # Translate into a service_error
2899                raise service_error(service_error.req,
2900                        "Value already set: %s" %name)
2901            except synch_store.RevokedKeyError:
2902                # No more synch on this key
2903                raise service_error(service_error.federant, 
2904                        "Synch key %s revoked" % name)
2905                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2906        else:
2907            raise service_error(service_error.access, "Access Denied",
2908                    proof=proof)
Note: See TracBrowser for help on using the repository browser.