source: fedd/federation/experiment_control.py @ 5954004

compt_changesinfo-ops
Last change on this file since 5954004 was 5954004, checked in by Ted Faber <faber@…>, 13 years ago

Log missing graphviz. Closes #21

  • Property mode set to 100644
File size: 84.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        dt = config.get("experiment_control", "direct_transit")
154        self.auth_type = config.get('experiment_control', 'auth_type') \
155                or 'legacy'
156        self.auth_dir = config.get('experiment_control', 'auth_dir')
157        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
158        else: self.direct_transit = [ ]
159        # NB for internal master/slave ops, not experiment setup
160        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
161
162        self.overrides = set([])
163        ovr = config.get('experiment_control', 'overrides')
164        if ovr:
165            for o in ovr.split(","):
166                o = o.strip()
167                if o.startswith('fedid:'): o = o[len('fedid:'):]
168                self.overrides.add(fedid(hexstr=o))
169
170        self.state = { }
171        self.state_lock = Lock()
172        self.tclsh = "/usr/local/bin/otclsh"
173        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
174                config.get("experiment_control", "tcl_splitter",
175                        "/usr/testbed/lib/ns2ir/parse.tcl")
176        mapdb_file = config.get("experiment_control", "mapdb")
177        self.trace_file = sys.stderr
178
179        self.def_expstart = \
180                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
181                "/tmp/federate";
182        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
183                "FEDDIR/hosts";
184        self.def_gwstart = \
185                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
186                "/tmp/bridge.log";
187        self.def_mgwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
189                "/tmp/bridge.log";
190        self.def_gwimage = "FBSD61-TUNNEL2";
191        self.def_gwtype = "pc";
192        self.local_access = { }
193
194        if self.auth_type == 'legacy':
195            if auth:
196                self.auth = auth
197            else:
198                self.log.error( "[access]: No authorizer initialized, " +\
199                        "creating local one.")
200                auth = authorizer()
201            self.get_access = self.legacy_get_access
202        elif self.auth_type == 'abac':
203            self.auth = abac_authorizer(load=self.auth_dir)
204        else:
205            raise service_error(service_error.internal, 
206                    "Unknown auth_type: %s" % self.auth_type)
207
208        if mapdb_file:
209            self.read_mapdb(mapdb_file)
210        else:
211            self.log.warn("[experiment_control] No testbed map, using defaults")
212            self.tbmap = { 
213                    'deter':'https://users.isi.deterlab.net:23235',
214                    'emulab':'https://users.isi.deterlab.net:23236',
215                    'ucb':'https://users.isi.deterlab.net:23237',
216                    }
217
218        if accessdb_file:
219                self.read_accessdb(accessdb_file)
220        else:
221            raise service_error(service_error.internal,
222                    "No accessdb specified in config")
223
224        # Grab saved state.  OK to do this w/o locking because it's read only
225        # and only one thread should be in existence that can see self.state at
226        # this point.
227        if self.state_filename:
228            self.read_state()
229
230        if self.store_filename:
231            self.read_store()
232        else:
233            self.log.warning("No saved synch store")
234            self.synch_store = synch_store
235
236        # Dispatch tables
237        self.soap_services = {\
238                'New': soap_handler('New', self.new_experiment),
239                'Create': soap_handler('Create', self.create_experiment),
240                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
241                'Vis': soap_handler('Vis', self.get_vis),
242                'Info': soap_handler('Info', self.get_info),
243                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
244                'Terminate': soap_handler('Terminate', 
245                    self.terminate_experiment),
246                'GetValue': soap_handler('GetValue', self.GetValue),
247                'SetValue': soap_handler('SetValue', self.SetValue),
248        }
249
250        self.xmlrpc_services = {\
251                'New': xmlrpc_handler('New', self.new_experiment),
252                'Create': xmlrpc_handler('Create', self.create_experiment),
253                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
254                'Vis': xmlrpc_handler('Vis', self.get_vis),
255                'Info': xmlrpc_handler('Info', self.get_info),
256                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
257                'Terminate': xmlrpc_handler('Terminate',
258                    self.terminate_experiment),
259                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
260                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
261        }
262
263    # Call while holding self.state_lock
264    def write_state(self):
265        """
266        Write a new copy of experiment state after copying the existing state
267        to a backup.
268
269        State format is a simple pickling of the state dictionary.
270        """
271        if os.access(self.state_filename, os.W_OK):
272            copy_file(self.state_filename, \
273                    "%s.bak" % self.state_filename)
274        try:
275            f = open(self.state_filename, 'w')
276            pickle.dump(self.state, f)
277        except EnvironmentError, e:
278            self.log.error("Can't write file %s: %s" % \
279                    (self.state_filename, e))
280        except pickle.PicklingError, e:
281            self.log.error("Pickling problem: %s" % e)
282        except TypeError, e:
283            self.log.error("Pickling problem (TypeError): %s" % e)
284
285    @staticmethod
286    def get_alloc_ids(state):
287        """
288        Pull the fedids of the identifiers of each allocation from the
289        state.  Again, a dict dive that's best isolated.
290
291        Used by read_store and read state
292        """
293
294        return [ f['allocID']['fedid'] 
295                for f in state.get('federant',[]) \
296                    if f.has_key('allocID') and \
297                        f['allocID'].has_key('fedid')]
298
299    # Call while holding self.state_lock
300    def read_state(self):
301        """
302        Read a new copy of experiment state.  Old state is overwritten.
303
304        State format is a simple pickling of the state dictionary.
305        """
306       
307        def get_experiment_id(state):
308            """
309            Pull the fedid experimentID out of the saved state.  This is kind
310            of a gross walk through the dict.
311            """
312
313            if state.has_key('experimentID'):
314                for e in state['experimentID']:
315                    if e.has_key('fedid'):
316                        return e['fedid']
317                else:
318                    return None
319            else:
320                return None
321
322        try:
323            f = open(self.state_filename, "r")
324            self.state = pickle.load(f)
325            self.log.debug("[read_state]: Read state from %s" % \
326                    self.state_filename)
327        except EnvironmentError, e:
328            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
329                    % (self.state_filename, e))
330        except pickle.UnpicklingError, e:
331            self.log.warning(("[read_state]: No saved state: " + \
332                    "Unpickling failed: %s") % e)
333       
334        for s in self.state.values():
335            try:
336
337                eid = get_experiment_id(s)
338                if eid : 
339                    if self.auth_type == 'legacy':
340                        # XXX: legacy
341                        # Give the owner rights to the experiment
342                        self.auth.set_attribute(s['owner'], eid)
343                        # And holders of the eid as well
344                        self.auth.set_attribute(eid, eid)
345                        # allow overrides to control experiments as well
346                        for o in self.overrides:
347                            self.auth.set_attribute(o, eid)
348                        # Set permissions to allow reading of the software
349                        # repo, if any, as well.
350                        for a in self.get_alloc_ids(s):
351                            self.auth.set_attribute(a, 'repo/%s' % eid)
352                else:
353                    raise KeyError("No experiment id")
354            except KeyError, e:
355                self.log.warning("[read_state]: State ownership or identity " +\
356                        "misformatted in %s: %s" % (self.state_filename, e))
357
358
359    def read_accessdb(self, accessdb_file):
360        """
361        Read the mapping from fedids that can create experiments to their name
362        in the 3-level access namespace.  All will be asserted from this
363        testbed and can include the local username and porject that will be
364        asserted on their behalf by this fedd.  Each fedid is also added to the
365        authorization system with the "create" attribute.
366        """
367        self.accessdb = {}
368        # These are the regexps for parsing the db
369        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
370        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
371                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
372        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\s*(" + name_expr + ")\s*$")
374        lineno = 0
375
376        # Parse the mappings and store in self.authdb, a dict of
377        # fedid -> (proj, user)
378        try:
379            f = open(accessdb_file, "r")
380            for line in f:
381                lineno += 1
382                line = line.strip()
383                if len(line) == 0 or line.startswith('#'):
384                    continue
385                m = project_line.match(line)
386                if m:
387                    fid = fedid(hexstr=m.group(1))
388                    project, user = m.group(2,3)
389                    if not self.accessdb.has_key(fid):
390                        self.accessdb[fid] = []
391                    self.accessdb[fid].append((project, user))
392                    continue
393
394                m = user_line.match(line)
395                if m:
396                    fid = fedid(hexstr=m.group(1))
397                    project = None
398                    user = m.group(2)
399                    if not self.accessdb.has_key(fid):
400                        self.accessdb[fid] = []
401                    self.accessdb[fid].append((project, user))
402                    continue
403                self.log.warn("[experiment_control] Error parsing access " +\
404                        "db %s at line %d" %  (accessdb_file, lineno))
405        except EnvironmentError:
406            raise service_error(service_error.internal,
407                    ("Error opening/reading %s as experiment " +\
408                            "control accessdb") %  accessdb_file)
409        f.close()
410
411        # Initialize the authorization attributes
412        # XXX: legacy
413        if self.auth_type == 'legacy':
414            for fid in self.accessdb.keys():
415                self.auth.set_attribute(fid, 'create')
416                self.auth.set_attribute(fid, 'new')
417
418    def read_mapdb(self, file):
419        """
420        Read a simple colon separated list of mappings for the
421        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
422        """
423
424        self.tbmap = { }
425        lineno =0
426        try:
427            f = open(file, "r")
428            for line in f:
429                lineno += 1
430                line = line.strip()
431                if line.startswith('#') or len(line) == 0:
432                    continue
433                try:
434                    label, url = line.split(':', 1)
435                    self.tbmap[label] = url
436                except ValueError, e:
437                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
438                            "map db: %s %s" % (lineno, line, e))
439        except EnvironmentError, e:
440            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
441                    "open %s: %s" % (file, e))
442        else:
443            f.close()
444
445    def read_store(self):
446        try:
447            self.synch_store = synch_store()
448            self.synch_store.load(self.store_filename)
449            self.log.debug("[read_store]: Read store from %s" % \
450                    self.store_filename)
451        except EnvironmentError, e:
452            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
453                    % (self.state_filename, e))
454            self.synch_store = synch_store()
455
456        # Set the initial permissions on data in the store.  XXX: This ad hoc
457        # authorization attribute initialization is getting out of hand.
458        # XXX: legacy
459        if self.auth_type == 'legacy':
460            for k in self.synch_store.all_keys():
461                try:
462                    if k.startswith('fedid:'):
463                        fid = fedid(hexstr=k[6:46])
464                        if self.state.has_key(fid):
465                            for a in self.get_alloc_ids(self.state[fid]):
466                                self.auth.set_attribute(a, k)
467                except ValueError, e:
468                    self.log.warn("Cannot deduce permissions for %s" % k)
469
470
471    def write_store(self):
472        """
473        Write a new copy of synch_store after writing current state
474        to a backup.  We use the internal synch_store pickle method to avoid
475        incinsistent data.
476
477        State format is a simple pickling of the store.
478        """
479        if os.access(self.store_filename, os.W_OK):
480            copy_file(self.store_filename, \
481                    "%s.bak" % self.store_filename)
482        try:
483            self.synch_store.save(self.store_filename)
484        except EnvironmentError, e:
485            self.log.error("Can't write file %s: %s" % \
486                    (self.store_filename, e))
487        except TypeError, e:
488            self.log.error("Pickling problem (TypeError): %s" % e)
489
490
491    def remove_dirs(self, dir):
492        """
493        Remove the directory tree and all files rooted at dir.  Log any errors,
494        but continue.
495        """
496        self.log.debug("[removedirs]: removing %s" % dir)
497        try:
498            for path, dirs, files in os.walk(dir, topdown=False):
499                for f in files:
500                    os.remove(os.path.join(path, f))
501                for d in dirs:
502                    os.rmdir(os.path.join(path, d))
503            os.rmdir(dir)
504        except EnvironmentError, e:
505            self.log.error("Error deleting directory tree in %s" % e);
506
507    @staticmethod
508    def make_temp_certfile(expcert, tmpdir):
509        """
510        make a protected copy of the access certificate so the experiment
511        controller can act as the experiment principal.  mkstemp is the most
512        secure way to do that. The directory should be created by
513        mkdtemp.  Return the filename.
514        """
515        if expcert and tmpdir:
516            try:
517                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
518                f = os.fdopen(certf, 'w')
519                print >> f, expcert
520                f.close()
521            except EnvironmentError, e:
522                raise service_error(service_error.internal, 
523                        "Cannot create temp cert file?")
524            return certfn
525        else:
526            return None
527
528       
529    def generate_ssh_keys(self, dest, type="rsa" ):
530        """
531        Generate a set of keys for the gateways to use to talk.
532
533        Keys are of type type and are stored in the required dest file.
534        """
535        valid_types = ("rsa", "dsa")
536        t = type.lower();
537        if t not in valid_types: raise ValueError
538        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
539
540        try:
541            trace = open("/dev/null", "w")
542        except EnvironmentError:
543            raise service_error(service_error.internal,
544                    "Cannot open /dev/null??");
545
546        # May raise CalledProcessError
547        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
548        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
549        if rv != 0:
550            raise service_error(service_error.internal, 
551                    "Cannot generate nonce ssh keys.  %s return code %d" \
552                            % (self.ssh_keygen, rv))
553
554    def gentopo(self, str):
555        """
556        Generate the topology data structure from the splitter's XML
557        representation of it.
558
559        The topology XML looks like:
560            <experiment>
561                <nodes>
562                    <node><vname></vname><ips>ip1:ip2</ips></node>
563                </nodes>
564                <lans>
565                    <lan>
566                        <vname></vname><vnode></vnode><ip></ip>
567                        <bandwidth></bandwidth><member>node:port</member>
568                    </lan>
569                </lans>
570        """
571        class topo_parse:
572            """
573            Parse the topology XML and create the dats structure.
574            """
575            def __init__(self):
576                # Typing of the subelements for data conversion
577                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
578                self.int_subelements = ( 'bandwidth',)
579                self.float_subelements = ( 'delay',)
580                # The final data structure
581                self.nodes = [ ]
582                self.lans =  [ ]
583                self.topo = { \
584                        'node': self.nodes,\
585                        'lan' : self.lans,\
586                    }
587                self.element = { }  # Current element being created
588                self.chars = ""     # Last text seen
589
590            def end_element(self, name):
591                # After each sub element the contents is added to the current
592                # element or to the appropriate list.
593                if name == 'node':
594                    self.nodes.append(self.element)
595                    self.element = { }
596                elif name == 'lan':
597                    self.lans.append(self.element)
598                    self.element = { }
599                elif name in self.str_subelements:
600                    self.element[name] = self.chars
601                    self.chars = ""
602                elif name in self.int_subelements:
603                    self.element[name] = int(self.chars)
604                    self.chars = ""
605                elif name in self.float_subelements:
606                    self.element[name] = float(self.chars)
607                    self.chars = ""
608
609            def found_chars(self, data):
610                self.chars += data.rstrip()
611
612
613        tp = topo_parse();
614        parser = xml.parsers.expat.ParserCreate()
615        parser.EndElementHandler = tp.end_element
616        parser.CharacterDataHandler = tp.found_chars
617
618        parser.Parse(str)
619
620        return tp.topo
621       
622
623    def genviz(self, topo):
624        """
625        Generate the visualization the virtual topology
626        """
627
628        neato = "/usr/local/bin/neato"
629        # These are used to parse neato output and to create the visualization
630        # file.
631        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
632        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
633                "%s</type></node>"
634
635        try:
636            # Node names
637            nodes = [ n['vname'] for n in topo['node'] ]
638            topo_lans = topo['lan']
639        except KeyError, e:
640            raise service_error(service_error.internal, "Bad topology: %s" %e)
641
642        lans = { }
643        links = { }
644
645        # Walk through the virtual topology, organizing the connections into
646        # 2-node connections (links) and more-than-2-node connections (lans).
647        # When a lan is created, it's added to the list of nodes (there's a
648        # node in the visualization for the lan).
649        for l in topo_lans:
650            if links.has_key(l['vname']):
651                if len(links[l['vname']]) < 2:
652                    links[l['vname']].append(l['vnode'])
653                else:
654                    nodes.append(l['vname'])
655                    lans[l['vname']] = links[l['vname']]
656                    del links[l['vname']]
657                    lans[l['vname']].append(l['vnode'])
658            elif lans.has_key(l['vname']):
659                lans[l['vname']].append(l['vnode'])
660            else:
661                links[l['vname']] = [ l['vnode'] ]
662
663
664        # Open up a temporary file for dot to turn into a visualization
665        try:
666            df, dotname = tempfile.mkstemp()
667            dotfile = os.fdopen(df, 'w')
668        except EnvironmentError:
669            raise service_error(service_error.internal,
670                    "Failed to open file in genviz")
671
672        try:
673            dnull = open('/dev/null', 'w')
674        except EnvironmentError:
675            service_error(service_error.internal,
676                    "Failed to open /dev/null in genviz")
677
678        # Generate a dot/neato input file from the links, nodes and lans
679        try:
680            print >>dotfile, "graph G {"
681            for n in nodes:
682                print >>dotfile, '\t"%s"' % n
683            for l in links.keys():
684                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
685            for l in lans.keys():
686                for n in lans[l]:
687                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
688            print >>dotfile, "}"
689            dotfile.close()
690        except TypeError:
691            raise service_error(service_error.internal,
692                    "Single endpoint link in vtopo")
693        except EnvironmentError:
694            raise service_error(service_error.internal, "Cannot write dot file")
695
696        # Use dot to create a visualization
697        try:
698            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
699                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
700                stderr=dnull, close_fds=True)
701        except EnvironmentError:
702            raise service_error(service_error.internal, 
703                    "Cannot generate visualization: is graphviz available?")
704        dnull.close()
705
706        # Translate dot to vis format
707        vis_nodes = [ ]
708        vis = { 'node': vis_nodes }
709        for line in dot.stdout:
710            m = vis_re.match(line)
711            if m:
712                vn = m.group(1)
713                vis_node = {'name': vn, \
714                        'x': float(m.group(2)),\
715                        'y' : float(m.group(3)),\
716                    }
717                if vn in links.keys() or vn in lans.keys():
718                    vis_node['type'] = 'lan'
719                else:
720                    vis_node['type'] = 'node'
721                vis_nodes.append(vis_node)
722        rv = dot.wait()
723
724        os.remove(dotname)
725        if rv == 0 : return vis
726        else: return None
727
728
729    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
730            cert_pwd=None):
731        """
732        Release access to testbed through fedd
733        """
734
735        if not uri and tbmap:
736            uri = tbmap.get(tb, None)
737        if not uri:
738            raise service_error(service_error.server_config, 
739                    "Unknown testbed: %s" % tb)
740
741        if self.local_access.has_key(uri):
742            resp = self.local_access[uri].ReleaseAccess(\
743                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
744                    fedid(file=cert_file))
745            resp = { 'ReleaseAccessResponseBody': resp } 
746        else:
747            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
748                    cert_file, cert_pwd, self.trusted_certs)
749
750        # better error coding
751
752    def remote_ns2topdl(self, uri, desc):
753
754        req = {
755                'description' : { 'ns2description': desc },
756            }
757
758        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
759                self.trusted_certs)
760
761        if r.has_key('Ns2TopdlResponseBody'):
762            r = r['Ns2TopdlResponseBody']
763            ed = r.get('experimentdescription', None)
764            if ed.has_key('topdldescription'):
765                return topdl.Topology(**ed['topdldescription'])
766            else:
767                raise service_error(service_error.protocol, 
768                        "Bad splitter response (no output)")
769        else:
770            raise service_error(service_error.protocol, "Bad splitter response")
771
772    class start_segment:
773        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
774                cert_pwd=None, trusted_certs=None, caller=None,
775                log_collector=None):
776            self.log = log
777            self.debug = debug
778            self.cert_file = cert_file
779            self.cert_pwd = cert_pwd
780            self.trusted_certs = None
781            self.caller = caller
782            self.testbed = testbed
783            self.log_collector = log_collector
784            self.response = None
785            self.node = { }
786            self.proof = None
787
788        def make_map(self, resp):
789            for e in resp.get('embedding', []):
790                if 'toponame' in e and 'physname' in e:
791                    self.node[e['toponame']] = e['physname'][0]
792
793        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
794            req = {
795                    'allocID': { 'fedid' : aid }, 
796                    'segmentdescription': { 
797                        'topdldescription': topo.to_dict(),
798                    },
799                }
800
801            if connInfo:
802                req['connection'] = connInfo
803
804            import_svcs = [ s for m in masters.values() \
805                    for s in m if self.testbed in s.importers]
806
807            if import_svcs or self.testbed in masters:
808                req['service'] = []
809
810            for s in import_svcs:
811                for r in s.reqs:
812                    sr = copy.deepcopy(r)
813                    sr['visibility'] = 'import';
814                    req['service'].append(sr)
815
816            for s in masters.get(self.testbed, []):
817                for r in s.reqs:
818                    sr = copy.deepcopy(r)
819                    sr['visibility'] = 'export';
820                    req['service'].append(sr)
821
822            if attrs:
823                req['fedAttr'] = attrs
824
825            try:
826                self.log.debug("Calling StartSegment at %s " % uri)
827                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
828                        self.trusted_certs)
829                if r.has_key('StartSegmentResponseBody'):
830                    lval = r['StartSegmentResponseBody'].get('allocationLog',
831                            None)
832                    if lval and self.log_collector:
833                        for line in  lval.splitlines(True):
834                            self.log_collector.write(line)
835                    self.make_map(r['StartSegmentResponseBody'])
836                    if 'proof' in r: self.proof = r['proof']
837                    self.response = r
838                else:
839                    raise service_error(service_error.internal, 
840                            "Bad response!?: %s" %r)
841                return True
842            except service_error, e:
843                self.log.error("Start segment failed on %s: %s" % \
844                        (self.testbed, e))
845                return False
846
847
848
849    class terminate_segment:
850        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
851                cert_pwd=None, trusted_certs=None, caller=None):
852            self.log = log
853            self.debug = debug
854            self.cert_file = cert_file
855            self.cert_pwd = cert_pwd
856            self.trusted_certs = None
857            self.caller = caller
858            self.testbed = testbed
859
860        def __call__(self, uri, aid ):
861            req = {
862                    'allocID': aid , 
863                }
864            try:
865                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
866                        self.trusted_certs)
867                return True
868            except service_error, e:
869                self.log.error("Terminate segment failed on %s: %s" % \
870                        (self.testbed, e))
871                return False
872
873    def allocate_resources(self, allocated, masters, eid, expid, 
874            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
875            attrs=None, connInfo={}, tbmap=None, expcert=None):
876
877        started = { }           # Testbeds where a sub-experiment started
878                                # successfully
879
880        # XXX
881        fail_soft = False
882
883        if tbmap is None: tbmap = { }
884
885        log = alloc_log or self.log
886
887        tp = thread_pool(self.nthreads)
888        threads = [ ]
889        starters = [ ]
890
891        if expcert:
892            cert = expcert
893            pw = None
894        else:
895            cert = self.cert_file
896            pw = self.cert_pwd
897
898        for tb in allocated.keys():
899            # Create and start a thread to start the segment, and save it
900            # to get the return value later
901            tb_attrs = copy.copy(attrs)
902            tp.wait_for_slot()
903            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
904            base, suffix = split_testbed(tb)
905            if suffix:
906                tb_attrs.append({'attribute': 'experiment_name', 
907                    'value': "%s-%s" % (eid, suffix)})
908            else:
909                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
910            if not uri:
911                raise service_error(service_error.internal, 
912                        "Unknown testbed %s !?" % tb)
913
914            if tbparams[tb].has_key('allocID') and \
915                    tbparams[tb]['allocID'].has_key('fedid'):
916                aid = tbparams[tb]['allocID']['fedid']
917            else:
918                raise service_error(service_error.internal, 
919                        "No alloc id for testbed %s !?" % tb)
920
921            s = self.start_segment(log=log, debug=self.debug,
922                    testbed=tb, cert_file=cert,
923                    cert_pwd=pw, trusted_certs=self.trusted_certs,
924                    caller=self.call_StartSegment,
925                    log_collector=log_collector)
926            starters.append(s)
927            t  = pooled_thread(\
928                    target=s, name=tb,
929                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
930                    pdata=tp, trace_file=self.trace_file)
931            threads.append(t)
932            t.start()
933
934        # Wait until all finish (keep pinging the log, though)
935        mins = 0
936        revoked = False
937        while not tp.wait_for_all_done(60.0):
938            mins += 1
939            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
940                    % mins)
941            if not revoked and \
942                    len([ t.getName() for t in threads if t.rv == False]) > 0:
943                # a testbed has failed.  Revoke this experiment's
944                # synchronizarion values so that sub experiments will not
945                # deadlock waiting for synchronization that will never happen
946                self.log.info("A subexperiment has failed to swap in, " + \
947                        "revoking synch keys")
948                var_key = "fedid:%s" % expid
949                for k in self.synch_store.all_keys():
950                    if len(k) > 45 and k[0:46] == var_key:
951                        self.synch_store.revoke_key(k)
952                revoked = True
953
954        failed = [ t.getName() for t in threads if not t.rv ]
955        succeeded = [tb for tb in allocated.keys() if tb not in failed]
956
957        # If one failed clean up, unless fail_soft is set
958        if failed:
959            if not fail_soft:
960                tp.clear()
961                for tb in succeeded:
962                    # Create and start a thread to stop the segment
963                    tp.wait_for_slot()
964                    uri = tbparams[tb]['uri']
965                    t  = pooled_thread(\
966                            target=self.terminate_segment(log=log,
967                                testbed=tb,
968                                cert_file=cert, 
969                                cert_pwd=pw,
970                                trusted_certs=self.trusted_certs,
971                                caller=self.call_TerminateSegment),
972                            args=(uri, tbparams[tb]['federant']['allocID']),
973                            name=tb,
974                            pdata=tp, trace_file=self.trace_file)
975                    t.start()
976                # Wait until all finish (if any are being stopped)
977                if succeeded:
978                    tp.wait_for_all_done()
979
980                # release the allocations
981                for tb in tbparams.keys():
982                    try:
983                        self.release_access(tb, tbparams[tb]['allocID'], 
984                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
985                                cert_file=cert, cert_pwd=pw)
986                    except service_error, e:
987                        self.log.warn("Error releasing access: %s" % e.desc)
988                # Remove the placeholder
989                self.state_lock.acquire()
990                self.state[eid]['experimentStatus'] = 'failed'
991                if self.state_filename: self.write_state()
992                self.state_lock.release()
993                # Remove the repo dir
994                self.remove_dirs("%s/%s" %(self.repodir, expid))
995                # Walk up tmpdir, deleting as we go
996                if self.cleanup:
997                    self.remove_dirs(tmpdir)
998                else:
999                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1000
1001
1002                log.error("Swap in failed on %s" % ",".join(failed))
1003                return
1004        else:
1005            # Walk through the successes and gather the virtual to physical
1006            # mapping.
1007            embedding = [ ]
1008            proofs = { }
1009            for s in starters:
1010                for k, v in s.node.items():
1011                    embedding.append({
1012                        'toponame': k, 
1013                        'physname': [ v],
1014                        'testbed': s.testbed
1015                        })
1016                if s.proof:
1017                    proofs[s.testbed] = s.proof
1018            log.info("[start_segment]: Experiment %s active" % eid)
1019
1020
1021        # Walk up tmpdir, deleting as we go
1022        if self.cleanup:
1023            self.remove_dirs(tmpdir)
1024        else:
1025            log.debug("[start_experiment]: not removing %s" % tmpdir)
1026
1027        # Insert the experiment into our state and update the disk copy.
1028        self.state_lock.acquire()
1029        self.state[expid]['experimentStatus'] = 'active'
1030        self.state[eid] = self.state[expid]
1031        self.state[eid]['experimentdescription']['topdldescription'] = \
1032                top.to_dict()
1033        self.state[eid]['embedding'] = embedding
1034        # Append startup proofs
1035        for f in self.state[eid]['federant']:
1036            if 'name' in f and 'localname' in f['name']:
1037                if f['name']['localname'] in proofs:
1038                    f['proof'].append(proofs[f['name']['localname']])
1039
1040        if self.state_filename: self.write_state()
1041        self.state_lock.release()
1042        return
1043
1044
1045    def add_kit(self, e, kit):
1046        """
1047        Add a Software object created from the list of (install, location)
1048        tuples passed as kit  to the software attribute of an object e.  We
1049        do this enough to break out the code, but it's kind of a hack to
1050        avoid changing the old tuple rep.
1051        """
1052
1053        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1054
1055        if isinstance(e.software, list): e.software.extend(s)
1056        else: e.software = s
1057
1058    def append_experiment_authorization(self, expid, attrs, 
1059            need_state_lock=True):
1060        """
1061        Append the authorization information to system state
1062        """
1063
1064        for p, a in attrs:
1065            self.auth.set_attribute(p, a)
1066        self.auth.save()
1067
1068        if need_state_lock: self.state_lock.acquire()
1069        self.state[expid]['auth'].update(attrs)
1070        if self.state_filename: self.write_state()
1071        if need_state_lock: self.state_lock.release()
1072
1073    def clear_experiment_authorization(self, expid, need_state_lock=True):
1074        """
1075        Attrs is a set of attribute principal pairs that need to be removed
1076        from the authenticator.  Remove them and save the authenticator.
1077        """
1078
1079        if need_state_lock: self.state_lock.acquire()
1080        if expid in self.state and 'auth' in self.state[expid]:
1081            for p, a in self.state[expid]['auth']:
1082                self.auth.unset_attribute(p, a)
1083            self.state[expid]['auth'] = set()
1084        if self.state_filename: self.write_state()
1085        if need_state_lock: self.state_lock.release()
1086        self.auth.save()
1087
1088
1089    def create_experiment_state(self, fid, req, expid, expcert,
1090            state='starting'):
1091        """
1092        Create the initial entry in the experiment's state.  The expid and
1093        expcert are the experiment's fedid and certifacte that represents that
1094        ID, which are installed in the experiment state.  If the request
1095        includes a suggested local name that is used if possible.  If the local
1096        name is already taken by an experiment owned by this user that has
1097        failed, it is overwritten.  Otherwise new letters are added until a
1098        valid localname is found.  The generated local name is returned.
1099        """
1100
1101        if req.has_key('experimentID') and \
1102                req['experimentID'].has_key('localname'):
1103            overwrite = False
1104            eid = req['experimentID']['localname']
1105            # If there's an old failed experiment here with the same local name
1106            # and accessible by this user, we'll overwrite it, otherwise we'll
1107            # fall through and do the collision avoidance.
1108            old_expid = self.get_experiment_fedid(eid)
1109            if old_expid:
1110                users_experiment = True
1111                try:
1112                    self.check_experiment_access(fid, old_expid)
1113                except service_error, e:
1114                    if e.code == service_error.access: users_experiment = False
1115                    else: raise e
1116                if users_experiment:
1117                    self.state_lock.acquire()
1118                    status = self.state[eid].get('experimentStatus', None)
1119                    if status and status == 'failed':
1120                        # remove the old access attributes
1121                        self.clear_experiment_authorization(eid,
1122                                need_state_lock=False)
1123                        overwrite = True
1124                        del self.state[eid]
1125                        del self.state[old_expid]
1126                    self.state_lock.release()
1127                else:
1128                    self.log.info('Experiment %s exists, ' % eid + \
1129                            'but this user cannot access it')
1130            self.state_lock.acquire()
1131            while (self.state.has_key(eid) and not overwrite):
1132                eid += random.choice(string.ascii_letters)
1133            # Initial state
1134            self.state[eid] = {
1135                    'experimentID' : \
1136                            [ { 'localname' : eid }, {'fedid': expid } ],
1137                    'experimentStatus': state,
1138                    'experimentAccess': { 'X509' : expcert },
1139                    'owner': fid,
1140                    'log' : [],
1141                    'auth': set(),
1142                }
1143            self.state[expid] = self.state[eid]
1144            if self.state_filename: self.write_state()
1145            self.state_lock.release()
1146        else:
1147            eid = self.exp_stem
1148            for i in range(0,5):
1149                eid += random.choice(string.ascii_letters)
1150            self.state_lock.acquire()
1151            while (self.state.has_key(eid)):
1152                eid = self.exp_stem
1153                for i in range(0,5):
1154                    eid += random.choice(string.ascii_letters)
1155            # Initial state
1156            self.state[eid] = {
1157                    'experimentID' : \
1158                            [ { 'localname' : eid }, {'fedid': expid } ],
1159                    'experimentStatus': state,
1160                    'experimentAccess': { 'X509' : expcert },
1161                    'owner': fid,
1162                    'log' : [],
1163                    'auth': set(),
1164                }
1165            self.state[expid] = self.state[eid]
1166            if self.state_filename: self.write_state()
1167            self.state_lock.release()
1168
1169        # Let users touch the state.  Authorize this fid and the expid itself
1170        # to touch the experiment, as well as allowing th eoverrides.
1171        self.append_experiment_authorization(eid, 
1172                set([(fid, expid), (expid,expid)] + \
1173                        [ (o, expid) for o in self.overrides]))
1174
1175        return eid
1176
1177
1178    def allocate_ips_to_topo(self, top):
1179        """
1180        Add an ip4_address attribute to all the hosts in the topology, based on
1181        the shared substrates on which they sit.  An /etc/hosts file is also
1182        created and returned as a list of hostfiles entries.  We also return
1183        the allocator, because we may need to allocate IPs to portals
1184        (specifically DRAGON portals).
1185        """
1186        subs = sorted(top.substrates, 
1187                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1188                reverse=True)
1189        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1190        ifs = { }
1191        hosts = [ ]
1192
1193        for idx, s in enumerate(subs):
1194            net_size = len(s.interfaces)+2
1195
1196            a = ips.allocate(net_size)
1197            if a :
1198                base, num = a
1199                if num < net_size: 
1200                    raise service_error(service_error.internal,
1201                            "Allocator returned wrong number of IPs??")
1202            else:
1203                raise service_error(service_error.req, 
1204                        "Cannot allocate IP addresses")
1205            mask = ips.min_alloc
1206            while mask < net_size:
1207                mask *= 2
1208
1209            netmask = ((2**32-1) ^ (mask-1))
1210
1211            base += 1
1212            for i in s.interfaces:
1213                i.attribute.append(
1214                        topdl.Attribute('ip4_address', 
1215                            "%s" % ip_addr(base)))
1216                i.attribute.append(
1217                        topdl.Attribute('ip4_netmask', 
1218                            "%s" % ip_addr(int(netmask))))
1219
1220                hname = i.element.name
1221                if ifs.has_key(hname):
1222                    hosts.append("%s\t%s-%s %s-%d" % \
1223                            (ip_addr(base), hname, s.name, hname,
1224                                ifs[hname]))
1225                else:
1226                    ifs[hname] = 0
1227                    hosts.append("%s\t%s-%s %s-%d %s" % \
1228                            (ip_addr(base), hname, s.name, hname,
1229                                ifs[hname], hname))
1230
1231                ifs[hname] += 1
1232                base += 1
1233        return hosts, ips
1234
1235    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1236            tbparam, masters, tbmap, expid=None, expcert=None):
1237        for tb in testbeds:
1238            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1239                    expcert)
1240            allocated[tb] = 1
1241
1242    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1243            expcert=None):
1244        """
1245        Get access to testbed through fedd and set the parameters for that tb
1246        """
1247        def get_export_project(svcs):
1248            """
1249            Look through for the list of federated_service for this testbed
1250            objects for a project_export service, and extract the project
1251            parameter.
1252            """
1253
1254            pe = [s for s in svcs if s.name=='project_export']
1255            if len(pe) == 1:
1256                return pe[0].params.get('project', None)
1257            elif len(pe) == 0:
1258                return None
1259            else:
1260                raise service_error(service_error.req,
1261                        "More than one project export is not supported")
1262
1263        def add_services(svcs, type, slist, keys):
1264            """
1265            Add the given services to slist.  type is import or export.  Also
1266            add a mapping entry from the assigned id to the original service
1267            record.
1268            """
1269            for i, s in enumerate(svcs):
1270                idx = '%s%d' % (type, i)
1271                keys[idx] = s
1272                sr = {'id': idx, 'name': s.name, 'visibility': type }
1273                if s.params:
1274                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1275                            for k, v in s.params.items()]
1276                slist.append(sr)
1277
1278        uri = tbmap.get(testbed_base(tb), None)
1279        if not uri:
1280            raise service_error(service_error.server_config, 
1281                    "Unknown testbed: %s" % tb)
1282
1283        export_svcs = masters.get(tb,[])
1284        import_svcs = [ s for m in masters.values() \
1285                for s in m \
1286                    if tb in s.importers ]
1287
1288        export_project = get_export_project(export_svcs)
1289        # Compose the credential list so that IDs come before attributes
1290        creds = set()
1291        keys = set()
1292        certs = self.auth.get_creds_for_principal(fid)
1293        if expid:
1294            certs.update(self.auth.get_creds_for_principal(expid))
1295        for c in certs:
1296            keys.add(c.issuer_cert())
1297            creds.add(c.attribute_cert())
1298        creds = list(keys) + list(creds)
1299
1300        if expcert: cert, pw = expcert, None
1301        else: cert, pw = self.cert_file, self.cert_pw
1302
1303        # Request credentials
1304        req = {
1305                'abac_credential': creds,
1306            }
1307        # Make the service request from the services we're importing and
1308        # exporting.  Keep track of the export request ids so we can
1309        # collect the resulting info from the access response.
1310        e_keys = { }
1311        if import_svcs or export_svcs:
1312            slist = []
1313            add_services(import_svcs, 'import', slist, e_keys)
1314            add_services(export_svcs, 'export', slist, e_keys)
1315            req['service'] = slist
1316
1317        if self.local_access.has_key(uri):
1318            # Local access call
1319            req = { 'RequestAccessRequestBody' : req }
1320            r = self.local_access[uri].RequestAccess(req, 
1321                    fedid(file=self.cert_file))
1322            r = { 'RequestAccessResponseBody' : r }
1323        else:
1324            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1325
1326        if r.has_key('RequestAccessResponseBody'):
1327            # Through to here we have a valid response, not a fault.
1328            # Access denied is a fault, so something better or worse than
1329            # access denied has happened.
1330            r = r['RequestAccessResponseBody']
1331            self.log.debug("[get_access] Access granted")
1332        else:
1333            raise service_error(service_error.protocol,
1334                        "Bad proxy response")
1335        if 'proof' not in r:
1336            raise service_error(service_error.protocol,
1337                        "Bad access response (no access proof)")
1338       
1339        tbparam[tb] = { 
1340                "allocID" : r['allocID'],
1341                "uri": uri,
1342                "proof": [ r['proof'] ],
1343                }
1344
1345        # Collect the responses corresponding to the services this testbed
1346        # exports.  These will be the service requests that we will include in
1347        # the start segment requests (with appropriate visibility values) to
1348        # import and export the segments.
1349        for s in r.get('service', []):
1350            id = s.get('id', None)
1351            if id and id in e_keys:
1352                e_keys[id].reqs.append(s)
1353
1354        # Add attributes to parameter space.  We don't allow attributes to
1355        # overlay any parameters already installed.
1356        for a in r.get('fedAttr', []):
1357            try:
1358                if a['attribute'] and \
1359                        isinstance(a['attribute'], basestring)\
1360                        and not tbparam[tb].has_key(a['attribute'].lower()):
1361                    tbparam[tb][a['attribute'].lower()] = a['value']
1362            except KeyError:
1363                self.log.error("Bad attribute in response: %s" % a)
1364
1365
1366    def split_topology(self, top, topo, testbeds):
1367        """
1368        Create the sub-topologies that are needed for experiment instantiation.
1369        """
1370        for tb in testbeds:
1371            topo[tb] = top.clone()
1372            # copy in for loop allows deletions from the original
1373            for e in [ e for e in topo[tb].elements]:
1374                etb = e.get_attribute('testbed')
1375                # NB: elements without a testbed attribute won't appear in any
1376                # sub topologies. 
1377                if not etb or etb != tb:
1378                    for i in e.interface:
1379                        for s in i.subs:
1380                            try:
1381                                s.interfaces.remove(i)
1382                            except ValueError:
1383                                raise service_error(service_error.internal,
1384                                        "Can't remove interface??")
1385                    topo[tb].elements.remove(e)
1386            topo[tb].make_indices()
1387
1388    def confirm_software(self, top):
1389        """
1390        Make sure that the software to be loaded in the topo is all available
1391        before we begin making access requests, etc.  This is a subset of
1392        wrangle_software.
1393        """
1394        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1395        pkgs.update([x.location for e in top.elements for x in e.software])
1396
1397        for pkg in pkgs:
1398            loc = pkg
1399
1400            scheme, host, path = urlparse(loc)[0:3]
1401            dest = os.path.basename(path)
1402            if not scheme:
1403                if not loc.startswith('/'):
1404                    loc = "/%s" % loc
1405                loc = "file://%s" %loc
1406            # NB: if scheme was found, loc == pkg
1407            try:
1408                u = urlopen(loc)
1409                u.close()
1410            except Exception, e:
1411                raise service_error(service_error.req, 
1412                        "Cannot open %s: %s" % (loc, e))
1413        return True
1414
1415    def wrangle_software(self, expid, top, topo, tbparams):
1416        """
1417        Copy software out to the repository directory, allocate permissions and
1418        rewrite the segment topologies to look for the software in local
1419        places.
1420        """
1421
1422        # Copy the rpms and tarfiles to a distribution directory from
1423        # which the federants can retrieve them
1424        linkpath = "%s/software" %  expid
1425        softdir ="%s/%s" % ( self.repodir, linkpath)
1426        softmap = { }
1427
1428        # self.fedkit and self.gateway kit are lists of tuples of
1429        # (install_location, download_location) this extracts the download
1430        # locations.
1431        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1432        pkgs.update([x.location for e in top.elements for x in e.software])
1433        try:
1434            os.makedirs(softdir)
1435        except EnvironmentError, e:
1436            raise service_error(
1437                    "Cannot create software directory: %s" % e)
1438        # The actual copying.  Everything's converted into a url for copying.
1439        auth_attrs = set()
1440        for pkg in pkgs:
1441            loc = pkg
1442
1443            scheme, host, path = urlparse(loc)[0:3]
1444            dest = os.path.basename(path)
1445            if not scheme:
1446                if not loc.startswith('/'):
1447                    loc = "/%s" % loc
1448                loc = "file://%s" %loc
1449            # NB: if scheme was found, loc == pkg
1450            try:
1451                u = urlopen(loc)
1452            except Exception, e:
1453                raise service_error(service_error.req, 
1454                        "Cannot open %s: %s" % (loc, e))
1455            try:
1456                f = open("%s/%s" % (softdir, dest) , "w")
1457                self.log.debug("Writing %s/%s" % (softdir,dest) )
1458                data = u.read(4096)
1459                while data:
1460                    f.write(data)
1461                    data = u.read(4096)
1462                f.close()
1463                u.close()
1464            except Exception, e:
1465                raise service_error(service_error.internal,
1466                        "Could not copy %s: %s" % (loc, e))
1467            path = re.sub("/tmp", "", linkpath)
1468            # XXX
1469            softmap[pkg] = \
1470                    "%s/%s/%s" %\
1471                    ( self.repo_url, path, dest)
1472
1473            # Allow the individual segments to access the software by assigning
1474            # an attribute to each testbed allocation that encodes the data to
1475            # be released.  This expression collects the data for each run of
1476            # the loop.
1477            auth_attrs.update([
1478                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1479                        for tb in tbparams.keys()])
1480
1481        self.append_experiment_authorization(expid, auth_attrs)
1482
1483        # Convert the software locations in the segments into the local
1484        # copies on this host
1485        for soft in [ s for tb in topo.values() \
1486                for e in tb.elements \
1487                    if getattr(e, 'software', False) \
1488                        for s in e.software ]:
1489            if softmap.has_key(soft.location):
1490                soft.location = softmap[soft.location]
1491
1492
1493    def new_experiment(self, req, fid):
1494        """
1495        The external interface to empty initial experiment creation called from
1496        the dispatcher.
1497
1498        Creates a working directory, splits the incoming description using the
1499        splitter script and parses out the avrious subsections using the
1500        lcasses above.  Once each sub-experiment is created, use pooled threads
1501        to instantiate them and start it all up.
1502        """
1503        req = req.get('NewRequestBody', None)
1504        if not req:
1505            raise service_error(service_error.req,
1506                    "Bad request format (no NewRequestBody)")
1507
1508        if self.auth.import_credentials(data_list=req.get('credential', [])):
1509            self.auth.save()
1510
1511        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1512                with_proof=True)
1513
1514        if not access_ok:
1515            raise service_error(service_error.access, "New access denied",
1516                    proof=[proof])
1517
1518        try:
1519            tmpdir = tempfile.mkdtemp(prefix="split-")
1520        except EnvironmentError:
1521            raise service_error(service_error.internal, "Cannot create tmp dir")
1522
1523        try:
1524            access_user = self.accessdb[fid]
1525        except KeyError:
1526            raise service_error(service_error.internal,
1527                    "Access map and authorizer out of sync in " + \
1528                            "new_experiment for fedid %s"  % fid)
1529
1530        # Generate an ID for the experiment (slice) and a certificate that the
1531        # allocator can use to prove they own it.  We'll ship it back through
1532        # the encrypted connection.  If the requester supplied one, use it.
1533        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1534            expcert = req['experimentAccess']['X509']
1535            expid = fedid(certstr=expcert)
1536            self.state_lock.acquire()
1537            if expid in self.state:
1538                self.state_lock.release()
1539                raise service_error(service_error.req, 
1540                        'fedid %s identifies an existing experiment' % expid)
1541            self.state_lock.release()
1542        else:
1543            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1544
1545        #now we're done with the tmpdir, and it should be empty
1546        if self.cleanup:
1547            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1548            os.rmdir(tmpdir)
1549        else:
1550            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1551
1552        eid = self.create_experiment_state(fid, req, expid, expcert, 
1553                state='empty')
1554
1555        rv = {
1556                'experimentID': [
1557                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1558                ],
1559                'experimentStatus': 'empty',
1560                'experimentAccess': { 'X509' : expcert },
1561                'proof': proof.to_dict(),
1562            }
1563
1564        return rv
1565
1566    # create_experiment sub-functions
1567
1568    @staticmethod
1569    def get_experiment_key(req, field='experimentID'):
1570        """
1571        Parse the experiment identifiers out of the request (the request body
1572        tag has been removed).  Specifically this pulls either the fedid or the
1573        localname out of the experimentID field.  A fedid is preferred.  If
1574        neither is present or the request does not contain the fields,
1575        service_errors are raised.
1576        """
1577        # Get the experiment access
1578        exp = req.get(field, None)
1579        if exp:
1580            if exp.has_key('fedid'):
1581                key = exp['fedid']
1582            elif exp.has_key('localname'):
1583                key = exp['localname']
1584            else:
1585                raise service_error(service_error.req, "Unknown lookup type")
1586        else:
1587            raise service_error(service_error.req, "No request?")
1588
1589        return key
1590
1591    def get_experiment_ids_and_start(self, key, tmpdir):
1592        """
1593        Get the experiment name, id and access certificate from the state, and
1594        set the experiment state to 'starting'.  returns a triple (fedid,
1595        localname, access_cert_file). The access_cert_file is a copy of the
1596        contents of the access certificate, created in the tempdir with
1597        restricted permissions.  If things are confused, raise an exception.
1598        """
1599
1600        expid = eid = None
1601        self.state_lock.acquire()
1602        if self.state.has_key(key):
1603            self.state[key]['experimentStatus'] = "starting"
1604            for e in self.state[key].get('experimentID',[]):
1605                if not expid and e.has_key('fedid'):
1606                    expid = e['fedid']
1607                elif not eid and e.has_key('localname'):
1608                    eid = e['localname']
1609            if 'experimentAccess' in self.state[key] and \
1610                    'X509' in self.state[key]['experimentAccess']:
1611                expcert = self.state[key]['experimentAccess']['X509']
1612            else:
1613                expcert = None
1614        self.state_lock.release()
1615
1616        # make a protected copy of the access certificate so the experiment
1617        # controller can act as the experiment principal.
1618        if expcert:
1619            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1620            if not expcert_file:
1621                raise service_error(service_error.internal, 
1622                        "Cannot create temp cert file?")
1623        else:
1624            expcert_file = None
1625
1626        return (eid, expid, expcert_file)
1627
1628    def get_topology(self, req, tmpdir):
1629        """
1630        Get the ns2 content and put it into a file for parsing.  Call the local
1631        or remote parser and return the topdl.Topology.  Errors result in
1632        exceptions.  req is the request and tmpdir is a work directory.
1633        """
1634
1635        # The tcl parser needs to read a file so put the content into that file
1636        descr=req.get('experimentdescription', None)
1637        if descr:
1638            if 'ns2description' in descr:
1639                file_content=descr['ns2description']
1640            elif 'topdldescription' in descr:
1641                return topdl.Topology(**descr['topdldescription'])
1642            else:
1643                raise service_error(service_error.req, 
1644                        'Unknown experiment description type')
1645        else:
1646            raise service_error(service_error.req, "No experiment description")
1647
1648
1649        if self.splitter_url:
1650            self.log.debug("Calling remote topdl translator at %s" % \
1651                    self.splitter_url)
1652            top = self.remote_ns2topdl(self.splitter_url, file_content)
1653        else:
1654            tclfile = os.path.join(tmpdir, "experiment.tcl")
1655            if file_content:
1656                try:
1657                    f = open(tclfile, 'w')
1658                    f.write(file_content)
1659                    f.close()
1660                except EnvironmentError:
1661                    raise service_error(service_error.internal,
1662                            "Cannot write temp experiment description")
1663            else:
1664                raise service_error(service_error.req, 
1665                        "Only ns2descriptions supported")
1666            pid = "dummy"
1667            gid = "dummy"
1668            eid = "dummy"
1669
1670            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1671                str(self.muxmax), '-m', 'dummy']
1672
1673            tclcmd.extend([pid, gid, eid, tclfile])
1674
1675            self.log.debug("running local splitter %s", " ".join(tclcmd))
1676            # This is just fantastic.  As a side effect the parser copies
1677            # tb_compat.tcl into the current directory, so that directory
1678            # must be writable by the fedd user.  Doing this in the
1679            # temporary subdir ensures this is the case.
1680            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1681                    cwd=tmpdir)
1682            split_data = tclparser.stdout
1683
1684            top = topdl.topology_from_xml(file=split_data, top="experiment")
1685            os.remove(tclfile)
1686
1687        return top
1688
1689    def get_testbed_services(self, req, testbeds):
1690        """
1691        Parse the services section of the request into into two dicts mapping
1692        testbed to lists of federated_service objects.  The first lists all
1693        exporters of services, and the second all exporters of services that
1694        need control portals int the experiment.
1695        """
1696        masters = { }
1697        pmasters = { }
1698        for s in req.get('service', []):
1699            # If this is a service request with the importall field
1700            # set, fill it out.
1701
1702            if s.get('importall', False):
1703                s['import'] = [ tb for tb in testbeds \
1704                        if tb not in s.get('export',[])]
1705                del s['importall']
1706
1707            # Add the service to masters
1708            for tb in s.get('export', []):
1709                if s.get('name', None):
1710
1711                    params = { }
1712                    for a in s.get('fedAttr', []):
1713                        params[a.get('attribute', '')] = a.get('value','')
1714
1715                    fser = federated_service(name=s['name'],
1716                            exporter=tb, importers=s.get('import',[]),
1717                            params=params)
1718                    if fser.name == 'hide_hosts' \
1719                            and 'hosts' not in fser.params:
1720                        fser.params['hosts'] = \
1721                                ",".join(tb_hosts.get(fser.exporter, []))
1722                    if tb in masters: masters[tb].append(fser)
1723                    else: masters[tb] = [fser]
1724
1725                    if fser.portal:
1726                        if tb not in pmasters: pmasters[tb] = [ fser ]
1727                        else: pmasters[tb].append(fser)
1728                else:
1729                    self.log.error('Testbed service does not have name " + \
1730                            "and importers')
1731        return masters, pmasters
1732
1733    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1734        """
1735        Create the ssh keys necessary for interconnecting the potral nodes and
1736        the global hosts file for letting each segment know about the IP
1737        addresses in play.  Save these into the repo.  Add attributes to the
1738        autorizer allowing access controllers to download them and return a set
1739        of attributes that inform the segments where to find this stuff.  Mau
1740        raise service_errors in if there are problems.
1741        """
1742        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1743        gw_secretkey_base = "fed.%s" % self.ssh_type
1744        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1745        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1746
1747        try:
1748            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1749        except ValueError:
1750            raise service_error(service_error.server_config, 
1751                    "Bad key type (%s)" % self.ssh_type)
1752
1753
1754        # Copy configuration files into the remote file store
1755        # The config urlpath
1756        configpath = "/%s/config" % expid
1757        # The config file system location
1758        configdir ="%s%s" % ( self.repodir, configpath)
1759        try:
1760            os.makedirs(configdir)
1761        except EnvironmentError, e:
1762            raise service_error(service_error.internal,
1763                    "Cannot create config directory: %s" % e)
1764        try:
1765            f = open("%s/hosts" % configdir, "w")
1766            print >> f, string.join(hosts, '\n')
1767            f.close()
1768        except EnvironmentError, e:
1769            raise service_error(service_error.internal, 
1770                    "Cannot write hosts file: %s" % e)
1771        try:
1772            copy_file("%s" % gw_pubkey, "%s/%s" % \
1773                    (configdir, gw_pubkey_base))
1774            copy_file("%s" % gw_secretkey, "%s/%s" % \
1775                    (configdir, gw_secretkey_base))
1776        except EnvironmentError, e:
1777            raise service_error(service_error.internal, 
1778                    "Cannot copy keyfiles: %s" % e)
1779
1780        # Allow the individual testbeds to access the configuration files,
1781        # again by setting an attribute for the relevant pathnames on each
1782        # allocation principal.  Yeah, that's a long list comprehension.
1783        self.append_experiment_authorization(expid, set([
1784            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1785                    for tb in tbparams.keys() \
1786                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1787
1788        attrs = [ 
1789                {
1790                    'attribute': 'ssh_pubkey', 
1791                    'value': '%s/%s/config/%s' % \
1792                            (self.repo_url, expid, gw_pubkey_base)
1793                },
1794                {
1795                    'attribute': 'ssh_secretkey', 
1796                    'value': '%s/%s/config/%s' % \
1797                            (self.repo_url, expid, gw_secretkey_base)
1798                },
1799                {
1800                    'attribute': 'hosts', 
1801                    'value': '%s/%s/config/hosts' % \
1802                            (self.repo_url, expid)
1803                },
1804            ]
1805        return attrs
1806
1807
1808    def get_vtopo(self, req, fid):
1809        """
1810        Return the stored virtual topology for this experiment
1811        """
1812        rv = None
1813        state = None
1814
1815        req = req.get('VtopoRequestBody', None)
1816        if not req:
1817            raise service_error(service_error.req,
1818                    "Bad request format (no VtopoRequestBody)")
1819        exp = req.get('experiment', None)
1820        if exp:
1821            if exp.has_key('fedid'):
1822                key = exp['fedid']
1823                keytype = "fedid"
1824            elif exp.has_key('localname'):
1825                key = exp['localname']
1826                keytype = "localname"
1827            else:
1828                raise service_error(service_error.req, "Unknown lookup type")
1829        else:
1830            raise service_error(service_error.req, "No request?")
1831
1832        proof = self.check_experiment_access(fid, key)
1833
1834        self.state_lock.acquire()
1835        if self.state.has_key(key):
1836            if self.state[key].has_key('vtopo'):
1837                rv = { 'experiment' : {keytype: key },
1838                        'vtopo': self.state[key]['vtopo'],
1839                        'proof': proof.to_dict(), 
1840                    }
1841            else:
1842                state = self.state[key]['experimentStatus']
1843        self.state_lock.release()
1844
1845        if rv: return rv
1846        else: 
1847            if state:
1848                raise service_error(service_error.partial, 
1849                        "Not ready: %s" % state)
1850            else:
1851                raise service_error(service_error.req, "No such experiment")
1852
1853    def get_vis(self, req, fid):
1854        """
1855        Return the stored visualization for this experiment
1856        """
1857        rv = None
1858        state = None
1859
1860        req = req.get('VisRequestBody', None)
1861        if not req:
1862            raise service_error(service_error.req,
1863                    "Bad request format (no VisRequestBody)")
1864        exp = req.get('experiment', None)
1865        if exp:
1866            if exp.has_key('fedid'):
1867                key = exp['fedid']
1868                keytype = "fedid"
1869            elif exp.has_key('localname'):
1870                key = exp['localname']
1871                keytype = "localname"
1872            else:
1873                raise service_error(service_error.req, "Unknown lookup type")
1874        else:
1875            raise service_error(service_error.req, "No request?")
1876
1877        proof = self.check_experiment_access(fid, key)
1878
1879        self.state_lock.acquire()
1880        if self.state.has_key(key):
1881            if self.state[key].has_key('vis'):
1882                rv =  { 'experiment' : {keytype: key },
1883                        'vis': self.state[key]['vis'],
1884                        'proof': proof.to_dict(), 
1885                        }
1886            else:
1887                state = self.state[key]['experimentStatus']
1888        self.state_lock.release()
1889
1890        if rv: return rv
1891        else:
1892            if state:
1893                raise service_error(service_error.partial, 
1894                        "Not ready: %s" % state)
1895            else:
1896                raise service_error(service_error.req, "No such experiment")
1897
1898   
1899    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1900            top):
1901        """
1902        Store the various data that have changed in the experiment state
1903        between when it was started and the beginning of resource allocation.
1904        This is basically the information about each local allocation.  This
1905        fills in the values of the placeholder allocation in the state.  It
1906        also collects the access proofs and returns them as dicts for a
1907        response message.
1908        """
1909        # save federant information
1910        for k in allocated.keys():
1911            tbparams[k]['federant'] = {
1912                    'name': [ { 'localname' : eid} ],
1913                    'allocID' : tbparams[k]['allocID'],
1914                    'uri': tbparams[k]['uri'],
1915                    'proof': tbparams[k]['proof'],
1916                }
1917
1918        self.state_lock.acquire()
1919        self.state[eid]['vtopo'] = vtopo
1920        self.state[eid]['vis'] = vis
1921        self.state[eid]['experimentdescription'] = \
1922                { 'topdldescription': top.to_dict() }
1923        self.state[eid]['federant'] = \
1924                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1925                    if tbparams[tb].has_key('federant') ]
1926        # Access proofs for the response message
1927        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1928                    for p in tbparams[k]['federant']['proof']]
1929        if self.state_filename: 
1930            self.write_state()
1931        self.state_lock.release()
1932        return proofs
1933
1934    def clear_placeholder(self, eid, expid, tmpdir):
1935        """
1936        Clear the placeholder and remove any allocated temporary dir.
1937        """
1938
1939        self.state_lock.acquire()
1940        del self.state[eid]
1941        del self.state[expid]
1942        if self.state_filename: self.write_state()
1943        self.state_lock.release()
1944        if tmpdir and self.cleanup:
1945            self.remove_dirs(tmpdir)
1946
1947    # end of create_experiment sub-functions
1948
1949    def create_experiment(self, req, fid):
1950        """
1951        The external interface to experiment creation called from the
1952        dispatcher.
1953
1954        Creates a working directory, splits the incoming description using the
1955        splitter script and parses out the various subsections using the
1956        classes above.  Once each sub-experiment is created, use pooled threads
1957        to instantiate them and start it all up.
1958        """
1959
1960        req = req.get('CreateRequestBody', None)
1961        if req:
1962            key = self.get_experiment_key(req)
1963        else:
1964            raise service_error(service_error.req,
1965                    "Bad request format (no CreateRequestBody)")
1966
1967        # Import information from the requester
1968        if self.auth.import_credentials(data_list=req.get('credential', [])):
1969            self.auth.save()
1970
1971        # Make sure that the caller can talk to us
1972        proof = self.check_experiment_access(fid, key)
1973
1974        # Install the testbed map entries supplied with the request into a copy
1975        # of the testbed map.
1976        tbmap = dict(self.tbmap)
1977        for m in req.get('testbedmap', []):
1978            if 'testbed' in m and 'uri' in m:
1979                tbmap[m['testbed']] = m['uri']
1980
1981        # a place to work
1982        try:
1983            tmpdir = tempfile.mkdtemp(prefix="split-")
1984            os.mkdir(tmpdir+"/keys")
1985        except EnvironmentError:
1986            raise service_error(service_error.internal, "Cannot create tmp dir")
1987
1988        tbparams = { }
1989
1990        eid, expid, expcert_file = \
1991                self.get_experiment_ids_and_start(key, tmpdir)
1992
1993        # This catches exceptions to clear the placeholder if necessary
1994        try: 
1995            if not (eid and expid):
1996                raise service_error(service_error.internal, 
1997                        "Cannot find local experiment info!?")
1998
1999            top = self.get_topology(req, tmpdir)
2000            self.confirm_software(top)
2001            # Assign the IPs
2002            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2003            # Find the testbeds to look up
2004            tb_hosts = { }
2005            testbeds = [ ]
2006            for e in top.elements:
2007                if isinstance(e, topdl.Computer):
2008                    tb = e.get_attribute('testbed') or 'default'
2009                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2010                    else: 
2011                        tb_hosts[tb] = [ e.name ]
2012                        testbeds.append(tb)
2013
2014            masters, pmasters = self.get_testbed_services(req, testbeds)
2015            allocated = { }         # Testbeds we can access
2016            topo ={ }               # Sub topologies
2017            connInfo = { }          # Connection information
2018
2019            self.split_topology(top, topo, testbeds)
2020
2021            self.get_access_to_testbeds(testbeds, fid, allocated, 
2022                    tbparams, masters, tbmap, expid, expcert_file)
2023
2024            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2025
2026            part = experiment_partition(self.auth, self.store_url, tbmap,
2027                    self.muxmax, self.direct_transit)
2028            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2029                    connInfo, expid)
2030
2031            auth_attrs = set()
2032            # Now get access to the dynamic testbeds (those added above)
2033            for tb in [ t for t in topo if t not in allocated]:
2034                self.get_access(tb, tbparams, fid, masters, tbmap, 
2035                        expid, expcert_file)
2036                allocated[tb] = 1
2037                store_keys = topo[tb].get_attribute('store_keys')
2038                # Give the testbed access to keys it exports or imports
2039                if store_keys:
2040                    auth_attrs.update(set([
2041                        (tbparams[tb]['allocID']['fedid'], sk) \
2042                                for sk in store_keys.split(" ")]))
2043
2044            if auth_attrs:
2045                self.append_experiment_authorization(expid, auth_attrs)
2046
2047            # transit and disconnected testbeds may not have a connInfo entry.
2048            # Fill in the blanks.
2049            for t in allocated.keys():
2050                if not connInfo.has_key(t):
2051                    connInfo[t] = { }
2052
2053            self.wrangle_software(expid, top, topo, tbparams)
2054
2055            vtopo = topdl.topology_to_vtopo(top)
2056            vis = self.genviz(vtopo)
2057            proofs = self.save_federant_information(allocated, tbparams, 
2058                    eid, vtopo, vis, top)
2059        except service_error, e:
2060            # If something goes wrong in the parse (usually an access error)
2061            # clear the placeholder state.  From here on out the code delays
2062            # exceptions.  Failing at this point returns a fault to the remote
2063            # caller.
2064            self.clear_placeholder(eid, expid, tmpdir)
2065            raise e
2066
2067        # Start the background swapper and return the starting state.  From
2068        # here on out, the state will stick around a while.
2069
2070        # Create a logger that logs to the experiment's state object as well as
2071        # to the main log file.
2072        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2073        alloc_collector = self.list_log(self.state[eid]['log'])
2074        h = logging.StreamHandler(alloc_collector)
2075        # XXX: there should be a global one of these rather than repeating the
2076        # code.
2077        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2078                    '%d %b %y %H:%M:%S'))
2079        alloc_log.addHandler(h)
2080
2081        # Start a thread to do the resource allocation
2082        t  = Thread(target=self.allocate_resources,
2083                args=(allocated, masters, eid, expid, tbparams, 
2084                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2085                    connInfo, tbmap, expcert_file),
2086                name=eid)
2087        t.start()
2088
2089        rv = {
2090                'experimentID': [
2091                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2092                ],
2093                'experimentStatus': 'starting',
2094                'proof': [ proof.to_dict() ] + proofs,
2095            }
2096
2097        return rv
2098   
2099    def get_experiment_fedid(self, key):
2100        """
2101        find the fedid associated with the localname key in the state database.
2102        """
2103
2104        rv = None
2105        self.state_lock.acquire()
2106        if self.state.has_key(key):
2107            if isinstance(self.state[key], dict):
2108                try:
2109                    kl = [ f['fedid'] for f in \
2110                            self.state[key]['experimentID']\
2111                                if f.has_key('fedid') ]
2112                except KeyError:
2113                    self.state_lock.release()
2114                    raise service_error(service_error.internal, 
2115                            "No fedid for experiment %s when getting "+\
2116                                    "fedid(!?)" % key)
2117                if len(kl) == 1:
2118                    rv = kl[0]
2119                else:
2120                    self.state_lock.release()
2121                    raise service_error(service_error.internal, 
2122                            "multiple fedids for experiment %s when " +\
2123                                    "getting fedid(!?)" % key)
2124            else:
2125                self.state_lock.release()
2126                raise service_error(service_error.internal, 
2127                        "Unexpected state for %s" % key)
2128        self.state_lock.release()
2129        return rv
2130
2131    def check_experiment_access(self, fid, key):
2132        """
2133        Confirm that the fid has access to the experiment.  Though a request
2134        may be made in terms of a local name, the access attribute is always
2135        the experiment's fedid.
2136        """
2137        if not isinstance(key, fedid):
2138            key = self.get_experiment_fedid(key)
2139
2140        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2141
2142        if access_ok:
2143            return proof
2144        else:
2145            raise service_error(service_error.access, "Access Denied",
2146                proof)
2147
2148
2149    def get_handler(self, path, fid):
2150        """
2151        Perhaps surprisingly named, this function handles HTTP GET requests to
2152        this server (SOAP requests are POSTs).
2153        """
2154        self.log.info("Get handler %s %s" % (path, fid))
2155        # XXX: log proofs?
2156        if self.auth.check_attribute(fid, path):
2157            return ("%s/%s" % (self.repodir, path), "application/binary")
2158        else:
2159            return (None, None)
2160
2161    def clean_info_response(self, rv, proof):
2162        """
2163        Remove the information in the experiment's state object that is not in
2164        the info response.
2165        """
2166        # Remove the owner info (should always be there, but...)
2167        if rv.has_key('owner'): del rv['owner']
2168        if 'auth' in rv: del rv['auth']
2169
2170        # Convert the log into the allocationLog parameter and remove the
2171        # log entry (with defensive programming)
2172        if rv.has_key('log'):
2173            rv['allocationLog'] = "".join(rv['log'])
2174            del rv['log']
2175        else:
2176            rv['allocationLog'] = ""
2177
2178        if rv['experimentStatus'] != 'active':
2179            if rv.has_key('federant'): del rv['federant']
2180        else:
2181            # remove the allocationID and uri info from each federant
2182            for f in rv.get('federant', []):
2183                if f.has_key('allocID'): del f['allocID']
2184                if f.has_key('uri'): del f['uri']
2185        rv['proof'] = proof.to_dict()
2186
2187        return rv
2188
2189    def get_info(self, req, fid):
2190        """
2191        Return all the stored info about this experiment
2192        """
2193        rv = None
2194
2195        req = req.get('InfoRequestBody', None)
2196        if not req:
2197            raise service_error(service_error.req,
2198                    "Bad request format (no InfoRequestBody)")
2199        exp = req.get('experiment', None)
2200        if exp:
2201            if exp.has_key('fedid'):
2202                key = exp['fedid']
2203                keytype = "fedid"
2204            elif exp.has_key('localname'):
2205                key = exp['localname']
2206                keytype = "localname"
2207            else:
2208                raise service_error(service_error.req, "Unknown lookup type")
2209        else:
2210            raise service_error(service_error.req, "No request?")
2211
2212        proof = self.check_experiment_access(fid, key)
2213
2214        # The state may be massaged by the service function that called
2215        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2216        # state.
2217        self.state_lock.acquire()
2218        if self.state.has_key(key):
2219            rv = copy.deepcopy(self.state[key])
2220        self.state_lock.release()
2221
2222        if rv:
2223            return self.clean_info_response(rv, proof)
2224        else:
2225            raise service_error(service_error.req, "No such experiment")
2226
2227    def get_multi_info(self, req, fid):
2228        """
2229        Return all the stored info that this fedid can access
2230        """
2231        rv = { 'info': [ ], 'proof': [ ] }
2232
2233        self.state_lock.acquire()
2234        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2235            try:
2236                proof = self.check_experiment_access(fid, key)
2237            except service_error, e:
2238                if e.code == service_error.access:
2239                    continue
2240                else:
2241                    self.state_lock.release()
2242                    raise e
2243
2244            if self.state.has_key(key):
2245                e = copy.deepcopy(self.state[key])
2246                e = self.clean_info_response(e, proof)
2247                rv['info'].append(e)
2248                rv['proof'].append(proof.to_dict())
2249        self.state_lock.release()
2250        return rv
2251
2252    def check_termination_status(self, fed_exp, force):
2253        """
2254        Confirm that the experiment is sin a valid state to stop (or force it)
2255        return the state - invalid states for deletion and force settings cause
2256        exceptions.
2257        """
2258        self.state_lock.acquire()
2259        status = fed_exp.get('experimentStatus', None)
2260
2261        if status:
2262            if status in ('starting', 'terminating'):
2263                if not force:
2264                    self.state_lock.release()
2265                    raise service_error(service_error.partial, 
2266                            'Experiment still being created or destroyed')
2267                else:
2268                    self.log.warning('Experiment in %s state ' % status + \
2269                            'being terminated by force.')
2270            self.state_lock.release()
2271            return status
2272        else:
2273            # No status??? trouble
2274            self.state_lock.release()
2275            raise service_error(service_error.internal,
2276                    "Experiment has no status!?")
2277
2278
2279    def get_termination_info(self, fed_exp):
2280        ids = []
2281        term_params = { }
2282        self.state_lock.acquire()
2283        #  experimentID is a list of dicts that are self-describing
2284        #  identifiers.  This finds all the fedids and localnames - the
2285        #  keys of self.state - and puts them into ids, which is used to delete
2286        #  the state after everything is swapped out.
2287        for id in fed_exp.get('experimentID', []):
2288            if 'fedid' in id: 
2289                ids.append(id['fedid'])
2290                repo = "%s" % id['fedid']
2291            if 'localname' in id: ids.append(id['localname'])
2292
2293        # Get the experimentAccess - the principal for this experiment.  It
2294        # is this principal to which credentials have been delegated, and
2295        # as which the experiment controller must act.
2296        if 'experimentAccess' in fed_exp and \
2297                'X509' in fed_exp['experimentAccess']:
2298            expcert = fed_exp['experimentAccess']['X509']
2299        else:
2300            expcert = None
2301
2302        # Collect the allocation/segment ids into a dict keyed by the fedid
2303        # of the allocation (or a monotonically increasing integer) that
2304        # contains a tuple of uri, aid (which is a dict...)
2305        for i, fed in enumerate(fed_exp.get('federant', [])):
2306            try:
2307                uri = fed['uri']
2308                aid = fed['allocID']
2309                k = fed['allocID'].get('fedid', i)
2310            except KeyError, e:
2311                continue
2312            term_params[k] = (uri, aid)
2313        # Change the experiment state
2314        fed_exp['experimentStatus'] = 'terminating'
2315        if self.state_filename: self.write_state()
2316        self.state_lock.release()
2317
2318        return ids, term_params, expcert, repo
2319
2320
2321    def deallocate_resources(self, term_params, expcert, status, force, 
2322            dealloc_log):
2323        tmpdir = None
2324        # This try block makes sure the tempdir is cleared
2325        try:
2326            # If no expcert, try the deallocation as the experiment
2327            # controller instance.
2328            if expcert and self.auth_type != 'legacy': 
2329                try:
2330                    tmpdir = tempfile.mkdtemp(prefix="term-")
2331                except EnvironmentError:
2332                    raise service_error(service_error.internal, 
2333                            "Cannot create tmp dir")
2334                cert_file = self.make_temp_certfile(expcert, tmpdir)
2335                pw = None
2336            else: 
2337                cert_file = self.cert_file
2338                pw = self.cert_pwd
2339
2340            # Stop everyone.  NB, wait_for_all waits until a thread starts
2341            # and then completes, so we can't wait if nothing starts.  So,
2342            # no tbparams, no start.
2343            if len(term_params) > 0:
2344                tp = thread_pool(self.nthreads)
2345                for k, (uri, aid) in term_params.items():
2346                    # Create and start a thread to stop the segment
2347                    tp.wait_for_slot()
2348                    t  = pooled_thread(\
2349                            target=self.terminate_segment(log=dealloc_log,
2350                                testbed=uri,
2351                                cert_file=cert_file, 
2352                                cert_pwd=pw,
2353                                trusted_certs=self.trusted_certs,
2354                                caller=self.call_TerminateSegment),
2355                            args=(uri, aid), name=k,
2356                            pdata=tp, trace_file=self.trace_file)
2357                    t.start()
2358                # Wait for completions
2359                tp.wait_for_all_done()
2360
2361            # release the allocations (failed experiments have done this
2362            # already, and starting experiments may be in odd states, so we
2363            # ignore errors releasing those allocations
2364            try: 
2365                for k, (uri, aid)  in term_params.items():
2366                    self.release_access(None, aid, uri=uri,
2367                            cert_file=cert_file, cert_pwd=pw)
2368            except service_error, e:
2369                if status != 'failed' and not force:
2370                    raise e
2371
2372        # Clean up the tmpdir no matter what
2373        finally:
2374            if tmpdir: self.remove_dirs(tmpdir)
2375
2376    def terminate_experiment(self, req, fid):
2377        """
2378        Swap this experiment out on the federants and delete the shared
2379        information
2380        """
2381        tbparams = { }
2382        req = req.get('TerminateRequestBody', None)
2383        if not req:
2384            raise service_error(service_error.req,
2385                    "Bad request format (no TerminateRequestBody)")
2386
2387        key = self.get_experiment_key(req, 'experiment')
2388        proof = self.check_experiment_access(fid, key)
2389        exp = req.get('experiment', False)
2390        force = req.get('force', False)
2391
2392        dealloc_list = [ ]
2393
2394
2395        # Create a logger that logs to the dealloc_list as well as to the main
2396        # log file.
2397        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2398        h = logging.StreamHandler(self.list_log(dealloc_list))
2399        # XXX: there should be a global one of these rather than repeating the
2400        # code.
2401        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2402                    '%d %b %y %H:%M:%S'))
2403        dealloc_log.addHandler(h)
2404
2405        self.state_lock.acquire()
2406        fed_exp = self.state.get(key, None)
2407        self.state_lock.release()
2408        repo = None
2409
2410        if fed_exp:
2411            status = self.check_termination_status(fed_exp, force)
2412            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2413            self.deallocate_resources(term_params, expcert, status, force, 
2414                    dealloc_log)
2415
2416            # Remove the terminated experiment
2417            self.state_lock.acquire()
2418            for id in ids:
2419                self.clear_experiment_authorization(id, need_state_lock=False)
2420                if id in self.state: del self.state[id]
2421
2422            if self.state_filename: self.write_state()
2423            self.state_lock.release()
2424
2425            # Delete any synch points associated with this experiment.  All
2426            # synch points begin with the fedid of the experiment.
2427            fedid_keys = set(["fedid:%s" % f for f in ids \
2428                    if isinstance(f, fedid)])
2429            for k in self.synch_store.all_keys():
2430                try:
2431                    if len(k) > 45 and k[0:46] in fedid_keys:
2432                        self.synch_store.del_value(k)
2433                except synch_store.BadDeletionError:
2434                    pass
2435            self.write_store()
2436
2437            # Remove software and other cached stuff from the filesystem.
2438            if repo:
2439                self.remove_dirs("%s/%s" % (self.repodir, repo))
2440       
2441            return { 
2442                    'experiment': exp , 
2443                    'deallocationLog': string.join(dealloc_list, ''),
2444                    'proof': [proof.to_dict()],
2445                    }
2446        else:
2447            raise service_error(service_error.req, "No saved state")
2448
2449
2450    def GetValue(self, req, fid):
2451        """
2452        Get a value from the synchronized store
2453        """
2454        req = req.get('GetValueRequestBody', None)
2455        if not req:
2456            raise service_error(service_error.req,
2457                    "Bad request format (no GetValueRequestBody)")
2458       
2459        name = req.get('name', None)
2460        wait = req.get('wait', False)
2461        rv = { 'name': name }
2462
2463        if not name:
2464            raise service_error(service_error.req, "No name?")
2465
2466        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2467
2468        if access_ok:
2469            self.log.debug("[GetValue] asking for %s " % name)
2470            try:
2471                v = self.synch_store.get_value(name, wait)
2472            except synch_store.RevokedKeyError:
2473                # No more synch on this key
2474                raise service_error(service_error.federant, 
2475                        "Synch key %s revoked" % name)
2476            if v is not None:
2477                rv['value'] = v
2478            rv['proof'] = proof.to_dict()
2479            self.log.debug("[GetValue] got %s from %s" % (v, name))
2480            return rv
2481        else:
2482            raise service_error(service_error.access, "Access Denied",
2483                    proof=proof)
2484       
2485
2486    def SetValue(self, req, fid):
2487        """
2488        Set a value in the synchronized store
2489        """
2490        req = req.get('SetValueRequestBody', None)
2491        if not req:
2492            raise service_error(service_error.req,
2493                    "Bad request format (no SetValueRequestBody)")
2494       
2495        name = req.get('name', None)
2496        v = req.get('value', '')
2497
2498        if not name:
2499            raise service_error(service_error.req, "No name?")
2500
2501        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2502
2503        if access_ok:
2504            try:
2505                self.synch_store.set_value(name, v)
2506                self.write_store()
2507                self.log.debug("[SetValue] set %s to %s" % (name, v))
2508            except synch_store.CollisionError:
2509                # Translate into a service_error
2510                raise service_error(service_error.req,
2511                        "Value already set: %s" %name)
2512            except synch_store.RevokedKeyError:
2513                # No more synch on this key
2514                raise service_error(service_error.federant, 
2515                        "Synch key %s revoked" % name)
2516                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2517        else:
2518            raise service_error(service_error.access, "Access Denied",
2519                    proof=proof)
Note: See TracBrowser for help on using the repository browser.