source: fedd/federation/experiment_control.py @ d0912be

compt_changesinfo-ops
Last change on this file since d0912be was d0912be, checked in by Ted Faber <faber@…>, 8 years ago

Need to keep track of service records to update.

  • Property mode set to 100644
File size: 83.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        dt = config.get("experiment_control", "direct_transit")
154        self.auth_type = config.get('experiment_control', 'auth_type') \
155                or 'legacy'
156        self.auth_dir = config.get('experiment_control', 'auth_dir')
157        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
158        else: self.direct_transit = [ ]
159        # NB for internal master/slave ops, not experiment setup
160        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
161
162        self.overrides = set([])
163        ovr = config.get('experiment_control', 'overrides')
164        if ovr:
165            for o in ovr.split(","):
166                o = o.strip()
167                if o.startswith('fedid:'): o = o[len('fedid:'):]
168                self.overrides.add(fedid(hexstr=o))
169
170        self.state = { }
171        self.state_lock = Lock()
172        self.tclsh = "/usr/local/bin/otclsh"
173        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
174                config.get("experiment_control", "tcl_splitter",
175                        "/usr/testbed/lib/ns2ir/parse.tcl")
176        mapdb_file = config.get("experiment_control", "mapdb")
177        self.trace_file = sys.stderr
178
179        self.def_expstart = \
180                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
181                "/tmp/federate";
182        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
183                "FEDDIR/hosts";
184        self.def_gwstart = \
185                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
186                "/tmp/bridge.log";
187        self.def_mgwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
189                "/tmp/bridge.log";
190        self.def_gwimage = "FBSD61-TUNNEL2";
191        self.def_gwtype = "pc";
192        self.local_access = { }
193
194        if self.auth_type == 'legacy':
195            if auth:
196                self.auth = auth
197            else:
198                self.log.error( "[access]: No authorizer initialized, " +\
199                        "creating local one.")
200                auth = authorizer()
201            self.get_access = self.legacy_get_access
202        elif self.auth_type == 'abac':
203            self.auth = abac_authorizer(load=self.auth_dir)
204        else:
205            raise service_error(service_error.internal, 
206                    "Unknown auth_type: %s" % self.auth_type)
207
208        if mapdb_file:
209            self.read_mapdb(mapdb_file)
210        else:
211            self.log.warn("[experiment_control] No testbed map, using defaults")
212            self.tbmap = { 
213                    'deter':'https://users.isi.deterlab.net:23235',
214                    'emulab':'https://users.isi.deterlab.net:23236',
215                    'ucb':'https://users.isi.deterlab.net:23237',
216                    }
217
218        if accessdb_file:
219                self.read_accessdb(accessdb_file)
220        else:
221            raise service_error(service_error.internal,
222                    "No accessdb specified in config")
223
224        # Grab saved state.  OK to do this w/o locking because it's read only
225        # and only one thread should be in existence that can see self.state at
226        # this point.
227        if self.state_filename:
228            self.read_state()
229
230        if self.store_filename:
231            self.read_store()
232        else:
233            self.log.warning("No saved synch store")
234            self.synch_store = synch_store
235
236        # Dispatch tables
237        self.soap_services = {\
238                'New': soap_handler('New', self.new_experiment),
239                'Create': soap_handler('Create', self.create_experiment),
240                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
241                'Vis': soap_handler('Vis', self.get_vis),
242                'Info': soap_handler('Info', self.get_info),
243                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
244                'Terminate': soap_handler('Terminate', 
245                    self.terminate_experiment),
246                'GetValue': soap_handler('GetValue', self.GetValue),
247                'SetValue': soap_handler('SetValue', self.SetValue),
248        }
249
250        self.xmlrpc_services = {\
251                'New': xmlrpc_handler('New', self.new_experiment),
252                'Create': xmlrpc_handler('Create', self.create_experiment),
253                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
254                'Vis': xmlrpc_handler('Vis', self.get_vis),
255                'Info': xmlrpc_handler('Info', self.get_info),
256                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
257                'Terminate': xmlrpc_handler('Terminate',
258                    self.terminate_experiment),
259                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
260                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
261        }
262
263    # Call while holding self.state_lock
264    def write_state(self):
265        """
266        Write a new copy of experiment state after copying the existing state
267        to a backup.
268
269        State format is a simple pickling of the state dictionary.
270        """
271        if os.access(self.state_filename, os.W_OK):
272            copy_file(self.state_filename, \
273                    "%s.bak" % self.state_filename)
274        try:
275            f = open(self.state_filename, 'w')
276            pickle.dump(self.state, f)
277        except EnvironmentError, e:
278            self.log.error("Can't write file %s: %s" % \
279                    (self.state_filename, e))
280        except pickle.PicklingError, e:
281            self.log.error("Pickling problem: %s" % e)
282        except TypeError, e:
283            self.log.error("Pickling problem (TypeError): %s" % e)
284
285    @staticmethod
286    def get_alloc_ids(state):
287        """
288        Pull the fedids of the identifiers of each allocation from the
289        state.  Again, a dict dive that's best isolated.
290
291        Used by read_store and read state
292        """
293
294        return [ f['allocID']['fedid'] 
295                for f in state.get('federant',[]) \
296                    if f.has_key('allocID') and \
297                        f['allocID'].has_key('fedid')]
298
299    # Call while holding self.state_lock
300    def read_state(self):
301        """
302        Read a new copy of experiment state.  Old state is overwritten.
303
304        State format is a simple pickling of the state dictionary.
305        """
306       
307        def get_experiment_id(state):
308            """
309            Pull the fedid experimentID out of the saved state.  This is kind
310            of a gross walk through the dict.
311            """
312
313            if state.has_key('experimentID'):
314                for e in state['experimentID']:
315                    if e.has_key('fedid'):
316                        return e['fedid']
317                else:
318                    return None
319            else:
320                return None
321
322        try:
323            f = open(self.state_filename, "r")
324            self.state = pickle.load(f)
325            self.log.debug("[read_state]: Read state from %s" % \
326                    self.state_filename)
327        except EnvironmentError, e:
328            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
329                    % (self.state_filename, e))
330        except pickle.UnpicklingError, e:
331            self.log.warning(("[read_state]: No saved state: " + \
332                    "Unpickling failed: %s") % e)
333       
334        for s in self.state.values():
335            try:
336
337                eid = get_experiment_id(s)
338                if eid : 
339                    if self.auth_type == 'legacy':
340                        # XXX: legacy
341                        # Give the owner rights to the experiment
342                        self.auth.set_attribute(s['owner'], eid)
343                        # And holders of the eid as well
344                        self.auth.set_attribute(eid, eid)
345                        # allow overrides to control experiments as well
346                        for o in self.overrides:
347                            self.auth.set_attribute(o, eid)
348                        # Set permissions to allow reading of the software
349                        # repo, if any, as well.
350                        for a in self.get_alloc_ids(s):
351                            self.auth.set_attribute(a, 'repo/%s' % eid)
352                else:
353                    raise KeyError("No experiment id")
354            except KeyError, e:
355                self.log.warning("[read_state]: State ownership or identity " +\
356                        "misformatted in %s: %s" % (self.state_filename, e))
357
358
359    def read_accessdb(self, accessdb_file):
360        """
361        Read the mapping from fedids that can create experiments to their name
362        in the 3-level access namespace.  All will be asserted from this
363        testbed and can include the local username and porject that will be
364        asserted on their behalf by this fedd.  Each fedid is also added to the
365        authorization system with the "create" attribute.
366        """
367        self.accessdb = {}
368        # These are the regexps for parsing the db
369        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
370        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
371                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
372        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\s*(" + name_expr + ")\s*$")
374        lineno = 0
375
376        # Parse the mappings and store in self.authdb, a dict of
377        # fedid -> (proj, user)
378        try:
379            f = open(accessdb_file, "r")
380            for line in f:
381                lineno += 1
382                line = line.strip()
383                if len(line) == 0 or line.startswith('#'):
384                    continue
385                m = project_line.match(line)
386                if m:
387                    fid = fedid(hexstr=m.group(1))
388                    project, user = m.group(2,3)
389                    if not self.accessdb.has_key(fid):
390                        self.accessdb[fid] = []
391                    self.accessdb[fid].append((project, user))
392                    continue
393
394                m = user_line.match(line)
395                if m:
396                    fid = fedid(hexstr=m.group(1))
397                    project = None
398                    user = m.group(2)
399                    if not self.accessdb.has_key(fid):
400                        self.accessdb[fid] = []
401                    self.accessdb[fid].append((project, user))
402                    continue
403                self.log.warn("[experiment_control] Error parsing access " +\
404                        "db %s at line %d" %  (accessdb_file, lineno))
405        except EnvironmentError:
406            raise service_error(service_error.internal,
407                    ("Error opening/reading %s as experiment " +\
408                            "control accessdb") %  accessdb_file)
409        f.close()
410
411        # Initialize the authorization attributes
412        # XXX: legacy
413        if self.auth_type == 'legacy':
414            for fid in self.accessdb.keys():
415                self.auth.set_attribute(fid, 'create')
416                self.auth.set_attribute(fid, 'new')
417
418    def read_mapdb(self, file):
419        """
420        Read a simple colon separated list of mappings for the
421        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
422        """
423
424        self.tbmap = { }
425        lineno =0
426        try:
427            f = open(file, "r")
428            for line in f:
429                lineno += 1
430                line = line.strip()
431                if line.startswith('#') or len(line) == 0:
432                    continue
433                try:
434                    label, url = line.split(':', 1)
435                    self.tbmap[label] = url
436                except ValueError, e:
437                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
438                            "map db: %s %s" % (lineno, line, e))
439        except EnvironmentError, e:
440            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
441                    "open %s: %s" % (file, e))
442        else:
443            f.close()
444
445    def read_store(self):
446        try:
447            self.synch_store = synch_store()
448            self.synch_store.load(self.store_filename)
449            self.log.debug("[read_store]: Read store from %s" % \
450                    self.store_filename)
451        except EnvironmentError, e:
452            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
453                    % (self.state_filename, e))
454            self.synch_store = synch_store()
455
456        # Set the initial permissions on data in the store.  XXX: This ad hoc
457        # authorization attribute initialization is getting out of hand.
458        # XXX: legacy
459        if self.auth_type == 'legacy':
460            for k in self.synch_store.all_keys():
461                try:
462                    if k.startswith('fedid:'):
463                        fid = fedid(hexstr=k[6:46])
464                        if self.state.has_key(fid):
465                            for a in self.get_alloc_ids(self.state[fid]):
466                                self.auth.set_attribute(a, k)
467                except ValueError, e:
468                    self.log.warn("Cannot deduce permissions for %s" % k)
469
470
471    def write_store(self):
472        """
473        Write a new copy of synch_store after writing current state
474        to a backup.  We use the internal synch_store pickle method to avoid
475        incinsistent data.
476
477        State format is a simple pickling of the store.
478        """
479        if os.access(self.store_filename, os.W_OK):
480            copy_file(self.store_filename, \
481                    "%s.bak" % self.store_filename)
482        try:
483            self.synch_store.save(self.store_filename)
484        except EnvironmentError, e:
485            self.log.error("Can't write file %s: %s" % \
486                    (self.store_filename, e))
487        except TypeError, e:
488            self.log.error("Pickling problem (TypeError): %s" % e)
489
490
491    def remove_dirs(self, dir):
492        """
493        Remove the directory tree and all files rooted at dir.  Log any errors,
494        but continue.
495        """
496        self.log.debug("[removedirs]: removing %s" % dir)
497        try:
498            for path, dirs, files in os.walk(dir, topdown=False):
499                for f in files:
500                    os.remove(os.path.join(path, f))
501                for d in dirs:
502                    os.rmdir(os.path.join(path, d))
503            os.rmdir(dir)
504        except EnvironmentError, e:
505            self.log.error("Error deleting directory tree in %s" % e);
506
507    @staticmethod
508    def make_temp_certfile(expcert, tmpdir):
509        """
510        make a protected copy of the access certificate so the experiment
511        controller can act as the experiment principal.  mkstemp is the most
512        secure way to do that. The directory should be created by
513        mkdtemp.  Return the filename.
514        """
515        if expcert and tmpdir:
516            try:
517                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
518                f = os.fdopen(certf, 'w')
519                print >> f, expcert
520                f.close()
521            except EnvironmentError, e:
522                raise service_error(service_error.internal, 
523                        "Cannot create temp cert file?")
524            return certfn
525        else:
526            return None
527
528       
529    def generate_ssh_keys(self, dest, type="rsa" ):
530        """
531        Generate a set of keys for the gateways to use to talk.
532
533        Keys are of type type and are stored in the required dest file.
534        """
535        valid_types = ("rsa", "dsa")
536        t = type.lower();
537        if t not in valid_types: raise ValueError
538        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
539
540        try:
541            trace = open("/dev/null", "w")
542        except EnvironmentError:
543            raise service_error(service_error.internal,
544                    "Cannot open /dev/null??");
545
546        # May raise CalledProcessError
547        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
548        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
549        if rv != 0:
550            raise service_error(service_error.internal, 
551                    "Cannot generate nonce ssh keys.  %s return code %d" \
552                            % (self.ssh_keygen, rv))
553
554    def gentopo(self, str):
555        """
556        Generate the topology data structure from the splitter's XML
557        representation of it.
558
559        The topology XML looks like:
560            <experiment>
561                <nodes>
562                    <node><vname></vname><ips>ip1:ip2</ips></node>
563                </nodes>
564                <lans>
565                    <lan>
566                        <vname></vname><vnode></vnode><ip></ip>
567                        <bandwidth></bandwidth><member>node:port</member>
568                    </lan>
569                </lans>
570        """
571        class topo_parse:
572            """
573            Parse the topology XML and create the dats structure.
574            """
575            def __init__(self):
576                # Typing of the subelements for data conversion
577                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
578                self.int_subelements = ( 'bandwidth',)
579                self.float_subelements = ( 'delay',)
580                # The final data structure
581                self.nodes = [ ]
582                self.lans =  [ ]
583                self.topo = { \
584                        'node': self.nodes,\
585                        'lan' : self.lans,\
586                    }
587                self.element = { }  # Current element being created
588                self.chars = ""     # Last text seen
589
590            def end_element(self, name):
591                # After each sub element the contents is added to the current
592                # element or to the appropriate list.
593                if name == 'node':
594                    self.nodes.append(self.element)
595                    self.element = { }
596                elif name == 'lan':
597                    self.lans.append(self.element)
598                    self.element = { }
599                elif name in self.str_subelements:
600                    self.element[name] = self.chars
601                    self.chars = ""
602                elif name in self.int_subelements:
603                    self.element[name] = int(self.chars)
604                    self.chars = ""
605                elif name in self.float_subelements:
606                    self.element[name] = float(self.chars)
607                    self.chars = ""
608
609            def found_chars(self, data):
610                self.chars += data.rstrip()
611
612
613        tp = topo_parse();
614        parser = xml.parsers.expat.ParserCreate()
615        parser.EndElementHandler = tp.end_element
616        parser.CharacterDataHandler = tp.found_chars
617
618        parser.Parse(str)
619
620        return tp.topo
621       
622
623    def genviz(self, topo):
624        """
625        Generate the visualization the virtual topology
626        """
627
628        neato = "/usr/local/bin/neato"
629        # These are used to parse neato output and to create the visualization
630        # file.
631        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
632        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
633                "%s</type></node>"
634
635        try:
636            # Node names
637            nodes = [ n['vname'] for n in topo['node'] ]
638            topo_lans = topo['lan']
639        except KeyError, e:
640            raise service_error(service_error.internal, "Bad topology: %s" %e)
641
642        lans = { }
643        links = { }
644
645        # Walk through the virtual topology, organizing the connections into
646        # 2-node connections (links) and more-than-2-node connections (lans).
647        # When a lan is created, it's added to the list of nodes (there's a
648        # node in the visualization for the lan).
649        for l in topo_lans:
650            if links.has_key(l['vname']):
651                if len(links[l['vname']]) < 2:
652                    links[l['vname']].append(l['vnode'])
653                else:
654                    nodes.append(l['vname'])
655                    lans[l['vname']] = links[l['vname']]
656                    del links[l['vname']]
657                    lans[l['vname']].append(l['vnode'])
658            elif lans.has_key(l['vname']):
659                lans[l['vname']].append(l['vnode'])
660            else:
661                links[l['vname']] = [ l['vnode'] ]
662
663
664        # Open up a temporary file for dot to turn into a visualization
665        try:
666            df, dotname = tempfile.mkstemp()
667            dotfile = os.fdopen(df, 'w')
668        except EnvironmentError:
669            raise service_error(service_error.internal,
670                    "Failed to open file in genviz")
671
672        try:
673            dnull = open('/dev/null', 'w')
674        except EnvironmentError:
675            service_error(service_error.internal,
676                    "Failed to open /dev/null in genviz")
677
678        # Generate a dot/neato input file from the links, nodes and lans
679        try:
680            print >>dotfile, "graph G {"
681            for n in nodes:
682                print >>dotfile, '\t"%s"' % n
683            for l in links.keys():
684                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
685            for l in lans.keys():
686                for n in lans[l]:
687                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
688            print >>dotfile, "}"
689            dotfile.close()
690        except TypeError:
691            raise service_error(service_error.internal,
692                    "Single endpoint link in vtopo")
693        except EnvironmentError:
694            raise service_error(service_error.internal, "Cannot write dot file")
695
696        # Use dot to create a visualization
697        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
698                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
699                close_fds=True)
700        dnull.close()
701
702        # Translate dot to vis format
703        vis_nodes = [ ]
704        vis = { 'node': vis_nodes }
705        for line in dot.stdout:
706            m = vis_re.match(line)
707            if m:
708                vn = m.group(1)
709                vis_node = {'name': vn, \
710                        'x': float(m.group(2)),\
711                        'y' : float(m.group(3)),\
712                    }
713                if vn in links.keys() or vn in lans.keys():
714                    vis_node['type'] = 'lan'
715                else:
716                    vis_node['type'] = 'node'
717                vis_nodes.append(vis_node)
718        rv = dot.wait()
719
720        os.remove(dotname)
721        if rv == 0 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
740                    fedid(file=cert_file))
741            resp = { 'ReleaseAccessResponseBody': resp } 
742        else:
743            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
744                    cert_file, cert_pwd, self.trusted_certs)
745
746        # better error coding
747
748    def remote_ns2topdl(self, uri, desc):
749
750        req = {
751                'description' : { 'ns2description': desc },
752            }
753
754        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
755                self.trusted_certs)
756
757        if r.has_key('Ns2TopdlResponseBody'):
758            r = r['Ns2TopdlResponseBody']
759            ed = r.get('experimentdescription', None)
760            if ed.has_key('topdldescription'):
761                return topdl.Topology(**ed['topdldescription'])
762            else:
763                raise service_error(service_error.protocol, 
764                        "Bad splitter response (no output)")
765        else:
766            raise service_error(service_error.protocol, "Bad splitter response")
767
768    class start_segment:
769        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
770                cert_pwd=None, trusted_certs=None, caller=None,
771                log_collector=None):
772            self.log = log
773            self.debug = debug
774            self.cert_file = cert_file
775            self.cert_pwd = cert_pwd
776            self.trusted_certs = None
777            self.caller = caller
778            self.testbed = testbed
779            self.log_collector = log_collector
780            self.response = None
781            self.node = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            for e in resp.get('embedding', []):
786                if 'toponame' in e and 'physname' in e:
787                    self.node[e['toponame']] = e['physname'][0]
788
789        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
790            req = {
791                    'allocID': { 'fedid' : aid }, 
792                    'segmentdescription': { 
793                        'topdldescription': topo.to_dict(),
794                    },
795                }
796
797            if connInfo:
798                req['connection'] = connInfo
799
800            import_svcs = [ s for m in masters.values() \
801                    for s in m if self.testbed in s.importers]
802
803            if import_svcs or self.testbed in masters:
804                req['service'] = []
805
806            for s in import_svcs:
807                for r in s.reqs:
808                    sr = copy.deepcopy(r)
809                    sr['visibility'] = 'import';
810                    req['service'].append(sr)
811
812            for s in masters.get(self.testbed, []):
813                for r in s.reqs:
814                    sr = copy.deepcopy(r)
815                    sr['visibility'] = 'export';
816                    req['service'].append(sr)
817
818            if attrs:
819                req['fedAttr'] = attrs
820
821            try:
822                self.log.debug("Calling StartSegment at %s " % uri)
823                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
824                        self.trusted_certs)
825                if r.has_key('StartSegmentResponseBody'):
826                    lval = r['StartSegmentResponseBody'].get('allocationLog',
827                            None)
828                    if lval and self.log_collector:
829                        for line in  lval.splitlines(True):
830                            self.log_collector.write(line)
831                    self.make_map(r['StartSegmentResponseBody'])
832                    if 'proof' in r: self.proof = r['proof']
833                    self.response = r
834                else:
835                    raise service_error(service_error.internal, 
836                            "Bad response!?: %s" %r)
837                return True
838            except service_error, e:
839                self.log.error("Start segment failed on %s: %s" % \
840                        (self.testbed, e))
841                return False
842
843
844
845    class terminate_segment:
846        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
847                cert_pwd=None, trusted_certs=None, caller=None):
848            self.log = log
849            self.debug = debug
850            self.cert_file = cert_file
851            self.cert_pwd = cert_pwd
852            self.trusted_certs = None
853            self.caller = caller
854            self.testbed = testbed
855
856        def __call__(self, uri, aid ):
857            req = {
858                    'allocID': aid , 
859                }
860            try:
861                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
862                        self.trusted_certs)
863                return True
864            except service_error, e:
865                self.log.error("Terminate segment failed on %s: %s" % \
866                        (self.testbed, e))
867                return False
868
869    def allocate_resources(self, allocated, masters, eid, expid, 
870            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
871            attrs=None, connInfo={}, tbmap=None, expcert=None):
872
873        started = { }           # Testbeds where a sub-experiment started
874                                # successfully
875
876        # XXX
877        fail_soft = False
878
879        if tbmap is None: tbmap = { }
880
881        log = alloc_log or self.log
882
883        tp = thread_pool(self.nthreads)
884        threads = [ ]
885        starters = [ ]
886
887        if expcert:
888            cert = expcert
889            pw = None
890        else:
891            cert = self.cert_file
892            pw = self.cert_pwd
893
894        for tb in allocated.keys():
895            # Create and start a thread to start the segment, and save it
896            # to get the return value later
897            tb_attrs = copy.copy(attrs)
898            tp.wait_for_slot()
899            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
900            base, suffix = split_testbed(tb)
901            if suffix:
902                tb_attrs.append({'attribute': 'experiment_name', 
903                    'value': "%s-%s" % (eid, suffix)})
904            else:
905                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
906            if not uri:
907                raise service_error(service_error.internal, 
908                        "Unknown testbed %s !?" % tb)
909
910            if tbparams[tb].has_key('allocID') and \
911                    tbparams[tb]['allocID'].has_key('fedid'):
912                aid = tbparams[tb]['allocID']['fedid']
913            else:
914                raise service_error(service_error.internal, 
915                        "No alloc id for testbed %s !?" % tb)
916
917            s = self.start_segment(log=log, debug=self.debug,
918                    testbed=tb, cert_file=cert,
919                    cert_pwd=pw, trusted_certs=self.trusted_certs,
920                    caller=self.call_StartSegment,
921                    log_collector=log_collector)
922            starters.append(s)
923            t  = pooled_thread(\
924                    target=s, name=tb,
925                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
926                    pdata=tp, trace_file=self.trace_file)
927            threads.append(t)
928            t.start()
929
930        # Wait until all finish (keep pinging the log, though)
931        mins = 0
932        revoked = False
933        while not tp.wait_for_all_done(60.0):
934            mins += 1
935            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
936                    % mins)
937            if not revoked and \
938                    len([ t.getName() for t in threads if t.rv == False]) > 0:
939                # a testbed has failed.  Revoke this experiment's
940                # synchronizarion values so that sub experiments will not
941                # deadlock waiting for synchronization that will never happen
942                self.log.info("A subexperiment has failed to swap in, " + \
943                        "revoking synch keys")
944                var_key = "fedid:%s" % expid
945                for k in self.synch_store.all_keys():
946                    if len(k) > 45 and k[0:46] == var_key:
947                        self.synch_store.revoke_key(k)
948                revoked = True
949
950        failed = [ t.getName() for t in threads if not t.rv ]
951        succeeded = [tb for tb in allocated.keys() if tb not in failed]
952
953        # If one failed clean up, unless fail_soft is set
954        if failed:
955            if not fail_soft:
956                tp.clear()
957                for tb in succeeded:
958                    # Create and start a thread to stop the segment
959                    tp.wait_for_slot()
960                    uri = tbparams[tb]['uri']
961                    t  = pooled_thread(\
962                            target=self.terminate_segment(log=log,
963                                testbed=tb,
964                                cert_file=cert, 
965                                cert_pwd=pw,
966                                trusted_certs=self.trusted_certs,
967                                caller=self.call_TerminateSegment),
968                            args=(uri, tbparams[tb]['federant']['allocID']),
969                            name=tb,
970                            pdata=tp, trace_file=self.trace_file)
971                    t.start()
972                # Wait until all finish (if any are being stopped)
973                if succeeded:
974                    tp.wait_for_all_done()
975
976                # release the allocations
977                for tb in tbparams.keys():
978                    try:
979                        self.release_access(tb, tbparams[tb]['allocID'], 
980                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
981                                cert_file=cert, cert_pwd=pw)
982                    except service_error, e:
983                        self.log.warn("Error releasing access: %s" % e.desc)
984                # Remove the placeholder
985                self.state_lock.acquire()
986                self.state[eid]['experimentStatus'] = 'failed'
987                if self.state_filename: self.write_state()
988                self.state_lock.release()
989                # Remove the repo dir
990                self.remove_dirs("%s/%s" %(self.repodir, expid))
991                # Walk up tmpdir, deleting as we go
992                if self.cleanup:
993                    self.remove_dirs(tmpdir)
994                else:
995                    log.debug("[start_experiment]: not removing %s" % tmpdir)
996
997
998                log.error("Swap in failed on %s" % ",".join(failed))
999                return
1000        else:
1001            # Walk through the successes and gather the virtual to physical
1002            # mapping.
1003            embedding = [ ]
1004            proofs = { }
1005            for s in starters:
1006                for k, v in s.node.items():
1007                    embedding.append({
1008                        'toponame': k, 
1009                        'physname': [ v],
1010                        'testbed': s.testbed
1011                        })
1012                if s.proof:
1013                    proofs[s.testbed] = s.proof
1014            log.info("[start_segment]: Experiment %s active" % eid)
1015
1016
1017        # Walk up tmpdir, deleting as we go
1018        if self.cleanup:
1019            self.remove_dirs(tmpdir)
1020        else:
1021            log.debug("[start_experiment]: not removing %s" % tmpdir)
1022
1023        # Insert the experiment into our state and update the disk copy.
1024        self.state_lock.acquire()
1025        self.state[expid]['experimentStatus'] = 'active'
1026        self.state[eid] = self.state[expid]
1027        self.state[eid]['experimentdescription']['topdldescription'] = \
1028                top.to_dict()
1029        self.state[eid]['embedding'] = embedding
1030        # Append startup proofs
1031        for f in self.state[eid]['federant']:
1032            if 'name' in f and 'localname' in f['name']:
1033                if f['name']['localname'] in proofs:
1034                    f['proof'].append(proofs[f['name']['localname']])
1035
1036        if self.state_filename: self.write_state()
1037        self.state_lock.release()
1038        return
1039
1040
1041    def add_kit(self, e, kit):
1042        """
1043        Add a Software object created from the list of (install, location)
1044        tuples passed as kit  to the software attribute of an object e.  We
1045        do this enough to break out the code, but it's kind of a hack to
1046        avoid changing the old tuple rep.
1047        """
1048
1049        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1050
1051        if isinstance(e.software, list): e.software.extend(s)
1052        else: e.software = s
1053
1054    def append_experiment_authorization(self, expid, attrs, 
1055            need_state_lock=True):
1056        """
1057        Append the authorization information to system state
1058        """
1059
1060        for p, a in attrs:
1061            self.auth.set_attribute(p, a)
1062        self.auth.save()
1063
1064        if need_state_lock: self.state_lock.acquire()
1065        self.state[expid]['auth'].update(attrs)
1066        if self.state_filename: self.write_state()
1067        if need_state_lock: self.state_lock.release()
1068
1069    def clear_experiment_authorization(self, expid, need_state_lock=True):
1070        """
1071        Attrs is a set of attribute principal pairs that need to be removed
1072        from the authenticator.  Remove them and save the authenticator.
1073        """
1074
1075        if need_state_lock: self.state_lock.acquire()
1076        if expid in self.state and 'auth' in self.state[expid]:
1077            for p, a in self.state[expid]['auth']:
1078                self.auth.unset_attribute(p, a)
1079            self.state[expid]['auth'] = set()
1080        if self.state_filename: self.write_state()
1081        if need_state_lock: self.state_lock.release()
1082        self.auth.save()
1083
1084
1085    def create_experiment_state(self, fid, req, expid, expcert,
1086            state='starting'):
1087        """
1088        Create the initial entry in the experiment's state.  The expid and
1089        expcert are the experiment's fedid and certifacte that represents that
1090        ID, which are installed in the experiment state.  If the request
1091        includes a suggested local name that is used if possible.  If the local
1092        name is already taken by an experiment owned by this user that has
1093        failed, it is overwritten.  Otherwise new letters are added until a
1094        valid localname is found.  The generated local name is returned.
1095        """
1096
1097        if req.has_key('experimentID') and \
1098                req['experimentID'].has_key('localname'):
1099            overwrite = False
1100            eid = req['experimentID']['localname']
1101            # If there's an old failed experiment here with the same local name
1102            # and accessible by this user, we'll overwrite it, otherwise we'll
1103            # fall through and do the collision avoidance.
1104            old_expid = self.get_experiment_fedid(eid)
1105            if old_expid and self.check_experiment_access(fid, old_expid):
1106                self.state_lock.acquire()
1107                status = self.state[eid].get('experimentStatus', None)
1108                if status and status == 'failed':
1109                    # remove the old access attributes
1110                    self.clear_experiment_authorization(eid,
1111                            need_state_lock=False)
1112                    overwrite = True
1113                    del self.state[eid]
1114                    del self.state[old_expid]
1115                self.state_lock.release()
1116            self.state_lock.acquire()
1117            while (self.state.has_key(eid) and not overwrite):
1118                eid += random.choice(string.ascii_letters)
1119            # Initial state
1120            self.state[eid] = {
1121                    'experimentID' : \
1122                            [ { 'localname' : eid }, {'fedid': expid } ],
1123                    'experimentStatus': state,
1124                    'experimentAccess': { 'X509' : expcert },
1125                    'owner': fid,
1126                    'log' : [],
1127                    'auth': set(),
1128                }
1129            self.state[expid] = self.state[eid]
1130            if self.state_filename: self.write_state()
1131            self.state_lock.release()
1132        else:
1133            eid = self.exp_stem
1134            for i in range(0,5):
1135                eid += random.choice(string.ascii_letters)
1136            self.state_lock.acquire()
1137            while (self.state.has_key(eid)):
1138                eid = self.exp_stem
1139                for i in range(0,5):
1140                    eid += random.choice(string.ascii_letters)
1141            # Initial state
1142            self.state[eid] = {
1143                    'experimentID' : \
1144                            [ { 'localname' : eid }, {'fedid': expid } ],
1145                    'experimentStatus': state,
1146                    'experimentAccess': { 'X509' : expcert },
1147                    'owner': fid,
1148                    'log' : [],
1149                    'auth': set(),
1150                }
1151            self.state[expid] = self.state[eid]
1152            if self.state_filename: self.write_state()
1153            self.state_lock.release()
1154
1155        # Let users touch the state.  Authorize this fid and the expid itself
1156        # to touch the experiment, as well as allowing th eoverrides.
1157        self.append_experiment_authorization(eid, 
1158                set([(fid, expid), (expid,expid)] + \
1159                        [ (o, expid) for o in self.overrides]))
1160
1161        return eid
1162
1163
1164    def allocate_ips_to_topo(self, top):
1165        """
1166        Add an ip4_address attribute to all the hosts in the topology, based on
1167        the shared substrates on which they sit.  An /etc/hosts file is also
1168        created and returned as a list of hostfiles entries.  We also return
1169        the allocator, because we may need to allocate IPs to portals
1170        (specifically DRAGON portals).
1171        """
1172        subs = sorted(top.substrates, 
1173                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1174                reverse=True)
1175        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1176        ifs = { }
1177        hosts = [ ]
1178
1179        for idx, s in enumerate(subs):
1180            net_size = len(s.interfaces)+2
1181
1182            a = ips.allocate(net_size)
1183            if a :
1184                base, num = a
1185                if num < net_size: 
1186                    raise service_error(service_error.internal,
1187                            "Allocator returned wrong number of IPs??")
1188            else:
1189                raise service_error(service_error.req, 
1190                        "Cannot allocate IP addresses")
1191            mask = ips.min_alloc
1192            while mask < net_size:
1193                mask *= 2
1194
1195            netmask = ((2**32-1) ^ (mask-1))
1196
1197            base += 1
1198            for i in s.interfaces:
1199                i.attribute.append(
1200                        topdl.Attribute('ip4_address', 
1201                            "%s" % ip_addr(base)))
1202                i.attribute.append(
1203                        topdl.Attribute('ip4_netmask', 
1204                            "%s" % ip_addr(int(netmask))))
1205
1206                hname = i.element.name
1207                if ifs.has_key(hname):
1208                    hosts.append("%s\t%s-%s %s-%d" % \
1209                            (ip_addr(base), hname, s.name, hname,
1210                                ifs[hname]))
1211                else:
1212                    ifs[hname] = 0
1213                    hosts.append("%s\t%s-%s %s-%d %s" % \
1214                            (ip_addr(base), hname, s.name, hname,
1215                                ifs[hname], hname))
1216
1217                ifs[hname] += 1
1218                base += 1
1219        return hosts, ips
1220
1221    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1222            tbparam, masters, tbmap, expid=None, expcert=None):
1223        for tb in testbeds:
1224            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1225                    expcert)
1226            allocated[tb] = 1
1227
1228    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1229            expcert=None):
1230        """
1231        Get access to testbed through fedd and set the parameters for that tb
1232        """
1233        def get_export_project(svcs):
1234            """
1235            Look through for the list of federated_service for this testbed
1236            objects for a project_export service, and extract the project
1237            parameter.
1238            """
1239
1240            pe = [s for s in svcs if s.name=='project_export']
1241            if len(pe) == 1:
1242                return pe[0].params.get('project', None)
1243            elif len(pe) == 0:
1244                return None
1245            else:
1246                raise service_error(service_error.req,
1247                        "More than one project export is not supported")
1248
1249        def add_services(svcs, type, slist, keys):
1250            """
1251            Add the given services to slist.  type is import or export.  Also
1252            add a mapping entry from the assigned id to the original service
1253            record.
1254            """
1255            for i, s in enumerate(svcs):
1256                idx = '%s%d' % (type, i)
1257                keys[idx] = s
1258                sr = {'id': idx, 'name': s.name, 'visibility': type }
1259                if s.params:
1260                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1261                            for k, v in s.params.items()]
1262                slist.append(sr)
1263
1264        uri = tbmap.get(testbed_base(tb), None)
1265        if not uri:
1266            raise service_error(service_error.server_config, 
1267                    "Unknown testbed: %s" % tb)
1268
1269        export_svcs = masters.get(tb,[])
1270        import_svcs = [ s for m in masters.values() \
1271                for s in m \
1272                    if tb in s.importers ]
1273
1274        export_project = get_export_project(export_svcs)
1275        # Compose the credential list so that IDs come before attributes
1276        creds = set()
1277        keys = set()
1278        certs = self.auth.get_creds_for_principal(fid)
1279        if expid:
1280            certs.update(self.auth.get_creds_for_principal(expid))
1281        for c in certs:
1282            keys.add(c.issuer_cert())
1283            creds.add(c.attribute_cert())
1284        creds = list(keys) + list(creds)
1285
1286        if expcert: cert, pw = expcert, None
1287        else: cert, pw = self.cert_file, self.cert_pw
1288
1289        # Request credentials
1290        req = {
1291                'abac_credential': creds,
1292            }
1293        # Make the service request from the services we're importing and
1294        # exporting.  Keep track of the export request ids so we can
1295        # collect the resulting info from the access response.
1296        e_keys = { }
1297        if import_svcs or export_svcs:
1298            slist = []
1299            add_services(import_svcs, 'import', slist, e_keys)
1300            add_services(export_svcs, 'export', slist, e_keys)
1301            req['service'] = slist
1302
1303        if self.local_access.has_key(uri):
1304            # Local access call
1305            req = { 'RequestAccessRequestBody' : req }
1306            r = self.local_access[uri].RequestAccess(req, 
1307                    fedid(file=self.cert_file))
1308            r = { 'RequestAccessResponseBody' : r }
1309        else:
1310            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1311
1312        if r.has_key('RequestAccessResponseBody'):
1313            # Through to here we have a valid response, not a fault.
1314            # Access denied is a fault, so something better or worse than
1315            # access denied has happened.
1316            r = r['RequestAccessResponseBody']
1317            self.log.debug("[get_access] Access granted")
1318        else:
1319            raise service_error(service_error.protocol,
1320                        "Bad proxy response")
1321        if 'proof' not in r:
1322            raise service_error(service_error.protocol,
1323                        "Bad access response (no access proof)")
1324       
1325        tbparam[tb] = { 
1326                "allocID" : r['allocID'],
1327                "uri": uri,
1328                "proof": [ r['proof'] ],
1329                }
1330
1331        # Collect the responses corresponding to the services this testbed
1332        # exports.  These will be the service requests that we will include in
1333        # the start segment requests (with appropriate visibility values) to
1334        # import and export the segments.
1335        for s in r.get('service', []):
1336            id = s.get('id', None)
1337            if id and id in e_keys:
1338                e_keys[id].reqs.append(s)
1339
1340        # Add attributes to parameter space.  We don't allow attributes to
1341        # overlay any parameters already installed.
1342        for a in r.get('fedAttr', []):
1343            try:
1344                if a['attribute'] and \
1345                        isinstance(a['attribute'], basestring)\
1346                        and not tbparam[tb].has_key(a['attribute'].lower()):
1347                    tbparam[tb][a['attribute'].lower()] = a['value']
1348            except KeyError:
1349                self.log.error("Bad attribute in response: %s" % a)
1350
1351
1352    def split_topology(self, top, topo, testbeds):
1353        """
1354        Create the sub-topologies that are needed for experiment instantiation.
1355        """
1356        for tb in testbeds:
1357            topo[tb] = top.clone()
1358            # copy in for loop allows deletions from the original
1359            for e in [ e for e in topo[tb].elements]:
1360                etb = e.get_attribute('testbed')
1361                # NB: elements without a testbed attribute won't appear in any
1362                # sub topologies. 
1363                if not etb or etb != tb:
1364                    for i in e.interface:
1365                        for s in i.subs:
1366                            try:
1367                                s.interfaces.remove(i)
1368                            except ValueError:
1369                                raise service_error(service_error.internal,
1370                                        "Can't remove interface??")
1371                    topo[tb].elements.remove(e)
1372            topo[tb].make_indices()
1373
1374    def confirm_software(self, top):
1375        """
1376        Make sure that the software to be loaded in the topo is all available
1377        before we begin making access requests, etc.  This is a subset of
1378        wrangle_software.
1379        """
1380        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1381        pkgs.update([x.location for e in top.elements for x in e.software])
1382
1383        for pkg in pkgs:
1384            loc = pkg
1385
1386            scheme, host, path = urlparse(loc)[0:3]
1387            dest = os.path.basename(path)
1388            if not scheme:
1389                if not loc.startswith('/'):
1390                    loc = "/%s" % loc
1391                loc = "file://%s" %loc
1392            # NB: if scheme was found, loc == pkg
1393            try:
1394                u = urlopen(loc)
1395                u.close()
1396            except Exception, e:
1397                raise service_error(service_error.req, 
1398                        "Cannot open %s: %s" % (loc, e))
1399        return True
1400
1401    def wrangle_software(self, expid, top, topo, tbparams):
1402        """
1403        Copy software out to the repository directory, allocate permissions and
1404        rewrite the segment topologies to look for the software in local
1405        places.
1406        """
1407
1408        # Copy the rpms and tarfiles to a distribution directory from
1409        # which the federants can retrieve them
1410        linkpath = "%s/software" %  expid
1411        softdir ="%s/%s" % ( self.repodir, linkpath)
1412        softmap = { }
1413
1414        # self.fedkit and self.gateway kit are lists of tuples of
1415        # (install_location, download_location) this extracts the download
1416        # locations.
1417        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1418        pkgs.update([x.location for e in top.elements for x in e.software])
1419        try:
1420            os.makedirs(softdir)
1421        except EnvironmentError, e:
1422            raise service_error(
1423                    "Cannot create software directory: %s" % e)
1424        # The actual copying.  Everything's converted into a url for copying.
1425        auth_attrs = set()
1426        for pkg in pkgs:
1427            loc = pkg
1428
1429            scheme, host, path = urlparse(loc)[0:3]
1430            dest = os.path.basename(path)
1431            if not scheme:
1432                if not loc.startswith('/'):
1433                    loc = "/%s" % loc
1434                loc = "file://%s" %loc
1435            # NB: if scheme was found, loc == pkg
1436            try:
1437                u = urlopen(loc)
1438            except Exception, e:
1439                raise service_error(service_error.req, 
1440                        "Cannot open %s: %s" % (loc, e))
1441            try:
1442                f = open("%s/%s" % (softdir, dest) , "w")
1443                self.log.debug("Writing %s/%s" % (softdir,dest) )
1444                data = u.read(4096)
1445                while data:
1446                    f.write(data)
1447                    data = u.read(4096)
1448                f.close()
1449                u.close()
1450            except Exception, e:
1451                raise service_error(service_error.internal,
1452                        "Could not copy %s: %s" % (loc, e))
1453            path = re.sub("/tmp", "", linkpath)
1454            # XXX
1455            softmap[pkg] = \
1456                    "%s/%s/%s" %\
1457                    ( self.repo_url, path, dest)
1458
1459            # Allow the individual segments to access the software by assigning
1460            # an attribute to each testbed allocation that encodes the data to
1461            # be released.  This expression collects the data for each run of
1462            # the loop.
1463            auth_attrs.update([
1464                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1465                        for tb in tbparams.keys()])
1466
1467        self.append_experiment_authorization(expid, auth_attrs)
1468
1469        # Convert the software locations in the segments into the local
1470        # copies on this host
1471        for soft in [ s for tb in topo.values() \
1472                for e in tb.elements \
1473                    if getattr(e, 'software', False) \
1474                        for s in e.software ]:
1475            if softmap.has_key(soft.location):
1476                soft.location = softmap[soft.location]
1477
1478
1479    def new_experiment(self, req, fid):
1480        """
1481        The external interface to empty initial experiment creation called from
1482        the dispatcher.
1483
1484        Creates a working directory, splits the incoming description using the
1485        splitter script and parses out the avrious subsections using the
1486        lcasses above.  Once each sub-experiment is created, use pooled threads
1487        to instantiate them and start it all up.
1488        """
1489        req = req.get('NewRequestBody', None)
1490        if not req:
1491            raise service_error(service_error.req,
1492                    "Bad request format (no NewRequestBody)")
1493
1494        if self.auth.import_credentials(data_list=req.get('credential', [])):
1495            self.auth.save()
1496
1497        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1498                with_proof=True)
1499
1500        if not access_ok:
1501            raise service_error(service_error.access, "New access denied",
1502                    proof=[proof])
1503
1504        try:
1505            tmpdir = tempfile.mkdtemp(prefix="split-")
1506        except EnvironmentError:
1507            raise service_error(service_error.internal, "Cannot create tmp dir")
1508
1509        try:
1510            access_user = self.accessdb[fid]
1511        except KeyError:
1512            raise service_error(service_error.internal,
1513                    "Access map and authorizer out of sync in " + \
1514                            "new_experiment for fedid %s"  % fid)
1515
1516        # Generate an ID for the experiment (slice) and a certificate that the
1517        # allocator can use to prove they own it.  We'll ship it back through
1518        # the encrypted connection.  If the requester supplied one, use it.
1519        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1520            expcert = req['experimentAccess']['X509']
1521            expid = fedid(certstr=expcert)
1522            self.state_lock.acquire()
1523            if expid in self.state:
1524                self.state_lock.release()
1525                raise service_error(service_error.req, 
1526                        'fedid %s identifies an existing experiment' % expid)
1527            self.state_lock.release()
1528        else:
1529            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1530
1531        #now we're done with the tmpdir, and it should be empty
1532        if self.cleanup:
1533            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1534            os.rmdir(tmpdir)
1535        else:
1536            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1537
1538        eid = self.create_experiment_state(fid, req, expid, expcert, 
1539                state='empty')
1540
1541        rv = {
1542                'experimentID': [
1543                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1544                ],
1545                'experimentStatus': 'empty',
1546                'experimentAccess': { 'X509' : expcert },
1547                'proof': proof.to_dict(),
1548            }
1549
1550        return rv
1551
1552    # create_experiment sub-functions
1553
1554    @staticmethod
1555    def get_experiment_key(req, field='experimentID'):
1556        """
1557        Parse the experiment identifiers out of the request (the request body
1558        tag has been removed).  Specifically this pulls either the fedid or the
1559        localname out of the experimentID field.  A fedid is preferred.  If
1560        neither is present or the request does not contain the fields,
1561        service_errors are raised.
1562        """
1563        # Get the experiment access
1564        exp = req.get(field, None)
1565        if exp:
1566            if exp.has_key('fedid'):
1567                key = exp['fedid']
1568            elif exp.has_key('localname'):
1569                key = exp['localname']
1570            else:
1571                raise service_error(service_error.req, "Unknown lookup type")
1572        else:
1573            raise service_error(service_error.req, "No request?")
1574
1575        return key
1576
1577    def get_experiment_ids_and_start(self, key, tmpdir):
1578        """
1579        Get the experiment name, id and access certificate from the state, and
1580        set the experiment state to 'starting'.  returns a triple (fedid,
1581        localname, access_cert_file). The access_cert_file is a copy of the
1582        contents of the access certificate, created in the tempdir with
1583        restricted permissions.  If things are confused, raise an exception.
1584        """
1585
1586        expid = eid = None
1587        self.state_lock.acquire()
1588        if self.state.has_key(key):
1589            self.state[key]['experimentStatus'] = "starting"
1590            for e in self.state[key].get('experimentID',[]):
1591                if not expid and e.has_key('fedid'):
1592                    expid = e['fedid']
1593                elif not eid and e.has_key('localname'):
1594                    eid = e['localname']
1595            if 'experimentAccess' in self.state[key] and \
1596                    'X509' in self.state[key]['experimentAccess']:
1597                expcert = self.state[key]['experimentAccess']['X509']
1598            else:
1599                expcert = None
1600        self.state_lock.release()
1601
1602        # make a protected copy of the access certificate so the experiment
1603        # controller can act as the experiment principal.
1604        if expcert:
1605            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1606            if not expcert_file:
1607                raise service_error(service_error.internal, 
1608                        "Cannot create temp cert file?")
1609        else:
1610            expcert_file = None
1611
1612        return (eid, expid, expcert_file)
1613
1614    def get_topology(self, req, tmpdir):
1615        """
1616        Get the ns2 content and put it into a file for parsing.  Call the local
1617        or remote parser and return the topdl.Topology.  Errors result in
1618        exceptions.  req is the request and tmpdir is a work directory.
1619        """
1620
1621        # The tcl parser needs to read a file so put the content into that file
1622        descr=req.get('experimentdescription', None)
1623        if descr:
1624            if 'ns2description' in descr:
1625                file_content=descr['ns2description']
1626            elif 'topdldescription' in descr:
1627                return topdl.Topology(**descr['topdldescription'])
1628            else:
1629                raise service_error(service_error.req, 
1630                        'Unknown experiment description type')
1631        else:
1632            raise service_error(service_error.req, "No experiment description")
1633
1634
1635        if self.splitter_url:
1636            self.log.debug("Calling remote topdl translator at %s" % \
1637                    self.splitter_url)
1638            top = self.remote_ns2topdl(self.splitter_url, file_content)
1639        else:
1640            tclfile = os.path.join(tmpdir, "experiment.tcl")
1641            if file_content:
1642                try:
1643                    f = open(tclfile, 'w')
1644                    f.write(file_content)
1645                    f.close()
1646                except EnvironmentError:
1647                    raise service_error(service_error.internal,
1648                            "Cannot write temp experiment description")
1649            else:
1650                raise service_error(service_error.req, 
1651                        "Only ns2descriptions supported")
1652            pid = "dummy"
1653            gid = "dummy"
1654            eid = "dummy"
1655
1656            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1657                str(self.muxmax), '-m', 'dummy']
1658
1659            tclcmd.extend([pid, gid, eid, tclfile])
1660
1661            self.log.debug("running local splitter %s", " ".join(tclcmd))
1662            # This is just fantastic.  As a side effect the parser copies
1663            # tb_compat.tcl into the current directory, so that directory
1664            # must be writable by the fedd user.  Doing this in the
1665            # temporary subdir ensures this is the case.
1666            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1667                    cwd=tmpdir)
1668            split_data = tclparser.stdout
1669
1670            top = topdl.topology_from_xml(file=split_data, top="experiment")
1671            os.remove(tclfile)
1672
1673        return top
1674
1675    def get_testbed_services(self, req, testbeds):
1676        """
1677        Parse the services section of the request into into two dicts mapping
1678        testbed to lists of federated_service objects.  The first lists all
1679        exporters of services, and the second all exporters of services that
1680        need control portals int the experiment.
1681        """
1682        masters = { }
1683        pmasters = { }
1684        for s in req.get('service', []):
1685            # If this is a service request with the importall field
1686            # set, fill it out.
1687
1688            if s.get('importall', False):
1689                s['import'] = [ tb for tb in testbeds \
1690                        if tb not in s.get('export',[])]
1691                del s['importall']
1692
1693            # Add the service to masters
1694            for tb in s.get('export', []):
1695                if s.get('name', None):
1696
1697                    params = { }
1698                    for a in s.get('fedAttr', []):
1699                        params[a.get('attribute', '')] = a.get('value','')
1700
1701                    fser = federated_service(name=s['name'],
1702                            exporter=tb, importers=s.get('import',[]),
1703                            params=params)
1704                    if fser.name == 'hide_hosts' \
1705                            and 'hosts' not in fser.params:
1706                        fser.params['hosts'] = \
1707                                ",".join(tb_hosts.get(fser.exporter, []))
1708                    if tb in masters: masters[tb].append(fser)
1709                    else: masters[tb] = [fser]
1710
1711                    if fser.portal:
1712                        if tb not in pmasters: pmasters[tb] = [ fser ]
1713                        else: pmasters[tb].append(fser)
1714                else:
1715                    self.log.error('Testbed service does not have name " + \
1716                            "and importers')
1717        return masters, pmasters
1718
1719    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1720        """
1721        Create the ssh keys necessary for interconnecting the potral nodes and
1722        the global hosts file for letting each segment know about the IP
1723        addresses in play.  Save these into the repo.  Add attributes to the
1724        autorizer allowing access controllers to download them and return a set
1725        of attributes that inform the segments where to find this stuff.  Mau
1726        raise service_errors in if there are problems.
1727        """
1728        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1729        gw_secretkey_base = "fed.%s" % self.ssh_type
1730        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1731        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1732
1733        try:
1734            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1735        except ValueError:
1736            raise service_error(service_error.server_config, 
1737                    "Bad key type (%s)" % self.ssh_type)
1738
1739
1740        # Copy configuration files into the remote file store
1741        # The config urlpath
1742        configpath = "/%s/config" % expid
1743        # The config file system location
1744        configdir ="%s%s" % ( self.repodir, configpath)
1745        try:
1746            os.makedirs(configdir)
1747        except EnvironmentError, e:
1748            raise service_error(service_error.internal,
1749                    "Cannot create config directory: %s" % e)
1750        try:
1751            f = open("%s/hosts" % configdir, "w")
1752            print >> f, string.join(hosts, '\n')
1753            f.close()
1754        except EnvironmentError, e:
1755            raise service_error(service_error.internal, 
1756                    "Cannot write hosts file: %s" % e)
1757        try:
1758            copy_file("%s" % gw_pubkey, "%s/%s" % \
1759                    (configdir, gw_pubkey_base))
1760            copy_file("%s" % gw_secretkey, "%s/%s" % \
1761                    (configdir, gw_secretkey_base))
1762        except EnvironmentError, e:
1763            raise service_error(service_error.internal, 
1764                    "Cannot copy keyfiles: %s" % e)
1765
1766        # Allow the individual testbeds to access the configuration files,
1767        # again by setting an attribute for the relevant pathnames on each
1768        # allocation principal.  Yeah, that's a long list comprehension.
1769        self.append_experiment_authorization(expid, set([
1770            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1771                    for tb in tbparams.keys() \
1772                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1773
1774        attrs = [ 
1775                {
1776                    'attribute': 'ssh_pubkey', 
1777                    'value': '%s/%s/config/%s' % \
1778                            (self.repo_url, expid, gw_pubkey_base)
1779                },
1780                {
1781                    'attribute': 'ssh_secretkey', 
1782                    'value': '%s/%s/config/%s' % \
1783                            (self.repo_url, expid, gw_secretkey_base)
1784                },
1785                {
1786                    'attribute': 'hosts', 
1787                    'value': '%s/%s/config/hosts' % \
1788                            (self.repo_url, expid)
1789                },
1790            ]
1791        return attrs
1792
1793
1794    def get_vtopo(self, req, fid):
1795        """
1796        Return the stored virtual topology for this experiment
1797        """
1798        rv = None
1799        state = None
1800
1801        req = req.get('VtopoRequestBody', None)
1802        if not req:
1803            raise service_error(service_error.req,
1804                    "Bad request format (no VtopoRequestBody)")
1805        exp = req.get('experiment', None)
1806        if exp:
1807            if exp.has_key('fedid'):
1808                key = exp['fedid']
1809                keytype = "fedid"
1810            elif exp.has_key('localname'):
1811                key = exp['localname']
1812                keytype = "localname"
1813            else:
1814                raise service_error(service_error.req, "Unknown lookup type")
1815        else:
1816            raise service_error(service_error.req, "No request?")
1817
1818        proof = self.check_experiment_access(fid, key)
1819
1820        self.state_lock.acquire()
1821        if self.state.has_key(key):
1822            if self.state[key].has_key('vtopo'):
1823                rv = { 'experiment' : {keytype: key },
1824                        'vtopo': self.state[key]['vtopo'],
1825                        'proof': proof.to_dict(), 
1826                    }
1827            else:
1828                state = self.state[key]['experimentStatus']
1829        self.state_lock.release()
1830
1831        if rv: return rv
1832        else: 
1833            if state:
1834                raise service_error(service_error.partial, 
1835                        "Not ready: %s" % state)
1836            else:
1837                raise service_error(service_error.req, "No such experiment")
1838
1839    def get_vis(self, req, fid):
1840        """
1841        Return the stored visualization for this experiment
1842        """
1843        rv = None
1844        state = None
1845
1846        req = req.get('VisRequestBody', None)
1847        if not req:
1848            raise service_error(service_error.req,
1849                    "Bad request format (no VisRequestBody)")
1850        exp = req.get('experiment', None)
1851        if exp:
1852            if exp.has_key('fedid'):
1853                key = exp['fedid']
1854                keytype = "fedid"
1855            elif exp.has_key('localname'):
1856                key = exp['localname']
1857                keytype = "localname"
1858            else:
1859                raise service_error(service_error.req, "Unknown lookup type")
1860        else:
1861            raise service_error(service_error.req, "No request?")
1862
1863        proof = self.check_experiment_access(fid, key)
1864
1865        self.state_lock.acquire()
1866        if self.state.has_key(key):
1867            if self.state[key].has_key('vis'):
1868                rv =  { 'experiment' : {keytype: key },
1869                        'vis': self.state[key]['vis'],
1870                        'proof': proof.to_dict(), 
1871                        }
1872            else:
1873                state = self.state[key]['experimentStatus']
1874        self.state_lock.release()
1875
1876        if rv: return rv
1877        else:
1878            if state:
1879                raise service_error(service_error.partial, 
1880                        "Not ready: %s" % state)
1881            else:
1882                raise service_error(service_error.req, "No such experiment")
1883
1884   
1885    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1886            top):
1887        """
1888        Store the various data that have changed in the experiment state
1889        between when it was started and the beginning of resource allocation.
1890        This is basically the information about each local allocation.  This
1891        fills in the values of the placeholder allocation in the state.  It
1892        also collects the access proofs and returns them as dicts for a
1893        response message.
1894        """
1895        # save federant information
1896        for k in allocated.keys():
1897            tbparams[k]['federant'] = {
1898                    'name': [ { 'localname' : eid} ],
1899                    'allocID' : tbparams[k]['allocID'],
1900                    'uri': tbparams[k]['uri'],
1901                    'proof': tbparams[k]['proof'],
1902                }
1903
1904        self.state_lock.acquire()
1905        self.state[eid]['vtopo'] = vtopo
1906        self.state[eid]['vis'] = vis
1907        self.state[eid]['experimentdescription'] = \
1908                { 'topdldescription': top.to_dict() }
1909        self.state[eid]['federant'] = \
1910                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1911                    if tbparams[tb].has_key('federant') ]
1912        # Access proofs for the response message
1913        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1914                    for p in tbparams[k]['federant']['proof']]
1915        if self.state_filename: 
1916            self.write_state()
1917        self.state_lock.release()
1918        return proofs
1919
1920    def clear_placeholder(self, eid, expid, tmpdir):
1921        """
1922        Clear the placeholder and remove any allocated temporary dir.
1923        """
1924
1925        self.state_lock.acquire()
1926        del self.state[eid]
1927        del self.state[expid]
1928        if self.state_filename: self.write_state()
1929        self.state_lock.release()
1930        if tmpdir and self.cleanup:
1931            self.remove_dirs(tmpdir)
1932
1933    # end of create_experiment sub-functions
1934
1935    def create_experiment(self, req, fid):
1936        """
1937        The external interface to experiment creation called from the
1938        dispatcher.
1939
1940        Creates a working directory, splits the incoming description using the
1941        splitter script and parses out the various subsections using the
1942        classes above.  Once each sub-experiment is created, use pooled threads
1943        to instantiate them and start it all up.
1944        """
1945
1946        req = req.get('CreateRequestBody', None)
1947        if req:
1948            key = self.get_experiment_key(req)
1949        else:
1950            raise service_error(service_error.req,
1951                    "Bad request format (no CreateRequestBody)")
1952
1953        # Import information from the requester
1954        if self.auth.import_credentials(data_list=req.get('credential', [])):
1955            self.auth.save()
1956
1957        # Make sure that the caller can talk to us
1958        proof = self.check_experiment_access(fid, key)
1959
1960        # Install the testbed map entries supplied with the request into a copy
1961        # of the testbed map.
1962        tbmap = dict(self.tbmap)
1963        for m in req.get('testbedmap', []):
1964            if 'testbed' in m and 'uri' in m:
1965                tbmap[m['testbed']] = m['uri']
1966
1967        # a place to work
1968        try:
1969            tmpdir = tempfile.mkdtemp(prefix="split-")
1970            os.mkdir(tmpdir+"/keys")
1971        except EnvironmentError:
1972            raise service_error(service_error.internal, "Cannot create tmp dir")
1973
1974        tbparams = { }
1975
1976        eid, expid, expcert_file = \
1977                self.get_experiment_ids_and_start(key, tmpdir)
1978
1979        # This catches exceptions to clear the placeholder if necessary
1980        try: 
1981            if not (eid and expid):
1982                raise service_error(service_error.internal, 
1983                        "Cannot find local experiment info!?")
1984
1985            top = self.get_topology(req, tmpdir)
1986            self.confirm_software(top)
1987            # Assign the IPs
1988            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1989            # Find the testbeds to look up
1990            tb_hosts = { }
1991            testbeds = [ ]
1992            for e in top.elements:
1993                if isinstance(e, topdl.Computer):
1994                    tb = e.get_attribute('testbed') or 'default'
1995                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1996                    else: 
1997                        tb_hosts[tb] = [ e.name ]
1998                        testbeds.append(tb)
1999
2000            masters, pmasters = self.get_testbed_services(req, testbeds)
2001            allocated = { }         # Testbeds we can access
2002            topo ={ }               # Sub topologies
2003            connInfo = { }          # Connection information
2004
2005            self.split_topology(top, topo, testbeds)
2006
2007            self.get_access_to_testbeds(testbeds, fid, allocated, 
2008                    tbparams, masters, tbmap, expid, expcert_file)
2009
2010            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2011
2012            part = experiment_partition(self.auth, self.store_url, tbmap,
2013                    self.muxmax, self.direct_transit)
2014            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2015                    connInfo, expid)
2016
2017            auth_attrs = set()
2018            # Now get access to the dynamic testbeds (those added above)
2019            for tb in [ t for t in topo if t not in allocated]:
2020                self.get_access(tb, tbparams, fid, masters, tbmap, 
2021                        expid, expcert_file)
2022                allocated[tb] = 1
2023                store_keys = topo[tb].get_attribute('store_keys')
2024                # Give the testbed access to keys it exports or imports
2025                if store_keys:
2026                    auth_attrs.update(set([
2027                        (tbparams[tb]['allocID']['fedid'], sk) \
2028                                for sk in store_keys.split(" ")]))
2029
2030            if auth_attrs:
2031                self.append_experiment_authorization(expid, auth_attrs)
2032
2033            # transit and disconnected testbeds may not have a connInfo entry.
2034            # Fill in the blanks.
2035            for t in allocated.keys():
2036                if not connInfo.has_key(t):
2037                    connInfo[t] = { }
2038
2039            self.wrangle_software(expid, top, topo, tbparams)
2040
2041            vtopo = topdl.topology_to_vtopo(top)
2042            vis = self.genviz(vtopo)
2043            proofs = self.save_federant_information(allocated, tbparams, 
2044                    eid, vtopo, vis, top)
2045        except service_error, e:
2046            # If something goes wrong in the parse (usually an access error)
2047            # clear the placeholder state.  From here on out the code delays
2048            # exceptions.  Failing at this point returns a fault to the remote
2049            # caller.
2050            self.clear_placeholder(eid, expid, tmpdir)
2051            raise e
2052
2053        # Start the background swapper and return the starting state.  From
2054        # here on out, the state will stick around a while.
2055
2056        # Create a logger that logs to the experiment's state object as well as
2057        # to the main log file.
2058        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2059        alloc_collector = self.list_log(self.state[eid]['log'])
2060        h = logging.StreamHandler(alloc_collector)
2061        # XXX: there should be a global one of these rather than repeating the
2062        # code.
2063        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2064                    '%d %b %y %H:%M:%S'))
2065        alloc_log.addHandler(h)
2066
2067        # Start a thread to do the resource allocation
2068        t  = Thread(target=self.allocate_resources,
2069                args=(allocated, masters, eid, expid, tbparams, 
2070                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2071                    connInfo, tbmap, expcert_file),
2072                name=eid)
2073        t.start()
2074
2075        rv = {
2076                'experimentID': [
2077                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2078                ],
2079                'experimentStatus': 'starting',
2080                'proof': [ proof.to_dict() ] + proofs,
2081            }
2082
2083        return rv
2084   
2085    def get_experiment_fedid(self, key):
2086        """
2087        find the fedid associated with the localname key in the state database.
2088        """
2089
2090        rv = None
2091        self.state_lock.acquire()
2092        if self.state.has_key(key):
2093            if isinstance(self.state[key], dict):
2094                try:
2095                    kl = [ f['fedid'] for f in \
2096                            self.state[key]['experimentID']\
2097                                if f.has_key('fedid') ]
2098                except KeyError:
2099                    self.state_lock.release()
2100                    raise service_error(service_error.internal, 
2101                            "No fedid for experiment %s when getting "+\
2102                                    "fedid(!?)" % key)
2103                if len(kl) == 1:
2104                    rv = kl[0]
2105                else:
2106                    self.state_lock.release()
2107                    raise service_error(service_error.internal, 
2108                            "multiple fedids for experiment %s when " +\
2109                                    "getting fedid(!?)" % key)
2110            else:
2111                self.state_lock.release()
2112                raise service_error(service_error.internal, 
2113                        "Unexpected state for %s" % key)
2114        self.state_lock.release()
2115        return rv
2116
2117    def check_experiment_access(self, fid, key):
2118        """
2119        Confirm that the fid has access to the experiment.  Though a request
2120        may be made in terms of a local name, the access attribute is always
2121        the experiment's fedid.
2122        """
2123        if not isinstance(key, fedid):
2124            key = self.get_experiment_fedid(key)
2125
2126        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2127
2128        if access_ok:
2129            return proof
2130        else:
2131            raise service_error(service_error.access, "Access Denied",
2132                proof)
2133
2134
2135    def get_handler(self, path, fid):
2136        """
2137        Perhaps surprisingly named, this function handles HTTP GET requests to
2138        this server (SOAP requests are POSTs).
2139        """
2140        self.log.info("Get handler %s %s" % (path, fid))
2141        # XXX: log proofs?
2142        if self.auth.check_attribute(fid, path):
2143            return ("%s/%s" % (self.repodir, path), "application/binary")
2144        else:
2145            return (None, None)
2146
2147    def clean_info_response(self, rv, proof):
2148        """
2149        Remove the information in the experiment's state object that is not in
2150        the info response.
2151        """
2152        # Remove the owner info (should always be there, but...)
2153        if rv.has_key('owner'): del rv['owner']
2154        if 'auth' in rv: del rv['auth']
2155
2156        # Convert the log into the allocationLog parameter and remove the
2157        # log entry (with defensive programming)
2158        if rv.has_key('log'):
2159            rv['allocationLog'] = "".join(rv['log'])
2160            del rv['log']
2161        else:
2162            rv['allocationLog'] = ""
2163
2164        if rv['experimentStatus'] != 'active':
2165            if rv.has_key('federant'): del rv['federant']
2166        else:
2167            # remove the allocationID and uri info from each federant
2168            for f in rv.get('federant', []):
2169                if f.has_key('allocID'): del f['allocID']
2170                if f.has_key('uri'): del f['uri']
2171        rv['proof'] = proof.to_dict()
2172
2173        return rv
2174
2175    def get_info(self, req, fid):
2176        """
2177        Return all the stored info about this experiment
2178        """
2179        rv = None
2180
2181        req = req.get('InfoRequestBody', None)
2182        if not req:
2183            raise service_error(service_error.req,
2184                    "Bad request format (no InfoRequestBody)")
2185        exp = req.get('experiment', None)
2186        if exp:
2187            if exp.has_key('fedid'):
2188                key = exp['fedid']
2189                keytype = "fedid"
2190            elif exp.has_key('localname'):
2191                key = exp['localname']
2192                keytype = "localname"
2193            else:
2194                raise service_error(service_error.req, "Unknown lookup type")
2195        else:
2196            raise service_error(service_error.req, "No request?")
2197
2198        proof = self.check_experiment_access(fid, key)
2199
2200        # The state may be massaged by the service function that called
2201        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2202        # state.
2203        self.state_lock.acquire()
2204        if self.state.has_key(key):
2205            rv = copy.deepcopy(self.state[key])
2206        self.state_lock.release()
2207
2208        if rv:
2209            return self.clean_info_response(rv, proof)
2210        else:
2211            raise service_error(service_error.req, "No such experiment")
2212
2213    def get_multi_info(self, req, fid):
2214        """
2215        Return all the stored info that this fedid can access
2216        """
2217        rv = { 'info': [ ], 'proof': [ ] }
2218
2219        self.state_lock.acquire()
2220        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2221            try:
2222                proof = self.check_experiment_access(fid, key)
2223            except service_error, e:
2224                if e.code == service_error.access:
2225                    continue
2226                else:
2227                    self.state_lock.release()
2228                    raise e
2229
2230            if self.state.has_key(key):
2231                e = copy.deepcopy(self.state[key])
2232                e = self.clean_info_response(e, proof)
2233                rv['info'].append(e)
2234                rv['proof'].append(proof.to_dict())
2235        self.state_lock.release()
2236        return rv
2237
2238    def check_termination_status(self, fed_exp, force):
2239        """
2240        Confirm that the experiment is sin a valid state to stop (or force it)
2241        return the state - invalid states for deletion and force settings cause
2242        exceptions.
2243        """
2244        self.state_lock.acquire()
2245        status = fed_exp.get('experimentStatus', None)
2246
2247        if status:
2248            if status in ('starting', 'terminating'):
2249                if not force:
2250                    self.state_lock.release()
2251                    raise service_error(service_error.partial, 
2252                            'Experiment still being created or destroyed')
2253                else:
2254                    self.log.warning('Experiment in %s state ' % status + \
2255                            'being terminated by force.')
2256            self.state_lock.release()
2257            return status
2258        else:
2259            # No status??? trouble
2260            self.state_lock.release()
2261            raise service_error(service_error.internal,
2262                    "Experiment has no status!?")
2263
2264
2265    def get_termination_info(self, fed_exp):
2266        ids = []
2267        term_params = { }
2268        self.state_lock.acquire()
2269        #  experimentID is a list of dicts that are self-describing
2270        #  identifiers.  This finds all the fedids and localnames - the
2271        #  keys of self.state - and puts them into ids, which is used to delete
2272        #  the state after everything is swapped out.
2273        for id in fed_exp.get('experimentID', []):
2274            if 'fedid' in id: 
2275                ids.append(id['fedid'])
2276                repo = "%s" % id['fedid']
2277            if 'localname' in id: ids.append(id['localname'])
2278
2279        # Get the experimentAccess - the principal for this experiment.  It
2280        # is this principal to which credentials have been delegated, and
2281        # as which the experiment controller must act.
2282        if 'experimentAccess' in fed_exp and \
2283                'X509' in fed_exp['experimentAccess']:
2284            expcert = fed_exp['experimentAccess']['X509']
2285        else:
2286            expcert = None
2287
2288        # Collect the allocation/segment ids into a dict keyed by the fedid
2289        # of the allocation (or a monotonically increasing integer) that
2290        # contains a tuple of uri, aid (which is a dict...)
2291        for i, fed in enumerate(fed_exp.get('federant', [])):
2292            try:
2293                uri = fed['uri']
2294                aid = fed['allocID']
2295                k = fed['allocID'].get('fedid', i)
2296            except KeyError, e:
2297                continue
2298            term_params[k] = (uri, aid)
2299        # Change the experiment state
2300        fed_exp['experimentStatus'] = 'terminating'
2301        if self.state_filename: self.write_state()
2302        self.state_lock.release()
2303
2304        return ids, term_params, expcert, repo
2305
2306
2307    def deallocate_resources(self, term_params, expcert, status, force, 
2308            dealloc_log):
2309        tmpdir = None
2310        # This try block makes sure the tempdir is cleared
2311        try:
2312            # If no expcert, try the deallocation as the experiment
2313            # controller instance.
2314            if expcert and self.auth_type != 'legacy': 
2315                try:
2316                    tmpdir = tempfile.mkdtemp(prefix="term-")
2317                except EnvironmentError:
2318                    raise service_error(service_error.internal, 
2319                            "Cannot create tmp dir")
2320                cert_file = self.make_temp_certfile(expcert, tmpdir)
2321                pw = None
2322            else: 
2323                cert_file = self.cert_file
2324                pw = self.cert_pwd
2325
2326            # Stop everyone.  NB, wait_for_all waits until a thread starts
2327            # and then completes, so we can't wait if nothing starts.  So,
2328            # no tbparams, no start.
2329            if len(term_params) > 0:
2330                tp = thread_pool(self.nthreads)
2331                for k, (uri, aid) in term_params.items():
2332                    # Create and start a thread to stop the segment
2333                    tp.wait_for_slot()
2334                    t  = pooled_thread(\
2335                            target=self.terminate_segment(log=dealloc_log,
2336                                testbed=uri,
2337                                cert_file=cert_file, 
2338                                cert_pwd=pw,
2339                                trusted_certs=self.trusted_certs,
2340                                caller=self.call_TerminateSegment),
2341                            args=(uri, aid), name=k,
2342                            pdata=tp, trace_file=self.trace_file)
2343                    t.start()
2344                # Wait for completions
2345                tp.wait_for_all_done()
2346
2347            # release the allocations (failed experiments have done this
2348            # already, and starting experiments may be in odd states, so we
2349            # ignore errors releasing those allocations
2350            try: 
2351                for k, (uri, aid)  in term_params.items():
2352                    self.release_access(None, aid, uri=uri,
2353                            cert_file=cert_file, cert_pwd=pw)
2354            except service_error, e:
2355                if status != 'failed' and not force:
2356                    raise e
2357
2358        # Clean up the tmpdir no matter what
2359        finally:
2360            if tmpdir: self.remove_dirs(tmpdir)
2361
2362    def terminate_experiment(self, req, fid):
2363        """
2364        Swap this experiment out on the federants and delete the shared
2365        information
2366        """
2367        tbparams = { }
2368        req = req.get('TerminateRequestBody', None)
2369        if not req:
2370            raise service_error(service_error.req,
2371                    "Bad request format (no TerminateRequestBody)")
2372
2373        key = self.get_experiment_key(req, 'experiment')
2374        proof = self.check_experiment_access(fid, key)
2375        exp = req.get('experiment', False)
2376        force = req.get('force', False)
2377
2378        dealloc_list = [ ]
2379
2380
2381        # Create a logger that logs to the dealloc_list as well as to the main
2382        # log file.
2383        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2384        h = logging.StreamHandler(self.list_log(dealloc_list))
2385        # XXX: there should be a global one of these rather than repeating the
2386        # code.
2387        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2388                    '%d %b %y %H:%M:%S'))
2389        dealloc_log.addHandler(h)
2390
2391        self.state_lock.acquire()
2392        fed_exp = self.state.get(key, None)
2393        self.state_lock.release()
2394        repo = None
2395
2396        if fed_exp:
2397            status = self.check_termination_status(fed_exp, force)
2398            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2399            self.deallocate_resources(term_params, expcert, status, force, 
2400                    dealloc_log)
2401
2402            # Remove the terminated experiment
2403            self.state_lock.acquire()
2404            for id in ids:
2405                self.clear_experiment_authorization(id, need_state_lock=False)
2406                if id in self.state: del self.state[id]
2407
2408            if self.state_filename: self.write_state()
2409            self.state_lock.release()
2410
2411            # Delete any synch points associated with this experiment.  All
2412            # synch points begin with the fedid of the experiment.
2413            fedid_keys = set(["fedid:%s" % f for f in ids \
2414                    if isinstance(f, fedid)])
2415            for k in self.synch_store.all_keys():
2416                try:
2417                    if len(k) > 45 and k[0:46] in fedid_keys:
2418                        self.synch_store.del_value(k)
2419                except synch_store.BadDeletionError:
2420                    pass
2421            self.write_store()
2422
2423            # Remove software and other cached stuff from the filesystem.
2424            if repo:
2425                self.remove_dirs("%s/%s" % (self.repodir, repo))
2426       
2427            return { 
2428                    'experiment': exp , 
2429                    'deallocationLog': string.join(dealloc_list, ''),
2430                    'proof': [proof.to_dict()],
2431                    }
2432        else:
2433            raise service_error(service_error.req, "No saved state")
2434
2435
2436    def GetValue(self, req, fid):
2437        """
2438        Get a value from the synchronized store
2439        """
2440        req = req.get('GetValueRequestBody', None)
2441        if not req:
2442            raise service_error(service_error.req,
2443                    "Bad request format (no GetValueRequestBody)")
2444       
2445        name = req.get('name', None)
2446        wait = req.get('wait', False)
2447        rv = { 'name': name }
2448
2449        if not name:
2450            raise service_error(service_error.req, "No name?")
2451
2452        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2453
2454        if access_ok:
2455            self.log.debug("[GetValue] asking for %s " % name)
2456            try:
2457                v = self.synch_store.get_value(name, wait)
2458            except synch_store.RevokedKeyError:
2459                # No more synch on this key
2460                raise service_error(service_error.federant, 
2461                        "Synch key %s revoked" % name)
2462            if v is not None:
2463                rv['value'] = v
2464            rv['proof'] = proof.to_dict()
2465            self.log.debug("[GetValue] got %s from %s" % (v, name))
2466            return rv
2467        else:
2468            raise service_error(service_error.access, "Access Denied",
2469                    proof=proof)
2470       
2471
2472    def SetValue(self, req, fid):
2473        """
2474        Set a value in the synchronized store
2475        """
2476        req = req.get('SetValueRequestBody', None)
2477        if not req:
2478            raise service_error(service_error.req,
2479                    "Bad request format (no SetValueRequestBody)")
2480       
2481        name = req.get('name', None)
2482        v = req.get('value', '')
2483
2484        if not name:
2485            raise service_error(service_error.req, "No name?")
2486
2487        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2488
2489        if access_ok:
2490            try:
2491                self.synch_store.set_value(name, v)
2492                self.write_store()
2493                self.log.debug("[SetValue] set %s to %s" % (name, v))
2494            except synch_store.CollisionError:
2495                # Translate into a service_error
2496                raise service_error(service_error.req,
2497                        "Value already set: %s" %name)
2498            except synch_store.RevokedKeyError:
2499                # No more synch on this key
2500                raise service_error(service_error.federant, 
2501                        "Synch key %s revoked" % name)
2502                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2503        else:
2504            raise service_error(service_error.access, "Access Denied",
2505                    proof=proof)
Note: See TracBrowser for help on using the repository browser.