source: fedd/federation/experiment_control.py @ 74572ba

compt_changesinfo-ops
Last change on this file since 74572ba was 74572ba, checked in by Ted Faber <faber@…>, 13 years ago

Fix collision where user is creating an experiment, an experiment with
that localname exists, and the user cannot overwrite it. (Error was an
uncaught access denied exception.) Not tested yet.

  • Property mode set to 100644
File size: 84.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        dt = config.get("experiment_control", "direct_transit")
154        self.auth_type = config.get('experiment_control', 'auth_type') \
155                or 'legacy'
156        self.auth_dir = config.get('experiment_control', 'auth_dir')
157        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
158        else: self.direct_transit = [ ]
159        # NB for internal master/slave ops, not experiment setup
160        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
161
162        self.overrides = set([])
163        ovr = config.get('experiment_control', 'overrides')
164        if ovr:
165            for o in ovr.split(","):
166                o = o.strip()
167                if o.startswith('fedid:'): o = o[len('fedid:'):]
168                self.overrides.add(fedid(hexstr=o))
169
170        self.state = { }
171        self.state_lock = Lock()
172        self.tclsh = "/usr/local/bin/otclsh"
173        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
174                config.get("experiment_control", "tcl_splitter",
175                        "/usr/testbed/lib/ns2ir/parse.tcl")
176        mapdb_file = config.get("experiment_control", "mapdb")
177        self.trace_file = sys.stderr
178
179        self.def_expstart = \
180                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
181                "/tmp/federate";
182        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
183                "FEDDIR/hosts";
184        self.def_gwstart = \
185                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
186                "/tmp/bridge.log";
187        self.def_mgwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
189                "/tmp/bridge.log";
190        self.def_gwimage = "FBSD61-TUNNEL2";
191        self.def_gwtype = "pc";
192        self.local_access = { }
193
194        if self.auth_type == 'legacy':
195            if auth:
196                self.auth = auth
197            else:
198                self.log.error( "[access]: No authorizer initialized, " +\
199                        "creating local one.")
200                auth = authorizer()
201            self.get_access = self.legacy_get_access
202        elif self.auth_type == 'abac':
203            self.auth = abac_authorizer(load=self.auth_dir)
204        else:
205            raise service_error(service_error.internal, 
206                    "Unknown auth_type: %s" % self.auth_type)
207
208        if mapdb_file:
209            self.read_mapdb(mapdb_file)
210        else:
211            self.log.warn("[experiment_control] No testbed map, using defaults")
212            self.tbmap = { 
213                    'deter':'https://users.isi.deterlab.net:23235',
214                    'emulab':'https://users.isi.deterlab.net:23236',
215                    'ucb':'https://users.isi.deterlab.net:23237',
216                    }
217
218        if accessdb_file:
219                self.read_accessdb(accessdb_file)
220        else:
221            raise service_error(service_error.internal,
222                    "No accessdb specified in config")
223
224        # Grab saved state.  OK to do this w/o locking because it's read only
225        # and only one thread should be in existence that can see self.state at
226        # this point.
227        if self.state_filename:
228            self.read_state()
229
230        if self.store_filename:
231            self.read_store()
232        else:
233            self.log.warning("No saved synch store")
234            self.synch_store = synch_store
235
236        # Dispatch tables
237        self.soap_services = {\
238                'New': soap_handler('New', self.new_experiment),
239                'Create': soap_handler('Create', self.create_experiment),
240                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
241                'Vis': soap_handler('Vis', self.get_vis),
242                'Info': soap_handler('Info', self.get_info),
243                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
244                'Terminate': soap_handler('Terminate', 
245                    self.terminate_experiment),
246                'GetValue': soap_handler('GetValue', self.GetValue),
247                'SetValue': soap_handler('SetValue', self.SetValue),
248        }
249
250        self.xmlrpc_services = {\
251                'New': xmlrpc_handler('New', self.new_experiment),
252                'Create': xmlrpc_handler('Create', self.create_experiment),
253                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
254                'Vis': xmlrpc_handler('Vis', self.get_vis),
255                'Info': xmlrpc_handler('Info', self.get_info),
256                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
257                'Terminate': xmlrpc_handler('Terminate',
258                    self.terminate_experiment),
259                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
260                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
261        }
262
263    # Call while holding self.state_lock
264    def write_state(self):
265        """
266        Write a new copy of experiment state after copying the existing state
267        to a backup.
268
269        State format is a simple pickling of the state dictionary.
270        """
271        if os.access(self.state_filename, os.W_OK):
272            copy_file(self.state_filename, \
273                    "%s.bak" % self.state_filename)
274        try:
275            f = open(self.state_filename, 'w')
276            pickle.dump(self.state, f)
277        except EnvironmentError, e:
278            self.log.error("Can't write file %s: %s" % \
279                    (self.state_filename, e))
280        except pickle.PicklingError, e:
281            self.log.error("Pickling problem: %s" % e)
282        except TypeError, e:
283            self.log.error("Pickling problem (TypeError): %s" % e)
284
285    @staticmethod
286    def get_alloc_ids(state):
287        """
288        Pull the fedids of the identifiers of each allocation from the
289        state.  Again, a dict dive that's best isolated.
290
291        Used by read_store and read state
292        """
293
294        return [ f['allocID']['fedid'] 
295                for f in state.get('federant',[]) \
296                    if f.has_key('allocID') and \
297                        f['allocID'].has_key('fedid')]
298
299    # Call while holding self.state_lock
300    def read_state(self):
301        """
302        Read a new copy of experiment state.  Old state is overwritten.
303
304        State format is a simple pickling of the state dictionary.
305        """
306       
307        def get_experiment_id(state):
308            """
309            Pull the fedid experimentID out of the saved state.  This is kind
310            of a gross walk through the dict.
311            """
312
313            if state.has_key('experimentID'):
314                for e in state['experimentID']:
315                    if e.has_key('fedid'):
316                        return e['fedid']
317                else:
318                    return None
319            else:
320                return None
321
322        try:
323            f = open(self.state_filename, "r")
324            self.state = pickle.load(f)
325            self.log.debug("[read_state]: Read state from %s" % \
326                    self.state_filename)
327        except EnvironmentError, e:
328            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
329                    % (self.state_filename, e))
330        except pickle.UnpicklingError, e:
331            self.log.warning(("[read_state]: No saved state: " + \
332                    "Unpickling failed: %s") % e)
333       
334        for s in self.state.values():
335            try:
336
337                eid = get_experiment_id(s)
338                if eid : 
339                    if self.auth_type == 'legacy':
340                        # XXX: legacy
341                        # Give the owner rights to the experiment
342                        self.auth.set_attribute(s['owner'], eid)
343                        # And holders of the eid as well
344                        self.auth.set_attribute(eid, eid)
345                        # allow overrides to control experiments as well
346                        for o in self.overrides:
347                            self.auth.set_attribute(o, eid)
348                        # Set permissions to allow reading of the software
349                        # repo, if any, as well.
350                        for a in self.get_alloc_ids(s):
351                            self.auth.set_attribute(a, 'repo/%s' % eid)
352                else:
353                    raise KeyError("No experiment id")
354            except KeyError, e:
355                self.log.warning("[read_state]: State ownership or identity " +\
356                        "misformatted in %s: %s" % (self.state_filename, e))
357
358
359    def read_accessdb(self, accessdb_file):
360        """
361        Read the mapping from fedids that can create experiments to their name
362        in the 3-level access namespace.  All will be asserted from this
363        testbed and can include the local username and porject that will be
364        asserted on their behalf by this fedd.  Each fedid is also added to the
365        authorization system with the "create" attribute.
366        """
367        self.accessdb = {}
368        # These are the regexps for parsing the db
369        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
370        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
371                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
372        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\s*(" + name_expr + ")\s*$")
374        lineno = 0
375
376        # Parse the mappings and store in self.authdb, a dict of
377        # fedid -> (proj, user)
378        try:
379            f = open(accessdb_file, "r")
380            for line in f:
381                lineno += 1
382                line = line.strip()
383                if len(line) == 0 or line.startswith('#'):
384                    continue
385                m = project_line.match(line)
386                if m:
387                    fid = fedid(hexstr=m.group(1))
388                    project, user = m.group(2,3)
389                    if not self.accessdb.has_key(fid):
390                        self.accessdb[fid] = []
391                    self.accessdb[fid].append((project, user))
392                    continue
393
394                m = user_line.match(line)
395                if m:
396                    fid = fedid(hexstr=m.group(1))
397                    project = None
398                    user = m.group(2)
399                    if not self.accessdb.has_key(fid):
400                        self.accessdb[fid] = []
401                    self.accessdb[fid].append((project, user))
402                    continue
403                self.log.warn("[experiment_control] Error parsing access " +\
404                        "db %s at line %d" %  (accessdb_file, lineno))
405        except EnvironmentError:
406            raise service_error(service_error.internal,
407                    ("Error opening/reading %s as experiment " +\
408                            "control accessdb") %  accessdb_file)
409        f.close()
410
411        # Initialize the authorization attributes
412        # XXX: legacy
413        if self.auth_type == 'legacy':
414            for fid in self.accessdb.keys():
415                self.auth.set_attribute(fid, 'create')
416                self.auth.set_attribute(fid, 'new')
417
418    def read_mapdb(self, file):
419        """
420        Read a simple colon separated list of mappings for the
421        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
422        """
423
424        self.tbmap = { }
425        lineno =0
426        try:
427            f = open(file, "r")
428            for line in f:
429                lineno += 1
430                line = line.strip()
431                if line.startswith('#') or len(line) == 0:
432                    continue
433                try:
434                    label, url = line.split(':', 1)
435                    self.tbmap[label] = url
436                except ValueError, e:
437                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
438                            "map db: %s %s" % (lineno, line, e))
439        except EnvironmentError, e:
440            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
441                    "open %s: %s" % (file, e))
442        else:
443            f.close()
444
445    def read_store(self):
446        try:
447            self.synch_store = synch_store()
448            self.synch_store.load(self.store_filename)
449            self.log.debug("[read_store]: Read store from %s" % \
450                    self.store_filename)
451        except EnvironmentError, e:
452            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
453                    % (self.state_filename, e))
454            self.synch_store = synch_store()
455
456        # Set the initial permissions on data in the store.  XXX: This ad hoc
457        # authorization attribute initialization is getting out of hand.
458        # XXX: legacy
459        if self.auth_type == 'legacy':
460            for k in self.synch_store.all_keys():
461                try:
462                    if k.startswith('fedid:'):
463                        fid = fedid(hexstr=k[6:46])
464                        if self.state.has_key(fid):
465                            for a in self.get_alloc_ids(self.state[fid]):
466                                self.auth.set_attribute(a, k)
467                except ValueError, e:
468                    self.log.warn("Cannot deduce permissions for %s" % k)
469
470
471    def write_store(self):
472        """
473        Write a new copy of synch_store after writing current state
474        to a backup.  We use the internal synch_store pickle method to avoid
475        incinsistent data.
476
477        State format is a simple pickling of the store.
478        """
479        if os.access(self.store_filename, os.W_OK):
480            copy_file(self.store_filename, \
481                    "%s.bak" % self.store_filename)
482        try:
483            self.synch_store.save(self.store_filename)
484        except EnvironmentError, e:
485            self.log.error("Can't write file %s: %s" % \
486                    (self.store_filename, e))
487        except TypeError, e:
488            self.log.error("Pickling problem (TypeError): %s" % e)
489
490
491    def remove_dirs(self, dir):
492        """
493        Remove the directory tree and all files rooted at dir.  Log any errors,
494        but continue.
495        """
496        self.log.debug("[removedirs]: removing %s" % dir)
497        try:
498            for path, dirs, files in os.walk(dir, topdown=False):
499                for f in files:
500                    os.remove(os.path.join(path, f))
501                for d in dirs:
502                    os.rmdir(os.path.join(path, d))
503            os.rmdir(dir)
504        except EnvironmentError, e:
505            self.log.error("Error deleting directory tree in %s" % e);
506
507    @staticmethod
508    def make_temp_certfile(expcert, tmpdir):
509        """
510        make a protected copy of the access certificate so the experiment
511        controller can act as the experiment principal.  mkstemp is the most
512        secure way to do that. The directory should be created by
513        mkdtemp.  Return the filename.
514        """
515        if expcert and tmpdir:
516            try:
517                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
518                f = os.fdopen(certf, 'w')
519                print >> f, expcert
520                f.close()
521            except EnvironmentError, e:
522                raise service_error(service_error.internal, 
523                        "Cannot create temp cert file?")
524            return certfn
525        else:
526            return None
527
528       
529    def generate_ssh_keys(self, dest, type="rsa" ):
530        """
531        Generate a set of keys for the gateways to use to talk.
532
533        Keys are of type type and are stored in the required dest file.
534        """
535        valid_types = ("rsa", "dsa")
536        t = type.lower();
537        if t not in valid_types: raise ValueError
538        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
539
540        try:
541            trace = open("/dev/null", "w")
542        except EnvironmentError:
543            raise service_error(service_error.internal,
544                    "Cannot open /dev/null??");
545
546        # May raise CalledProcessError
547        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
548        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
549        if rv != 0:
550            raise service_error(service_error.internal, 
551                    "Cannot generate nonce ssh keys.  %s return code %d" \
552                            % (self.ssh_keygen, rv))
553
554    def gentopo(self, str):
555        """
556        Generate the topology data structure from the splitter's XML
557        representation of it.
558
559        The topology XML looks like:
560            <experiment>
561                <nodes>
562                    <node><vname></vname><ips>ip1:ip2</ips></node>
563                </nodes>
564                <lans>
565                    <lan>
566                        <vname></vname><vnode></vnode><ip></ip>
567                        <bandwidth></bandwidth><member>node:port</member>
568                    </lan>
569                </lans>
570        """
571        class topo_parse:
572            """
573            Parse the topology XML and create the dats structure.
574            """
575            def __init__(self):
576                # Typing of the subelements for data conversion
577                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
578                self.int_subelements = ( 'bandwidth',)
579                self.float_subelements = ( 'delay',)
580                # The final data structure
581                self.nodes = [ ]
582                self.lans =  [ ]
583                self.topo = { \
584                        'node': self.nodes,\
585                        'lan' : self.lans,\
586                    }
587                self.element = { }  # Current element being created
588                self.chars = ""     # Last text seen
589
590            def end_element(self, name):
591                # After each sub element the contents is added to the current
592                # element or to the appropriate list.
593                if name == 'node':
594                    self.nodes.append(self.element)
595                    self.element = { }
596                elif name == 'lan':
597                    self.lans.append(self.element)
598                    self.element = { }
599                elif name in self.str_subelements:
600                    self.element[name] = self.chars
601                    self.chars = ""
602                elif name in self.int_subelements:
603                    self.element[name] = int(self.chars)
604                    self.chars = ""
605                elif name in self.float_subelements:
606                    self.element[name] = float(self.chars)
607                    self.chars = ""
608
609            def found_chars(self, data):
610                self.chars += data.rstrip()
611
612
613        tp = topo_parse();
614        parser = xml.parsers.expat.ParserCreate()
615        parser.EndElementHandler = tp.end_element
616        parser.CharacterDataHandler = tp.found_chars
617
618        parser.Parse(str)
619
620        return tp.topo
621       
622
623    def genviz(self, topo):
624        """
625        Generate the visualization the virtual topology
626        """
627
628        neato = "/usr/local/bin/neato"
629        # These are used to parse neato output and to create the visualization
630        # file.
631        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
632        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
633                "%s</type></node>"
634
635        try:
636            # Node names
637            nodes = [ n['vname'] for n in topo['node'] ]
638            topo_lans = topo['lan']
639        except KeyError, e:
640            raise service_error(service_error.internal, "Bad topology: %s" %e)
641
642        lans = { }
643        links = { }
644
645        # Walk through the virtual topology, organizing the connections into
646        # 2-node connections (links) and more-than-2-node connections (lans).
647        # When a lan is created, it's added to the list of nodes (there's a
648        # node in the visualization for the lan).
649        for l in topo_lans:
650            if links.has_key(l['vname']):
651                if len(links[l['vname']]) < 2:
652                    links[l['vname']].append(l['vnode'])
653                else:
654                    nodes.append(l['vname'])
655                    lans[l['vname']] = links[l['vname']]
656                    del links[l['vname']]
657                    lans[l['vname']].append(l['vnode'])
658            elif lans.has_key(l['vname']):
659                lans[l['vname']].append(l['vnode'])
660            else:
661                links[l['vname']] = [ l['vnode'] ]
662
663
664        # Open up a temporary file for dot to turn into a visualization
665        try:
666            df, dotname = tempfile.mkstemp()
667            dotfile = os.fdopen(df, 'w')
668        except EnvironmentError:
669            raise service_error(service_error.internal,
670                    "Failed to open file in genviz")
671
672        try:
673            dnull = open('/dev/null', 'w')
674        except EnvironmentError:
675            service_error(service_error.internal,
676                    "Failed to open /dev/null in genviz")
677
678        # Generate a dot/neato input file from the links, nodes and lans
679        try:
680            print >>dotfile, "graph G {"
681            for n in nodes:
682                print >>dotfile, '\t"%s"' % n
683            for l in links.keys():
684                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
685            for l in lans.keys():
686                for n in lans[l]:
687                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
688            print >>dotfile, "}"
689            dotfile.close()
690        except TypeError:
691            raise service_error(service_error.internal,
692                    "Single endpoint link in vtopo")
693        except EnvironmentError:
694            raise service_error(service_error.internal, "Cannot write dot file")
695
696        # Use dot to create a visualization
697        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
698                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
699                close_fds=True)
700        dnull.close()
701
702        # Translate dot to vis format
703        vis_nodes = [ ]
704        vis = { 'node': vis_nodes }
705        for line in dot.stdout:
706            m = vis_re.match(line)
707            if m:
708                vn = m.group(1)
709                vis_node = {'name': vn, \
710                        'x': float(m.group(2)),\
711                        'y' : float(m.group(3)),\
712                    }
713                if vn in links.keys() or vn in lans.keys():
714                    vis_node['type'] = 'lan'
715                else:
716                    vis_node['type'] = 'node'
717                vis_nodes.append(vis_node)
718        rv = dot.wait()
719
720        os.remove(dotname)
721        if rv == 0 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
740                    fedid(file=cert_file))
741            resp = { 'ReleaseAccessResponseBody': resp } 
742        else:
743            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
744                    cert_file, cert_pwd, self.trusted_certs)
745
746        # better error coding
747
748    def remote_ns2topdl(self, uri, desc):
749
750        req = {
751                'description' : { 'ns2description': desc },
752            }
753
754        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
755                self.trusted_certs)
756
757        if r.has_key('Ns2TopdlResponseBody'):
758            r = r['Ns2TopdlResponseBody']
759            ed = r.get('experimentdescription', None)
760            if ed.has_key('topdldescription'):
761                return topdl.Topology(**ed['topdldescription'])
762            else:
763                raise service_error(service_error.protocol, 
764                        "Bad splitter response (no output)")
765        else:
766            raise service_error(service_error.protocol, "Bad splitter response")
767
768    class start_segment:
769        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
770                cert_pwd=None, trusted_certs=None, caller=None,
771                log_collector=None):
772            self.log = log
773            self.debug = debug
774            self.cert_file = cert_file
775            self.cert_pwd = cert_pwd
776            self.trusted_certs = None
777            self.caller = caller
778            self.testbed = testbed
779            self.log_collector = log_collector
780            self.response = None
781            self.node = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            for e in resp.get('embedding', []):
786                if 'toponame' in e and 'physname' in e:
787                    self.node[e['toponame']] = e['physname'][0]
788
789        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
790            req = {
791                    'allocID': { 'fedid' : aid }, 
792                    'segmentdescription': { 
793                        'topdldescription': topo.to_dict(),
794                    },
795                }
796
797            if connInfo:
798                req['connection'] = connInfo
799
800            import_svcs = [ s for m in masters.values() \
801                    for s in m if self.testbed in s.importers]
802
803            if import_svcs or self.testbed in masters:
804                req['service'] = []
805
806            for s in import_svcs:
807                for r in s.reqs:
808                    sr = copy.deepcopy(r)
809                    sr['visibility'] = 'import';
810                    req['service'].append(sr)
811
812            for s in masters.get(self.testbed, []):
813                for r in s.reqs:
814                    sr = copy.deepcopy(r)
815                    sr['visibility'] = 'export';
816                    req['service'].append(sr)
817
818            if attrs:
819                req['fedAttr'] = attrs
820
821            try:
822                self.log.debug("Calling StartSegment at %s " % uri)
823                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
824                        self.trusted_certs)
825                if r.has_key('StartSegmentResponseBody'):
826                    lval = r['StartSegmentResponseBody'].get('allocationLog',
827                            None)
828                    if lval and self.log_collector:
829                        for line in  lval.splitlines(True):
830                            self.log_collector.write(line)
831                    self.make_map(r['StartSegmentResponseBody'])
832                    if 'proof' in r: self.proof = r['proof']
833                    self.response = r
834                else:
835                    raise service_error(service_error.internal, 
836                            "Bad response!?: %s" %r)
837                return True
838            except service_error, e:
839                self.log.error("Start segment failed on %s: %s" % \
840                        (self.testbed, e))
841                return False
842
843
844
845    class terminate_segment:
846        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
847                cert_pwd=None, trusted_certs=None, caller=None):
848            self.log = log
849            self.debug = debug
850            self.cert_file = cert_file
851            self.cert_pwd = cert_pwd
852            self.trusted_certs = None
853            self.caller = caller
854            self.testbed = testbed
855
856        def __call__(self, uri, aid ):
857            req = {
858                    'allocID': aid , 
859                }
860            try:
861                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
862                        self.trusted_certs)
863                return True
864            except service_error, e:
865                self.log.error("Terminate segment failed on %s: %s" % \
866                        (self.testbed, e))
867                return False
868
869    def allocate_resources(self, allocated, masters, eid, expid, 
870            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
871            attrs=None, connInfo={}, tbmap=None, expcert=None):
872
873        started = { }           # Testbeds where a sub-experiment started
874                                # successfully
875
876        # XXX
877        fail_soft = False
878
879        if tbmap is None: tbmap = { }
880
881        log = alloc_log or self.log
882
883        tp = thread_pool(self.nthreads)
884        threads = [ ]
885        starters = [ ]
886
887        if expcert:
888            cert = expcert
889            pw = None
890        else:
891            cert = self.cert_file
892            pw = self.cert_pwd
893
894        for tb in allocated.keys():
895            # Create and start a thread to start the segment, and save it
896            # to get the return value later
897            tb_attrs = copy.copy(attrs)
898            tp.wait_for_slot()
899            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
900            base, suffix = split_testbed(tb)
901            if suffix:
902                tb_attrs.append({'attribute': 'experiment_name', 
903                    'value': "%s-%s" % (eid, suffix)})
904            else:
905                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
906            if not uri:
907                raise service_error(service_error.internal, 
908                        "Unknown testbed %s !?" % tb)
909
910            if tbparams[tb].has_key('allocID') and \
911                    tbparams[tb]['allocID'].has_key('fedid'):
912                aid = tbparams[tb]['allocID']['fedid']
913            else:
914                raise service_error(service_error.internal, 
915                        "No alloc id for testbed %s !?" % tb)
916
917            s = self.start_segment(log=log, debug=self.debug,
918                    testbed=tb, cert_file=cert,
919                    cert_pwd=pw, trusted_certs=self.trusted_certs,
920                    caller=self.call_StartSegment,
921                    log_collector=log_collector)
922            starters.append(s)
923            t  = pooled_thread(\
924                    target=s, name=tb,
925                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
926                    pdata=tp, trace_file=self.trace_file)
927            threads.append(t)
928            t.start()
929
930        # Wait until all finish (keep pinging the log, though)
931        mins = 0
932        revoked = False
933        while not tp.wait_for_all_done(60.0):
934            mins += 1
935            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
936                    % mins)
937            if not revoked and \
938                    len([ t.getName() for t in threads if t.rv == False]) > 0:
939                # a testbed has failed.  Revoke this experiment's
940                # synchronizarion values so that sub experiments will not
941                # deadlock waiting for synchronization that will never happen
942                self.log.info("A subexperiment has failed to swap in, " + \
943                        "revoking synch keys")
944                var_key = "fedid:%s" % expid
945                for k in self.synch_store.all_keys():
946                    if len(k) > 45 and k[0:46] == var_key:
947                        self.synch_store.revoke_key(k)
948                revoked = True
949
950        failed = [ t.getName() for t in threads if not t.rv ]
951        succeeded = [tb for tb in allocated.keys() if tb not in failed]
952
953        # If one failed clean up, unless fail_soft is set
954        if failed:
955            if not fail_soft:
956                tp.clear()
957                for tb in succeeded:
958                    # Create and start a thread to stop the segment
959                    tp.wait_for_slot()
960                    uri = tbparams[tb]['uri']
961                    t  = pooled_thread(\
962                            target=self.terminate_segment(log=log,
963                                testbed=tb,
964                                cert_file=cert, 
965                                cert_pwd=pw,
966                                trusted_certs=self.trusted_certs,
967                                caller=self.call_TerminateSegment),
968                            args=(uri, tbparams[tb]['federant']['allocID']),
969                            name=tb,
970                            pdata=tp, trace_file=self.trace_file)
971                    t.start()
972                # Wait until all finish (if any are being stopped)
973                if succeeded:
974                    tp.wait_for_all_done()
975
976                # release the allocations
977                for tb in tbparams.keys():
978                    try:
979                        self.release_access(tb, tbparams[tb]['allocID'], 
980                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
981                                cert_file=cert, cert_pwd=pw)
982                    except service_error, e:
983                        self.log.warn("Error releasing access: %s" % e.desc)
984                # Remove the placeholder
985                self.state_lock.acquire()
986                self.state[eid]['experimentStatus'] = 'failed'
987                if self.state_filename: self.write_state()
988                self.state_lock.release()
989                # Remove the repo dir
990                self.remove_dirs("%s/%s" %(self.repodir, expid))
991                # Walk up tmpdir, deleting as we go
992                if self.cleanup:
993                    self.remove_dirs(tmpdir)
994                else:
995                    log.debug("[start_experiment]: not removing %s" % tmpdir)
996
997
998                log.error("Swap in failed on %s" % ",".join(failed))
999                return
1000        else:
1001            # Walk through the successes and gather the virtual to physical
1002            # mapping.
1003            embedding = [ ]
1004            proofs = { }
1005            for s in starters:
1006                for k, v in s.node.items():
1007                    embedding.append({
1008                        'toponame': k, 
1009                        'physname': [ v],
1010                        'testbed': s.testbed
1011                        })
1012                if s.proof:
1013                    proofs[s.testbed] = s.proof
1014            log.info("[start_segment]: Experiment %s active" % eid)
1015
1016
1017        # Walk up tmpdir, deleting as we go
1018        if self.cleanup:
1019            self.remove_dirs(tmpdir)
1020        else:
1021            log.debug("[start_experiment]: not removing %s" % tmpdir)
1022
1023        # Insert the experiment into our state and update the disk copy.
1024        self.state_lock.acquire()
1025        self.state[expid]['experimentStatus'] = 'active'
1026        self.state[eid] = self.state[expid]
1027        self.state[eid]['experimentdescription']['topdldescription'] = \
1028                top.to_dict()
1029        self.state[eid]['embedding'] = embedding
1030        # Append startup proofs
1031        for f in self.state[eid]['federant']:
1032            if 'name' in f and 'localname' in f['name']:
1033                if f['name']['localname'] in proofs:
1034                    f['proof'].append(proofs[f['name']['localname']])
1035
1036        if self.state_filename: self.write_state()
1037        self.state_lock.release()
1038        return
1039
1040
1041    def add_kit(self, e, kit):
1042        """
1043        Add a Software object created from the list of (install, location)
1044        tuples passed as kit  to the software attribute of an object e.  We
1045        do this enough to break out the code, but it's kind of a hack to
1046        avoid changing the old tuple rep.
1047        """
1048
1049        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1050
1051        if isinstance(e.software, list): e.software.extend(s)
1052        else: e.software = s
1053
1054    def append_experiment_authorization(self, expid, attrs, 
1055            need_state_lock=True):
1056        """
1057        Append the authorization information to system state
1058        """
1059
1060        for p, a in attrs:
1061            self.auth.set_attribute(p, a)
1062        self.auth.save()
1063
1064        if need_state_lock: self.state_lock.acquire()
1065        self.state[expid]['auth'].update(attrs)
1066        if self.state_filename: self.write_state()
1067        if need_state_lock: self.state_lock.release()
1068
1069    def clear_experiment_authorization(self, expid, need_state_lock=True):
1070        """
1071        Attrs is a set of attribute principal pairs that need to be removed
1072        from the authenticator.  Remove them and save the authenticator.
1073        """
1074
1075        if need_state_lock: self.state_lock.acquire()
1076        if expid in self.state and 'auth' in self.state[expid]:
1077            for p, a in self.state[expid]['auth']:
1078                self.auth.unset_attribute(p, a)
1079            self.state[expid]['auth'] = set()
1080        if self.state_filename: self.write_state()
1081        if need_state_lock: self.state_lock.release()
1082        self.auth.save()
1083
1084
1085    def create_experiment_state(self, fid, req, expid, expcert,
1086            state='starting'):
1087        """
1088        Create the initial entry in the experiment's state.  The expid and
1089        expcert are the experiment's fedid and certifacte that represents that
1090        ID, which are installed in the experiment state.  If the request
1091        includes a suggested local name that is used if possible.  If the local
1092        name is already taken by an experiment owned by this user that has
1093        failed, it is overwritten.  Otherwise new letters are added until a
1094        valid localname is found.  The generated local name is returned.
1095        """
1096
1097        if req.has_key('experimentID') and \
1098                req['experimentID'].has_key('localname'):
1099            overwrite = False
1100            eid = req['experimentID']['localname']
1101            # If there's an old failed experiment here with the same local name
1102            # and accessible by this user, we'll overwrite it, otherwise we'll
1103            # fall through and do the collision avoidance.
1104            old_expid = self.get_experiment_fedid(eid)
1105            if old_expid:
1106                users_experiment = True
1107                try:
1108                    self.check_experiment_access(fid, old_expid)
1109                except service_error, e:
1110                    if e.code == service_error.access: users_experiment = False
1111                    else: raise e
1112                if users_experiment:
1113                    self.state_lock.acquire()
1114                    status = self.state[eid].get('experimentStatus', None)
1115                    if status and status == 'failed':
1116                        # remove the old access attributes
1117                        self.clear_experiment_authorization(eid,
1118                                need_state_lock=False)
1119                        overwrite = True
1120                        del self.state[eid]
1121                        del self.state[old_expid]
1122                    self.state_lock.release()
1123            self.state_lock.acquire()
1124            while (self.state.has_key(eid) and not overwrite):
1125                eid += random.choice(string.ascii_letters)
1126            # Initial state
1127            self.state[eid] = {
1128                    'experimentID' : \
1129                            [ { 'localname' : eid }, {'fedid': expid } ],
1130                    'experimentStatus': state,
1131                    'experimentAccess': { 'X509' : expcert },
1132                    'owner': fid,
1133                    'log' : [],
1134                    'auth': set(),
1135                }
1136            self.state[expid] = self.state[eid]
1137            if self.state_filename: self.write_state()
1138            self.state_lock.release()
1139        else:
1140            eid = self.exp_stem
1141            for i in range(0,5):
1142                eid += random.choice(string.ascii_letters)
1143            self.state_lock.acquire()
1144            while (self.state.has_key(eid)):
1145                eid = self.exp_stem
1146                for i in range(0,5):
1147                    eid += random.choice(string.ascii_letters)
1148            # Initial state
1149            self.state[eid] = {
1150                    'experimentID' : \
1151                            [ { 'localname' : eid }, {'fedid': expid } ],
1152                    'experimentStatus': state,
1153                    'experimentAccess': { 'X509' : expcert },
1154                    'owner': fid,
1155                    'log' : [],
1156                    'auth': set(),
1157                }
1158            self.state[expid] = self.state[eid]
1159            if self.state_filename: self.write_state()
1160            self.state_lock.release()
1161
1162        # Let users touch the state.  Authorize this fid and the expid itself
1163        # to touch the experiment, as well as allowing th eoverrides.
1164        self.append_experiment_authorization(eid, 
1165                set([(fid, expid), (expid,expid)] + \
1166                        [ (o, expid) for o in self.overrides]))
1167
1168        return eid
1169
1170
1171    def allocate_ips_to_topo(self, top):
1172        """
1173        Add an ip4_address attribute to all the hosts in the topology, based on
1174        the shared substrates on which they sit.  An /etc/hosts file is also
1175        created and returned as a list of hostfiles entries.  We also return
1176        the allocator, because we may need to allocate IPs to portals
1177        (specifically DRAGON portals).
1178        """
1179        subs = sorted(top.substrates, 
1180                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1181                reverse=True)
1182        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1183        ifs = { }
1184        hosts = [ ]
1185
1186        for idx, s in enumerate(subs):
1187            net_size = len(s.interfaces)+2
1188
1189            a = ips.allocate(net_size)
1190            if a :
1191                base, num = a
1192                if num < net_size: 
1193                    raise service_error(service_error.internal,
1194                            "Allocator returned wrong number of IPs??")
1195            else:
1196                raise service_error(service_error.req, 
1197                        "Cannot allocate IP addresses")
1198            mask = ips.min_alloc
1199            while mask < net_size:
1200                mask *= 2
1201
1202            netmask = ((2**32-1) ^ (mask-1))
1203
1204            base += 1
1205            for i in s.interfaces:
1206                i.attribute.append(
1207                        topdl.Attribute('ip4_address', 
1208                            "%s" % ip_addr(base)))
1209                i.attribute.append(
1210                        topdl.Attribute('ip4_netmask', 
1211                            "%s" % ip_addr(int(netmask))))
1212
1213                hname = i.element.name
1214                if ifs.has_key(hname):
1215                    hosts.append("%s\t%s-%s %s-%d" % \
1216                            (ip_addr(base), hname, s.name, hname,
1217                                ifs[hname]))
1218                else:
1219                    ifs[hname] = 0
1220                    hosts.append("%s\t%s-%s %s-%d %s" % \
1221                            (ip_addr(base), hname, s.name, hname,
1222                                ifs[hname], hname))
1223
1224                ifs[hname] += 1
1225                base += 1
1226        return hosts, ips
1227
1228    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1229            tbparam, masters, tbmap, expid=None, expcert=None):
1230        for tb in testbeds:
1231            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1232                    expcert)
1233            allocated[tb] = 1
1234
1235    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1236            expcert=None):
1237        """
1238        Get access to testbed through fedd and set the parameters for that tb
1239        """
1240        def get_export_project(svcs):
1241            """
1242            Look through for the list of federated_service for this testbed
1243            objects for a project_export service, and extract the project
1244            parameter.
1245            """
1246
1247            pe = [s for s in svcs if s.name=='project_export']
1248            if len(pe) == 1:
1249                return pe[0].params.get('project', None)
1250            elif len(pe) == 0:
1251                return None
1252            else:
1253                raise service_error(service_error.req,
1254                        "More than one project export is not supported")
1255
1256        def add_services(svcs, type, slist, keys):
1257            """
1258            Add the given services to slist.  type is import or export.  Also
1259            add a mapping entry from the assigned id to the original service
1260            record.
1261            """
1262            for i, s in enumerate(svcs):
1263                idx = '%s%d' % (type, i)
1264                keys[idx] = s
1265                sr = {'id': idx, 'name': s.name, 'visibility': type }
1266                if s.params:
1267                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1268                            for k, v in s.params.items()]
1269                slist.append(sr)
1270
1271        uri = tbmap.get(testbed_base(tb), None)
1272        if not uri:
1273            raise service_error(service_error.server_config, 
1274                    "Unknown testbed: %s" % tb)
1275
1276        export_svcs = masters.get(tb,[])
1277        import_svcs = [ s for m in masters.values() \
1278                for s in m \
1279                    if tb in s.importers ]
1280
1281        export_project = get_export_project(export_svcs)
1282        # Compose the credential list so that IDs come before attributes
1283        creds = set()
1284        keys = set()
1285        certs = self.auth.get_creds_for_principal(fid)
1286        if expid:
1287            certs.update(self.auth.get_creds_for_principal(expid))
1288        for c in certs:
1289            keys.add(c.issuer_cert())
1290            creds.add(c.attribute_cert())
1291        creds = list(keys) + list(creds)
1292
1293        if expcert: cert, pw = expcert, None
1294        else: cert, pw = self.cert_file, self.cert_pw
1295
1296        # Request credentials
1297        req = {
1298                'abac_credential': creds,
1299            }
1300        # Make the service request from the services we're importing and
1301        # exporting.  Keep track of the export request ids so we can
1302        # collect the resulting info from the access response.
1303        e_keys = { }
1304        if import_svcs or export_svcs:
1305            slist = []
1306            add_services(import_svcs, 'import', slist, e_keys)
1307            add_services(export_svcs, 'export', slist, e_keys)
1308            req['service'] = slist
1309
1310        if self.local_access.has_key(uri):
1311            # Local access call
1312            req = { 'RequestAccessRequestBody' : req }
1313            r = self.local_access[uri].RequestAccess(req, 
1314                    fedid(file=self.cert_file))
1315            r = { 'RequestAccessResponseBody' : r }
1316        else:
1317            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1318
1319        if r.has_key('RequestAccessResponseBody'):
1320            # Through to here we have a valid response, not a fault.
1321            # Access denied is a fault, so something better or worse than
1322            # access denied has happened.
1323            r = r['RequestAccessResponseBody']
1324            self.log.debug("[get_access] Access granted")
1325        else:
1326            raise service_error(service_error.protocol,
1327                        "Bad proxy response")
1328        if 'proof' not in r:
1329            raise service_error(service_error.protocol,
1330                        "Bad access response (no access proof)")
1331       
1332        tbparam[tb] = { 
1333                "allocID" : r['allocID'],
1334                "uri": uri,
1335                "proof": [ r['proof'] ],
1336                }
1337
1338        # Collect the responses corresponding to the services this testbed
1339        # exports.  These will be the service requests that we will include in
1340        # the start segment requests (with appropriate visibility values) to
1341        # import and export the segments.
1342        for s in r.get('service', []):
1343            id = s.get('id', None)
1344            if id and id in e_keys:
1345                e_keys[id].reqs.append(s)
1346
1347        # Add attributes to parameter space.  We don't allow attributes to
1348        # overlay any parameters already installed.
1349        for a in r.get('fedAttr', []):
1350            try:
1351                if a['attribute'] and \
1352                        isinstance(a['attribute'], basestring)\
1353                        and not tbparam[tb].has_key(a['attribute'].lower()):
1354                    tbparam[tb][a['attribute'].lower()] = a['value']
1355            except KeyError:
1356                self.log.error("Bad attribute in response: %s" % a)
1357
1358
1359    def split_topology(self, top, topo, testbeds):
1360        """
1361        Create the sub-topologies that are needed for experiment instantiation.
1362        """
1363        for tb in testbeds:
1364            topo[tb] = top.clone()
1365            # copy in for loop allows deletions from the original
1366            for e in [ e for e in topo[tb].elements]:
1367                etb = e.get_attribute('testbed')
1368                # NB: elements without a testbed attribute won't appear in any
1369                # sub topologies. 
1370                if not etb or etb != tb:
1371                    for i in e.interface:
1372                        for s in i.subs:
1373                            try:
1374                                s.interfaces.remove(i)
1375                            except ValueError:
1376                                raise service_error(service_error.internal,
1377                                        "Can't remove interface??")
1378                    topo[tb].elements.remove(e)
1379            topo[tb].make_indices()
1380
1381    def confirm_software(self, top):
1382        """
1383        Make sure that the software to be loaded in the topo is all available
1384        before we begin making access requests, etc.  This is a subset of
1385        wrangle_software.
1386        """
1387        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1388        pkgs.update([x.location for e in top.elements for x in e.software])
1389
1390        for pkg in pkgs:
1391            loc = pkg
1392
1393            scheme, host, path = urlparse(loc)[0:3]
1394            dest = os.path.basename(path)
1395            if not scheme:
1396                if not loc.startswith('/'):
1397                    loc = "/%s" % loc
1398                loc = "file://%s" %loc
1399            # NB: if scheme was found, loc == pkg
1400            try:
1401                u = urlopen(loc)
1402                u.close()
1403            except Exception, e:
1404                raise service_error(service_error.req, 
1405                        "Cannot open %s: %s" % (loc, e))
1406        return True
1407
1408    def wrangle_software(self, expid, top, topo, tbparams):
1409        """
1410        Copy software out to the repository directory, allocate permissions and
1411        rewrite the segment topologies to look for the software in local
1412        places.
1413        """
1414
1415        # Copy the rpms and tarfiles to a distribution directory from
1416        # which the federants can retrieve them
1417        linkpath = "%s/software" %  expid
1418        softdir ="%s/%s" % ( self.repodir, linkpath)
1419        softmap = { }
1420
1421        # self.fedkit and self.gateway kit are lists of tuples of
1422        # (install_location, download_location) this extracts the download
1423        # locations.
1424        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1425        pkgs.update([x.location for e in top.elements for x in e.software])
1426        try:
1427            os.makedirs(softdir)
1428        except EnvironmentError, e:
1429            raise service_error(
1430                    "Cannot create software directory: %s" % e)
1431        # The actual copying.  Everything's converted into a url for copying.
1432        auth_attrs = set()
1433        for pkg in pkgs:
1434            loc = pkg
1435
1436            scheme, host, path = urlparse(loc)[0:3]
1437            dest = os.path.basename(path)
1438            if not scheme:
1439                if not loc.startswith('/'):
1440                    loc = "/%s" % loc
1441                loc = "file://%s" %loc
1442            # NB: if scheme was found, loc == pkg
1443            try:
1444                u = urlopen(loc)
1445            except Exception, e:
1446                raise service_error(service_error.req, 
1447                        "Cannot open %s: %s" % (loc, e))
1448            try:
1449                f = open("%s/%s" % (softdir, dest) , "w")
1450                self.log.debug("Writing %s/%s" % (softdir,dest) )
1451                data = u.read(4096)
1452                while data:
1453                    f.write(data)
1454                    data = u.read(4096)
1455                f.close()
1456                u.close()
1457            except Exception, e:
1458                raise service_error(service_error.internal,
1459                        "Could not copy %s: %s" % (loc, e))
1460            path = re.sub("/tmp", "", linkpath)
1461            # XXX
1462            softmap[pkg] = \
1463                    "%s/%s/%s" %\
1464                    ( self.repo_url, path, dest)
1465
1466            # Allow the individual segments to access the software by assigning
1467            # an attribute to each testbed allocation that encodes the data to
1468            # be released.  This expression collects the data for each run of
1469            # the loop.
1470            auth_attrs.update([
1471                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1472                        for tb in tbparams.keys()])
1473
1474        self.append_experiment_authorization(expid, auth_attrs)
1475
1476        # Convert the software locations in the segments into the local
1477        # copies on this host
1478        for soft in [ s for tb in topo.values() \
1479                for e in tb.elements \
1480                    if getattr(e, 'software', False) \
1481                        for s in e.software ]:
1482            if softmap.has_key(soft.location):
1483                soft.location = softmap[soft.location]
1484
1485
1486    def new_experiment(self, req, fid):
1487        """
1488        The external interface to empty initial experiment creation called from
1489        the dispatcher.
1490
1491        Creates a working directory, splits the incoming description using the
1492        splitter script and parses out the avrious subsections using the
1493        lcasses above.  Once each sub-experiment is created, use pooled threads
1494        to instantiate them and start it all up.
1495        """
1496        req = req.get('NewRequestBody', None)
1497        if not req:
1498            raise service_error(service_error.req,
1499                    "Bad request format (no NewRequestBody)")
1500
1501        if self.auth.import_credentials(data_list=req.get('credential', [])):
1502            self.auth.save()
1503
1504        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1505                with_proof=True)
1506
1507        if not access_ok:
1508            raise service_error(service_error.access, "New access denied",
1509                    proof=[proof])
1510
1511        try:
1512            tmpdir = tempfile.mkdtemp(prefix="split-")
1513        except EnvironmentError:
1514            raise service_error(service_error.internal, "Cannot create tmp dir")
1515
1516        try:
1517            access_user = self.accessdb[fid]
1518        except KeyError:
1519            raise service_error(service_error.internal,
1520                    "Access map and authorizer out of sync in " + \
1521                            "new_experiment for fedid %s"  % fid)
1522
1523        # Generate an ID for the experiment (slice) and a certificate that the
1524        # allocator can use to prove they own it.  We'll ship it back through
1525        # the encrypted connection.  If the requester supplied one, use it.
1526        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1527            expcert = req['experimentAccess']['X509']
1528            expid = fedid(certstr=expcert)
1529            self.state_lock.acquire()
1530            if expid in self.state:
1531                self.state_lock.release()
1532                raise service_error(service_error.req, 
1533                        'fedid %s identifies an existing experiment' % expid)
1534            self.state_lock.release()
1535        else:
1536            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1537
1538        #now we're done with the tmpdir, and it should be empty
1539        if self.cleanup:
1540            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1541            os.rmdir(tmpdir)
1542        else:
1543            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1544
1545        eid = self.create_experiment_state(fid, req, expid, expcert, 
1546                state='empty')
1547
1548        rv = {
1549                'experimentID': [
1550                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1551                ],
1552                'experimentStatus': 'empty',
1553                'experimentAccess': { 'X509' : expcert },
1554                'proof': proof.to_dict(),
1555            }
1556
1557        return rv
1558
1559    # create_experiment sub-functions
1560
1561    @staticmethod
1562    def get_experiment_key(req, field='experimentID'):
1563        """
1564        Parse the experiment identifiers out of the request (the request body
1565        tag has been removed).  Specifically this pulls either the fedid or the
1566        localname out of the experimentID field.  A fedid is preferred.  If
1567        neither is present or the request does not contain the fields,
1568        service_errors are raised.
1569        """
1570        # Get the experiment access
1571        exp = req.get(field, None)
1572        if exp:
1573            if exp.has_key('fedid'):
1574                key = exp['fedid']
1575            elif exp.has_key('localname'):
1576                key = exp['localname']
1577            else:
1578                raise service_error(service_error.req, "Unknown lookup type")
1579        else:
1580            raise service_error(service_error.req, "No request?")
1581
1582        return key
1583
1584    def get_experiment_ids_and_start(self, key, tmpdir):
1585        """
1586        Get the experiment name, id and access certificate from the state, and
1587        set the experiment state to 'starting'.  returns a triple (fedid,
1588        localname, access_cert_file). The access_cert_file is a copy of the
1589        contents of the access certificate, created in the tempdir with
1590        restricted permissions.  If things are confused, raise an exception.
1591        """
1592
1593        expid = eid = None
1594        self.state_lock.acquire()
1595        if self.state.has_key(key):
1596            self.state[key]['experimentStatus'] = "starting"
1597            for e in self.state[key].get('experimentID',[]):
1598                if not expid and e.has_key('fedid'):
1599                    expid = e['fedid']
1600                elif not eid and e.has_key('localname'):
1601                    eid = e['localname']
1602            if 'experimentAccess' in self.state[key] and \
1603                    'X509' in self.state[key]['experimentAccess']:
1604                expcert = self.state[key]['experimentAccess']['X509']
1605            else:
1606                expcert = None
1607        self.state_lock.release()
1608
1609        # make a protected copy of the access certificate so the experiment
1610        # controller can act as the experiment principal.
1611        if expcert:
1612            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1613            if not expcert_file:
1614                raise service_error(service_error.internal, 
1615                        "Cannot create temp cert file?")
1616        else:
1617            expcert_file = None
1618
1619        return (eid, expid, expcert_file)
1620
1621    def get_topology(self, req, tmpdir):
1622        """
1623        Get the ns2 content and put it into a file for parsing.  Call the local
1624        or remote parser and return the topdl.Topology.  Errors result in
1625        exceptions.  req is the request and tmpdir is a work directory.
1626        """
1627
1628        # The tcl parser needs to read a file so put the content into that file
1629        descr=req.get('experimentdescription', None)
1630        if descr:
1631            if 'ns2description' in descr:
1632                file_content=descr['ns2description']
1633            elif 'topdldescription' in descr:
1634                return topdl.Topology(**descr['topdldescription'])
1635            else:
1636                raise service_error(service_error.req, 
1637                        'Unknown experiment description type')
1638        else:
1639            raise service_error(service_error.req, "No experiment description")
1640
1641
1642        if self.splitter_url:
1643            self.log.debug("Calling remote topdl translator at %s" % \
1644                    self.splitter_url)
1645            top = self.remote_ns2topdl(self.splitter_url, file_content)
1646        else:
1647            tclfile = os.path.join(tmpdir, "experiment.tcl")
1648            if file_content:
1649                try:
1650                    f = open(tclfile, 'w')
1651                    f.write(file_content)
1652                    f.close()
1653                except EnvironmentError:
1654                    raise service_error(service_error.internal,
1655                            "Cannot write temp experiment description")
1656            else:
1657                raise service_error(service_error.req, 
1658                        "Only ns2descriptions supported")
1659            pid = "dummy"
1660            gid = "dummy"
1661            eid = "dummy"
1662
1663            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1664                str(self.muxmax), '-m', 'dummy']
1665
1666            tclcmd.extend([pid, gid, eid, tclfile])
1667
1668            self.log.debug("running local splitter %s", " ".join(tclcmd))
1669            # This is just fantastic.  As a side effect the parser copies
1670            # tb_compat.tcl into the current directory, so that directory
1671            # must be writable by the fedd user.  Doing this in the
1672            # temporary subdir ensures this is the case.
1673            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1674                    cwd=tmpdir)
1675            split_data = tclparser.stdout
1676
1677            top = topdl.topology_from_xml(file=split_data, top="experiment")
1678            os.remove(tclfile)
1679
1680        return top
1681
1682    def get_testbed_services(self, req, testbeds):
1683        """
1684        Parse the services section of the request into into two dicts mapping
1685        testbed to lists of federated_service objects.  The first lists all
1686        exporters of services, and the second all exporters of services that
1687        need control portals int the experiment.
1688        """
1689        masters = { }
1690        pmasters = { }
1691        for s in req.get('service', []):
1692            # If this is a service request with the importall field
1693            # set, fill it out.
1694
1695            if s.get('importall', False):
1696                s['import'] = [ tb for tb in testbeds \
1697                        if tb not in s.get('export',[])]
1698                del s['importall']
1699
1700            # Add the service to masters
1701            for tb in s.get('export', []):
1702                if s.get('name', None):
1703
1704                    params = { }
1705                    for a in s.get('fedAttr', []):
1706                        params[a.get('attribute', '')] = a.get('value','')
1707
1708                    fser = federated_service(name=s['name'],
1709                            exporter=tb, importers=s.get('import',[]),
1710                            params=params)
1711                    if fser.name == 'hide_hosts' \
1712                            and 'hosts' not in fser.params:
1713                        fser.params['hosts'] = \
1714                                ",".join(tb_hosts.get(fser.exporter, []))
1715                    if tb in masters: masters[tb].append(fser)
1716                    else: masters[tb] = [fser]
1717
1718                    if fser.portal:
1719                        if tb not in pmasters: pmasters[tb] = [ fser ]
1720                        else: pmasters[tb].append(fser)
1721                else:
1722                    self.log.error('Testbed service does not have name " + \
1723                            "and importers')
1724        return masters, pmasters
1725
1726    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1727        """
1728        Create the ssh keys necessary for interconnecting the potral nodes and
1729        the global hosts file for letting each segment know about the IP
1730        addresses in play.  Save these into the repo.  Add attributes to the
1731        autorizer allowing access controllers to download them and return a set
1732        of attributes that inform the segments where to find this stuff.  Mau
1733        raise service_errors in if there are problems.
1734        """
1735        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1736        gw_secretkey_base = "fed.%s" % self.ssh_type
1737        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1738        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1739
1740        try:
1741            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1742        except ValueError:
1743            raise service_error(service_error.server_config, 
1744                    "Bad key type (%s)" % self.ssh_type)
1745
1746
1747        # Copy configuration files into the remote file store
1748        # The config urlpath
1749        configpath = "/%s/config" % expid
1750        # The config file system location
1751        configdir ="%s%s" % ( self.repodir, configpath)
1752        try:
1753            os.makedirs(configdir)
1754        except EnvironmentError, e:
1755            raise service_error(service_error.internal,
1756                    "Cannot create config directory: %s" % e)
1757        try:
1758            f = open("%s/hosts" % configdir, "w")
1759            print >> f, string.join(hosts, '\n')
1760            f.close()
1761        except EnvironmentError, e:
1762            raise service_error(service_error.internal, 
1763                    "Cannot write hosts file: %s" % e)
1764        try:
1765            copy_file("%s" % gw_pubkey, "%s/%s" % \
1766                    (configdir, gw_pubkey_base))
1767            copy_file("%s" % gw_secretkey, "%s/%s" % \
1768                    (configdir, gw_secretkey_base))
1769        except EnvironmentError, e:
1770            raise service_error(service_error.internal, 
1771                    "Cannot copy keyfiles: %s" % e)
1772
1773        # Allow the individual testbeds to access the configuration files,
1774        # again by setting an attribute for the relevant pathnames on each
1775        # allocation principal.  Yeah, that's a long list comprehension.
1776        self.append_experiment_authorization(expid, set([
1777            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1778                    for tb in tbparams.keys() \
1779                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1780
1781        attrs = [ 
1782                {
1783                    'attribute': 'ssh_pubkey', 
1784                    'value': '%s/%s/config/%s' % \
1785                            (self.repo_url, expid, gw_pubkey_base)
1786                },
1787                {
1788                    'attribute': 'ssh_secretkey', 
1789                    'value': '%s/%s/config/%s' % \
1790                            (self.repo_url, expid, gw_secretkey_base)
1791                },
1792                {
1793                    'attribute': 'hosts', 
1794                    'value': '%s/%s/config/hosts' % \
1795                            (self.repo_url, expid)
1796                },
1797            ]
1798        return attrs
1799
1800
1801    def get_vtopo(self, req, fid):
1802        """
1803        Return the stored virtual topology for this experiment
1804        """
1805        rv = None
1806        state = None
1807
1808        req = req.get('VtopoRequestBody', None)
1809        if not req:
1810            raise service_error(service_error.req,
1811                    "Bad request format (no VtopoRequestBody)")
1812        exp = req.get('experiment', None)
1813        if exp:
1814            if exp.has_key('fedid'):
1815                key = exp['fedid']
1816                keytype = "fedid"
1817            elif exp.has_key('localname'):
1818                key = exp['localname']
1819                keytype = "localname"
1820            else:
1821                raise service_error(service_error.req, "Unknown lookup type")
1822        else:
1823            raise service_error(service_error.req, "No request?")
1824
1825        proof = self.check_experiment_access(fid, key)
1826
1827        self.state_lock.acquire()
1828        if self.state.has_key(key):
1829            if self.state[key].has_key('vtopo'):
1830                rv = { 'experiment' : {keytype: key },
1831                        'vtopo': self.state[key]['vtopo'],
1832                        'proof': proof.to_dict(), 
1833                    }
1834            else:
1835                state = self.state[key]['experimentStatus']
1836        self.state_lock.release()
1837
1838        if rv: return rv
1839        else: 
1840            if state:
1841                raise service_error(service_error.partial, 
1842                        "Not ready: %s" % state)
1843            else:
1844                raise service_error(service_error.req, "No such experiment")
1845
1846    def get_vis(self, req, fid):
1847        """
1848        Return the stored visualization for this experiment
1849        """
1850        rv = None
1851        state = None
1852
1853        req = req.get('VisRequestBody', None)
1854        if not req:
1855            raise service_error(service_error.req,
1856                    "Bad request format (no VisRequestBody)")
1857        exp = req.get('experiment', None)
1858        if exp:
1859            if exp.has_key('fedid'):
1860                key = exp['fedid']
1861                keytype = "fedid"
1862            elif exp.has_key('localname'):
1863                key = exp['localname']
1864                keytype = "localname"
1865            else:
1866                raise service_error(service_error.req, "Unknown lookup type")
1867        else:
1868            raise service_error(service_error.req, "No request?")
1869
1870        proof = self.check_experiment_access(fid, key)
1871
1872        self.state_lock.acquire()
1873        if self.state.has_key(key):
1874            if self.state[key].has_key('vis'):
1875                rv =  { 'experiment' : {keytype: key },
1876                        'vis': self.state[key]['vis'],
1877                        'proof': proof.to_dict(), 
1878                        }
1879            else:
1880                state = self.state[key]['experimentStatus']
1881        self.state_lock.release()
1882
1883        if rv: return rv
1884        else:
1885            if state:
1886                raise service_error(service_error.partial, 
1887                        "Not ready: %s" % state)
1888            else:
1889                raise service_error(service_error.req, "No such experiment")
1890
1891   
1892    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1893            top):
1894        """
1895        Store the various data that have changed in the experiment state
1896        between when it was started and the beginning of resource allocation.
1897        This is basically the information about each local allocation.  This
1898        fills in the values of the placeholder allocation in the state.  It
1899        also collects the access proofs and returns them as dicts for a
1900        response message.
1901        """
1902        # save federant information
1903        for k in allocated.keys():
1904            tbparams[k]['federant'] = {
1905                    'name': [ { 'localname' : eid} ],
1906                    'allocID' : tbparams[k]['allocID'],
1907                    'uri': tbparams[k]['uri'],
1908                    'proof': tbparams[k]['proof'],
1909                }
1910
1911        self.state_lock.acquire()
1912        self.state[eid]['vtopo'] = vtopo
1913        self.state[eid]['vis'] = vis
1914        self.state[eid]['experimentdescription'] = \
1915                { 'topdldescription': top.to_dict() }
1916        self.state[eid]['federant'] = \
1917                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1918                    if tbparams[tb].has_key('federant') ]
1919        # Access proofs for the response message
1920        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1921                    for p in tbparams[k]['federant']['proof']]
1922        if self.state_filename: 
1923            self.write_state()
1924        self.state_lock.release()
1925        return proofs
1926
1927    def clear_placeholder(self, eid, expid, tmpdir):
1928        """
1929        Clear the placeholder and remove any allocated temporary dir.
1930        """
1931
1932        self.state_lock.acquire()
1933        del self.state[eid]
1934        del self.state[expid]
1935        if self.state_filename: self.write_state()
1936        self.state_lock.release()
1937        if tmpdir and self.cleanup:
1938            self.remove_dirs(tmpdir)
1939
1940    # end of create_experiment sub-functions
1941
1942    def create_experiment(self, req, fid):
1943        """
1944        The external interface to experiment creation called from the
1945        dispatcher.
1946
1947        Creates a working directory, splits the incoming description using the
1948        splitter script and parses out the various subsections using the
1949        classes above.  Once each sub-experiment is created, use pooled threads
1950        to instantiate them and start it all up.
1951        """
1952
1953        req = req.get('CreateRequestBody', None)
1954        if req:
1955            key = self.get_experiment_key(req)
1956        else:
1957            raise service_error(service_error.req,
1958                    "Bad request format (no CreateRequestBody)")
1959
1960        # Import information from the requester
1961        if self.auth.import_credentials(data_list=req.get('credential', [])):
1962            self.auth.save()
1963
1964        # Make sure that the caller can talk to us
1965        proof = self.check_experiment_access(fid, key)
1966
1967        # Install the testbed map entries supplied with the request into a copy
1968        # of the testbed map.
1969        tbmap = dict(self.tbmap)
1970        for m in req.get('testbedmap', []):
1971            if 'testbed' in m and 'uri' in m:
1972                tbmap[m['testbed']] = m['uri']
1973
1974        # a place to work
1975        try:
1976            tmpdir = tempfile.mkdtemp(prefix="split-")
1977            os.mkdir(tmpdir+"/keys")
1978        except EnvironmentError:
1979            raise service_error(service_error.internal, "Cannot create tmp dir")
1980
1981        tbparams = { }
1982
1983        eid, expid, expcert_file = \
1984                self.get_experiment_ids_and_start(key, tmpdir)
1985
1986        # This catches exceptions to clear the placeholder if necessary
1987        try: 
1988            if not (eid and expid):
1989                raise service_error(service_error.internal, 
1990                        "Cannot find local experiment info!?")
1991
1992            top = self.get_topology(req, tmpdir)
1993            self.confirm_software(top)
1994            # Assign the IPs
1995            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1996            # Find the testbeds to look up
1997            tb_hosts = { }
1998            testbeds = [ ]
1999            for e in top.elements:
2000                if isinstance(e, topdl.Computer):
2001                    tb = e.get_attribute('testbed') or 'default'
2002                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2003                    else: 
2004                        tb_hosts[tb] = [ e.name ]
2005                        testbeds.append(tb)
2006
2007            masters, pmasters = self.get_testbed_services(req, testbeds)
2008            allocated = { }         # Testbeds we can access
2009            topo ={ }               # Sub topologies
2010            connInfo = { }          # Connection information
2011
2012            self.split_topology(top, topo, testbeds)
2013
2014            self.get_access_to_testbeds(testbeds, fid, allocated, 
2015                    tbparams, masters, tbmap, expid, expcert_file)
2016
2017            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2018
2019            part = experiment_partition(self.auth, self.store_url, tbmap,
2020                    self.muxmax, self.direct_transit)
2021            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2022                    connInfo, expid)
2023
2024            auth_attrs = set()
2025            # Now get access to the dynamic testbeds (those added above)
2026            for tb in [ t for t in topo if t not in allocated]:
2027                self.get_access(tb, tbparams, fid, masters, tbmap, 
2028                        expid, expcert_file)
2029                allocated[tb] = 1
2030                store_keys = topo[tb].get_attribute('store_keys')
2031                # Give the testbed access to keys it exports or imports
2032                if store_keys:
2033                    auth_attrs.update(set([
2034                        (tbparams[tb]['allocID']['fedid'], sk) \
2035                                for sk in store_keys.split(" ")]))
2036
2037            if auth_attrs:
2038                self.append_experiment_authorization(expid, auth_attrs)
2039
2040            # transit and disconnected testbeds may not have a connInfo entry.
2041            # Fill in the blanks.
2042            for t in allocated.keys():
2043                if not connInfo.has_key(t):
2044                    connInfo[t] = { }
2045
2046            self.wrangle_software(expid, top, topo, tbparams)
2047
2048            vtopo = topdl.topology_to_vtopo(top)
2049            vis = self.genviz(vtopo)
2050            proofs = self.save_federant_information(allocated, tbparams, 
2051                    eid, vtopo, vis, top)
2052        except service_error, e:
2053            # If something goes wrong in the parse (usually an access error)
2054            # clear the placeholder state.  From here on out the code delays
2055            # exceptions.  Failing at this point returns a fault to the remote
2056            # caller.
2057            self.clear_placeholder(eid, expid, tmpdir)
2058            raise e
2059
2060        # Start the background swapper and return the starting state.  From
2061        # here on out, the state will stick around a while.
2062
2063        # Create a logger that logs to the experiment's state object as well as
2064        # to the main log file.
2065        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2066        alloc_collector = self.list_log(self.state[eid]['log'])
2067        h = logging.StreamHandler(alloc_collector)
2068        # XXX: there should be a global one of these rather than repeating the
2069        # code.
2070        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2071                    '%d %b %y %H:%M:%S'))
2072        alloc_log.addHandler(h)
2073
2074        # Start a thread to do the resource allocation
2075        t  = Thread(target=self.allocate_resources,
2076                args=(allocated, masters, eid, expid, tbparams, 
2077                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2078                    connInfo, tbmap, expcert_file),
2079                name=eid)
2080        t.start()
2081
2082        rv = {
2083                'experimentID': [
2084                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2085                ],
2086                'experimentStatus': 'starting',
2087                'proof': [ proof.to_dict() ] + proofs,
2088            }
2089
2090        return rv
2091   
2092    def get_experiment_fedid(self, key):
2093        """
2094        find the fedid associated with the localname key in the state database.
2095        """
2096
2097        rv = None
2098        self.state_lock.acquire()
2099        if self.state.has_key(key):
2100            if isinstance(self.state[key], dict):
2101                try:
2102                    kl = [ f['fedid'] for f in \
2103                            self.state[key]['experimentID']\
2104                                if f.has_key('fedid') ]
2105                except KeyError:
2106                    self.state_lock.release()
2107                    raise service_error(service_error.internal, 
2108                            "No fedid for experiment %s when getting "+\
2109                                    "fedid(!?)" % key)
2110                if len(kl) == 1:
2111                    rv = kl[0]
2112                else:
2113                    self.state_lock.release()
2114                    raise service_error(service_error.internal, 
2115                            "multiple fedids for experiment %s when " +\
2116                                    "getting fedid(!?)" % key)
2117            else:
2118                self.state_lock.release()
2119                raise service_error(service_error.internal, 
2120                        "Unexpected state for %s" % key)
2121        self.state_lock.release()
2122        return rv
2123
2124    def check_experiment_access(self, fid, key):
2125        """
2126        Confirm that the fid has access to the experiment.  Though a request
2127        may be made in terms of a local name, the access attribute is always
2128        the experiment's fedid.
2129        """
2130        if not isinstance(key, fedid):
2131            key = self.get_experiment_fedid(key)
2132
2133        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2134
2135        if access_ok:
2136            return proof
2137        else:
2138            raise service_error(service_error.access, "Access Denied",
2139                proof)
2140
2141
2142    def get_handler(self, path, fid):
2143        """
2144        Perhaps surprisingly named, this function handles HTTP GET requests to
2145        this server (SOAP requests are POSTs).
2146        """
2147        self.log.info("Get handler %s %s" % (path, fid))
2148        # XXX: log proofs?
2149        if self.auth.check_attribute(fid, path):
2150            return ("%s/%s" % (self.repodir, path), "application/binary")
2151        else:
2152            return (None, None)
2153
2154    def clean_info_response(self, rv, proof):
2155        """
2156        Remove the information in the experiment's state object that is not in
2157        the info response.
2158        """
2159        # Remove the owner info (should always be there, but...)
2160        if rv.has_key('owner'): del rv['owner']
2161        if 'auth' in rv: del rv['auth']
2162
2163        # Convert the log into the allocationLog parameter and remove the
2164        # log entry (with defensive programming)
2165        if rv.has_key('log'):
2166            rv['allocationLog'] = "".join(rv['log'])
2167            del rv['log']
2168        else:
2169            rv['allocationLog'] = ""
2170
2171        if rv['experimentStatus'] != 'active':
2172            if rv.has_key('federant'): del rv['federant']
2173        else:
2174            # remove the allocationID and uri info from each federant
2175            for f in rv.get('federant', []):
2176                if f.has_key('allocID'): del f['allocID']
2177                if f.has_key('uri'): del f['uri']
2178        rv['proof'] = proof.to_dict()
2179
2180        return rv
2181
2182    def get_info(self, req, fid):
2183        """
2184        Return all the stored info about this experiment
2185        """
2186        rv = None
2187
2188        req = req.get('InfoRequestBody', None)
2189        if not req:
2190            raise service_error(service_error.req,
2191                    "Bad request format (no InfoRequestBody)")
2192        exp = req.get('experiment', None)
2193        if exp:
2194            if exp.has_key('fedid'):
2195                key = exp['fedid']
2196                keytype = "fedid"
2197            elif exp.has_key('localname'):
2198                key = exp['localname']
2199                keytype = "localname"
2200            else:
2201                raise service_error(service_error.req, "Unknown lookup type")
2202        else:
2203            raise service_error(service_error.req, "No request?")
2204
2205        proof = self.check_experiment_access(fid, key)
2206
2207        # The state may be massaged by the service function that called
2208        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2209        # state.
2210        self.state_lock.acquire()
2211        if self.state.has_key(key):
2212            rv = copy.deepcopy(self.state[key])
2213        self.state_lock.release()
2214
2215        if rv:
2216            return self.clean_info_response(rv, proof)
2217        else:
2218            raise service_error(service_error.req, "No such experiment")
2219
2220    def get_multi_info(self, req, fid):
2221        """
2222        Return all the stored info that this fedid can access
2223        """
2224        rv = { 'info': [ ], 'proof': [ ] }
2225
2226        self.state_lock.acquire()
2227        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2228            try:
2229                proof = self.check_experiment_access(fid, key)
2230            except service_error, e:
2231                if e.code == service_error.access:
2232                    continue
2233                else:
2234                    self.state_lock.release()
2235                    raise e
2236
2237            if self.state.has_key(key):
2238                e = copy.deepcopy(self.state[key])
2239                e = self.clean_info_response(e, proof)
2240                rv['info'].append(e)
2241                rv['proof'].append(proof.to_dict())
2242        self.state_lock.release()
2243        return rv
2244
2245    def check_termination_status(self, fed_exp, force):
2246        """
2247        Confirm that the experiment is sin a valid state to stop (or force it)
2248        return the state - invalid states for deletion and force settings cause
2249        exceptions.
2250        """
2251        self.state_lock.acquire()
2252        status = fed_exp.get('experimentStatus', None)
2253
2254        if status:
2255            if status in ('starting', 'terminating'):
2256                if not force:
2257                    self.state_lock.release()
2258                    raise service_error(service_error.partial, 
2259                            'Experiment still being created or destroyed')
2260                else:
2261                    self.log.warning('Experiment in %s state ' % status + \
2262                            'being terminated by force.')
2263            self.state_lock.release()
2264            return status
2265        else:
2266            # No status??? trouble
2267            self.state_lock.release()
2268            raise service_error(service_error.internal,
2269                    "Experiment has no status!?")
2270
2271
2272    def get_termination_info(self, fed_exp):
2273        ids = []
2274        term_params = { }
2275        self.state_lock.acquire()
2276        #  experimentID is a list of dicts that are self-describing
2277        #  identifiers.  This finds all the fedids and localnames - the
2278        #  keys of self.state - and puts them into ids, which is used to delete
2279        #  the state after everything is swapped out.
2280        for id in fed_exp.get('experimentID', []):
2281            if 'fedid' in id: 
2282                ids.append(id['fedid'])
2283                repo = "%s" % id['fedid']
2284            if 'localname' in id: ids.append(id['localname'])
2285
2286        # Get the experimentAccess - the principal for this experiment.  It
2287        # is this principal to which credentials have been delegated, and
2288        # as which the experiment controller must act.
2289        if 'experimentAccess' in fed_exp and \
2290                'X509' in fed_exp['experimentAccess']:
2291            expcert = fed_exp['experimentAccess']['X509']
2292        else:
2293            expcert = None
2294
2295        # Collect the allocation/segment ids into a dict keyed by the fedid
2296        # of the allocation (or a monotonically increasing integer) that
2297        # contains a tuple of uri, aid (which is a dict...)
2298        for i, fed in enumerate(fed_exp.get('federant', [])):
2299            try:
2300                uri = fed['uri']
2301                aid = fed['allocID']
2302                k = fed['allocID'].get('fedid', i)
2303            except KeyError, e:
2304                continue
2305            term_params[k] = (uri, aid)
2306        # Change the experiment state
2307        fed_exp['experimentStatus'] = 'terminating'
2308        if self.state_filename: self.write_state()
2309        self.state_lock.release()
2310
2311        return ids, term_params, expcert, repo
2312
2313
2314    def deallocate_resources(self, term_params, expcert, status, force, 
2315            dealloc_log):
2316        tmpdir = None
2317        # This try block makes sure the tempdir is cleared
2318        try:
2319            # If no expcert, try the deallocation as the experiment
2320            # controller instance.
2321            if expcert and self.auth_type != 'legacy': 
2322                try:
2323                    tmpdir = tempfile.mkdtemp(prefix="term-")
2324                except EnvironmentError:
2325                    raise service_error(service_error.internal, 
2326                            "Cannot create tmp dir")
2327                cert_file = self.make_temp_certfile(expcert, tmpdir)
2328                pw = None
2329            else: 
2330                cert_file = self.cert_file
2331                pw = self.cert_pwd
2332
2333            # Stop everyone.  NB, wait_for_all waits until a thread starts
2334            # and then completes, so we can't wait if nothing starts.  So,
2335            # no tbparams, no start.
2336            if len(term_params) > 0:
2337                tp = thread_pool(self.nthreads)
2338                for k, (uri, aid) in term_params.items():
2339                    # Create and start a thread to stop the segment
2340                    tp.wait_for_slot()
2341                    t  = pooled_thread(\
2342                            target=self.terminate_segment(log=dealloc_log,
2343                                testbed=uri,
2344                                cert_file=cert_file, 
2345                                cert_pwd=pw,
2346                                trusted_certs=self.trusted_certs,
2347                                caller=self.call_TerminateSegment),
2348                            args=(uri, aid), name=k,
2349                            pdata=tp, trace_file=self.trace_file)
2350                    t.start()
2351                # Wait for completions
2352                tp.wait_for_all_done()
2353
2354            # release the allocations (failed experiments have done this
2355            # already, and starting experiments may be in odd states, so we
2356            # ignore errors releasing those allocations
2357            try: 
2358                for k, (uri, aid)  in term_params.items():
2359                    self.release_access(None, aid, uri=uri,
2360                            cert_file=cert_file, cert_pwd=pw)
2361            except service_error, e:
2362                if status != 'failed' and not force:
2363                    raise e
2364
2365        # Clean up the tmpdir no matter what
2366        finally:
2367            if tmpdir: self.remove_dirs(tmpdir)
2368
2369    def terminate_experiment(self, req, fid):
2370        """
2371        Swap this experiment out on the federants and delete the shared
2372        information
2373        """
2374        tbparams = { }
2375        req = req.get('TerminateRequestBody', None)
2376        if not req:
2377            raise service_error(service_error.req,
2378                    "Bad request format (no TerminateRequestBody)")
2379
2380        key = self.get_experiment_key(req, 'experiment')
2381        proof = self.check_experiment_access(fid, key)
2382        exp = req.get('experiment', False)
2383        force = req.get('force', False)
2384
2385        dealloc_list = [ ]
2386
2387
2388        # Create a logger that logs to the dealloc_list as well as to the main
2389        # log file.
2390        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2391        h = logging.StreamHandler(self.list_log(dealloc_list))
2392        # XXX: there should be a global one of these rather than repeating the
2393        # code.
2394        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2395                    '%d %b %y %H:%M:%S'))
2396        dealloc_log.addHandler(h)
2397
2398        self.state_lock.acquire()
2399        fed_exp = self.state.get(key, None)
2400        self.state_lock.release()
2401        repo = None
2402
2403        if fed_exp:
2404            status = self.check_termination_status(fed_exp, force)
2405            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2406            self.deallocate_resources(term_params, expcert, status, force, 
2407                    dealloc_log)
2408
2409            # Remove the terminated experiment
2410            self.state_lock.acquire()
2411            for id in ids:
2412                self.clear_experiment_authorization(id, need_state_lock=False)
2413                if id in self.state: del self.state[id]
2414
2415            if self.state_filename: self.write_state()
2416            self.state_lock.release()
2417
2418            # Delete any synch points associated with this experiment.  All
2419            # synch points begin with the fedid of the experiment.
2420            fedid_keys = set(["fedid:%s" % f for f in ids \
2421                    if isinstance(f, fedid)])
2422            for k in self.synch_store.all_keys():
2423                try:
2424                    if len(k) > 45 and k[0:46] in fedid_keys:
2425                        self.synch_store.del_value(k)
2426                except synch_store.BadDeletionError:
2427                    pass
2428            self.write_store()
2429
2430            # Remove software and other cached stuff from the filesystem.
2431            if repo:
2432                self.remove_dirs("%s/%s" % (self.repodir, repo))
2433       
2434            return { 
2435                    'experiment': exp , 
2436                    'deallocationLog': string.join(dealloc_list, ''),
2437                    'proof': [proof.to_dict()],
2438                    }
2439        else:
2440            raise service_error(service_error.req, "No saved state")
2441
2442
2443    def GetValue(self, req, fid):
2444        """
2445        Get a value from the synchronized store
2446        """
2447        req = req.get('GetValueRequestBody', None)
2448        if not req:
2449            raise service_error(service_error.req,
2450                    "Bad request format (no GetValueRequestBody)")
2451       
2452        name = req.get('name', None)
2453        wait = req.get('wait', False)
2454        rv = { 'name': name }
2455
2456        if not name:
2457            raise service_error(service_error.req, "No name?")
2458
2459        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2460
2461        if access_ok:
2462            self.log.debug("[GetValue] asking for %s " % name)
2463            try:
2464                v = self.synch_store.get_value(name, wait)
2465            except synch_store.RevokedKeyError:
2466                # No more synch on this key
2467                raise service_error(service_error.federant, 
2468                        "Synch key %s revoked" % name)
2469            if v is not None:
2470                rv['value'] = v
2471            rv['proof'] = proof.to_dict()
2472            self.log.debug("[GetValue] got %s from %s" % (v, name))
2473            return rv
2474        else:
2475            raise service_error(service_error.access, "Access Denied",
2476                    proof=proof)
2477       
2478
2479    def SetValue(self, req, fid):
2480        """
2481        Set a value in the synchronized store
2482        """
2483        req = req.get('SetValueRequestBody', None)
2484        if not req:
2485            raise service_error(service_error.req,
2486                    "Bad request format (no SetValueRequestBody)")
2487       
2488        name = req.get('name', None)
2489        v = req.get('value', '')
2490
2491        if not name:
2492            raise service_error(service_error.req, "No name?")
2493
2494        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2495
2496        if access_ok:
2497            try:
2498                self.synch_store.set_value(name, v)
2499                self.write_store()
2500                self.log.debug("[SetValue] set %s to %s" % (name, v))
2501            except synch_store.CollisionError:
2502                # Translate into a service_error
2503                raise service_error(service_error.req,
2504                        "Value already set: %s" %name)
2505            except synch_store.RevokedKeyError:
2506                # No more synch on this key
2507                raise service_error(service_error.federant, 
2508                        "Synch key %s revoked" % name)
2509                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2510        else:
2511            raise service_error(service_error.access, "Access Denied",
2512                    proof=proof)
Note: See TracBrowser for help on using the repository browser.