source: fedd/federation/experiment_control.py @ 3df9b33

compt_changesinfo-ops
Last change on this file since 3df9b33 was 3df9b33, checked in by Ted Faber <faber@…>, 8 years ago

fedd-generated SEER certs and distribution (initial implementation,
untested) addresses #33

  • Property mode set to 100644
File size: 87.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38
39import topdl
40import list_log
41from ip_allocator import ip_allocator
42from ip_addr import ip_addr
43
44
45class nullHandler(logging.Handler):
46    def emit(self, record): pass
47
48fl = logging.getLogger("fedd.experiment_control")
49fl.addHandler(nullHandler())
50
51
52# Right now, no support for composition.
53class federated_service:
54    def __init__(self, name, exporter=None, importers=None, params=None, 
55            reqs=None, portal=None):
56        self.name=name
57        self.exporter=exporter
58        if importers is None: self.importers = []
59        else: self.importers=importers
60        if params is None: self.params = { }
61        else: self.params = params
62        if reqs is None: self.reqs = []
63        else: self.reqs = reqs
64
65        if portal is not None:
66            self.portal = portal
67        else:
68            self.portal = (name in federated_service.needs_portal)
69
70    def __str__(self):
71        return "name %s export %s import %s params %s reqs %s" % \
72                (self.name, self.exporter, self.importers, self.params,
73                        [ (r['name'], r['visibility']) for r in self.reqs] )
74
75    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
76
77class experiment_control_local(experiment_control_legacy):
78    """
79    Control of experiments that this system can directly access.
80
81    Includes experiment creation, termination and information dissemination.
82    Thred safe.
83    """
84
85    class ssh_cmd_timeout(RuntimeError): pass
86   
87    call_RequestAccess = service_caller('RequestAccess')
88    call_ReleaseAccess = service_caller('ReleaseAccess')
89    call_StartSegment = service_caller('StartSegment')
90    call_TerminateSegment = service_caller('TerminateSegment')
91    call_Ns2Topdl = service_caller('Ns2Topdl')
92
93    def __init__(self, config=None, auth=None):
94        """
95        Intialize the various attributes, most from the config object
96        """
97
98        def parse_tarfile_list(tf):
99            """
100            Parse a tarfile list from the configuration.  This is a set of
101            paths and tarfiles separated by spaces.
102            """
103            rv = [ ]
104            if tf is not None:
105                tl = tf.split()
106                while len(tl) > 1:
107                    p, t = tl[0:2]
108                    del tl[0:2]
109                    rv.append((p, t))
110            return rv
111
112        self.list_log = list_log.list_log
113
114        self.cert_file = config.get("experiment_control", "cert_file")
115        if self.cert_file:
116            self.cert_pwd = config.get("experiment_control", "cert_pwd")
117        else:
118            self.cert_file = config.get("globals", "cert_file")
119            self.cert_pwd = config.get("globals", "cert_pwd")
120
121        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
122                or config.get("globals", "trusted_certs")
123
124        self.repodir = config.get("experiment_control", "repodir")
125        self.repo_url = config.get("experiment_control", "repo_url", 
126                "https://users.isi.deterlab.net:23235");
127
128        self.exp_stem = "fed-stem"
129        self.log = logging.getLogger("fedd.experiment_control")
130        set_log_level(config, "experiment_control", self.log)
131        self.muxmax = 2
132        self.nthreads = 10
133        self.randomize_experiments = False
134
135        self.splitter = None
136        self.ssh_keygen = "/usr/bin/ssh-keygen"
137        self.ssh_identity_file = None
138
139
140        self.debug = config.getboolean("experiment_control", "create_debug")
141        self.cleanup = not config.getboolean("experiment_control", 
142                "leave_tmpfiles")
143        self.state_filename = config.get("experiment_control", 
144                "experiment_state")
145        self.store_filename = config.get("experiment_control", 
146                "synch_store")
147        self.store_url = config.get("experiment_control", "store_url")
148        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
149        self.fedkit = parse_tarfile_list(\
150                config.get("experiment_control", "fedkit"))
151        self.gatewaykit = parse_tarfile_list(\
152                config.get("experiment_control", "gatewaykit"))
153        accessdb_file = config.get("experiment_control", "accessdb")
154
155        dt = config.get("experiment_control", "direct_transit")
156        self.auth_type = config.get('experiment_control', 'auth_type') \
157                or 'legacy'
158        self.auth_dir = config.get('experiment_control', 'auth_dir')
159        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
160        else: self.direct_transit = [ ]
161        # NB for internal master/slave ops, not experiment setup
162        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
163
164        self.overrides = set([])
165        ovr = config.get('experiment_control', 'overrides')
166        if ovr:
167            for o in ovr.split(","):
168                o = o.strip()
169                if o.startswith('fedid:'): o = o[len('fedid:'):]
170                self.overrides.add(fedid(hexstr=o))
171
172        self.state = { }
173        self.state_lock = Lock()
174        self.tclsh = "/usr/local/bin/otclsh"
175        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
176                config.get("experiment_control", "tcl_splitter",
177                        "/usr/testbed/lib/ns2ir/parse.tcl")
178        mapdb_file = config.get("experiment_control", "mapdb")
179        self.trace_file = sys.stderr
180
181        self.def_expstart = \
182                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
183                "/tmp/federate";
184        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
185                "FEDDIR/hosts";
186        self.def_gwstart = \
187                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
188                "/tmp/bridge.log";
189        self.def_mgwstart = \
190                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
191                "/tmp/bridge.log";
192        self.def_gwimage = "FBSD61-TUNNEL2";
193        self.def_gwtype = "pc";
194        self.local_access = { }
195
196        if self.auth_type == 'legacy':
197            if auth:
198                self.auth = auth
199            else:
200                self.log.error( "[access]: No authorizer initialized, " +\
201                        "creating local one.")
202                auth = authorizer()
203            self.get_access = self.legacy_get_access
204        elif self.auth_type == 'abac':
205            self.auth = abac_authorizer(load=self.auth_dir)
206        else:
207            raise service_error(service_error.internal, 
208                    "Unknown auth_type: %s" % self.auth_type)
209
210        if mapdb_file:
211            self.read_mapdb(mapdb_file)
212        else:
213            self.log.warn("[experiment_control] No testbed map, using defaults")
214            self.tbmap = { 
215                    'deter':'https://users.isi.deterlab.net:23235',
216                    'emulab':'https://users.isi.deterlab.net:23236',
217                    'ucb':'https://users.isi.deterlab.net:23237',
218                    }
219
220        if accessdb_file:
221                self.read_accessdb(accessdb_file)
222        else:
223            raise service_error(service_error.internal,
224                    "No accessdb specified in config")
225
226        # Grab saved state.  OK to do this w/o locking because it's read only
227        # and only one thread should be in existence that can see self.state at
228        # this point.
229        if self.state_filename:
230            self.read_state()
231
232        if self.store_filename:
233            self.read_store()
234        else:
235            self.log.warning("No saved synch store")
236            self.synch_store = synch_store
237
238        # Dispatch tables
239        self.soap_services = {\
240                'New': soap_handler('New', self.new_experiment),
241                'Create': soap_handler('Create', self.create_experiment),
242                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
243                'Vis': soap_handler('Vis', self.get_vis),
244                'Info': soap_handler('Info', self.get_info),
245                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
246                'Terminate': soap_handler('Terminate', 
247                    self.terminate_experiment),
248                'GetValue': soap_handler('GetValue', self.GetValue),
249                'SetValue': soap_handler('SetValue', self.SetValue),
250        }
251
252        self.xmlrpc_services = {\
253                'New': xmlrpc_handler('New', self.new_experiment),
254                'Create': xmlrpc_handler('Create', self.create_experiment),
255                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
256                'Vis': xmlrpc_handler('Vis', self.get_vis),
257                'Info': xmlrpc_handler('Info', self.get_info),
258                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
259                'Terminate': xmlrpc_handler('Terminate',
260                    self.terminate_experiment),
261                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
262                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
263        }
264
265    # Call while holding self.state_lock
266    def write_state(self):
267        """
268        Write a new copy of experiment state after copying the existing state
269        to a backup.
270
271        State format is a simple pickling of the state dictionary.
272        """
273        if os.access(self.state_filename, os.W_OK):
274            copy_file(self.state_filename, \
275                    "%s.bak" % self.state_filename)
276        try:
277            f = open(self.state_filename, 'w')
278            pickle.dump(self.state, f)
279        except EnvironmentError, e:
280            self.log.error("Can't write file %s: %s" % \
281                    (self.state_filename, e))
282        except pickle.PicklingError, e:
283            self.log.error("Pickling problem: %s" % e)
284        except TypeError, e:
285            self.log.error("Pickling problem (TypeError): %s" % e)
286
287    @staticmethod
288    def get_alloc_ids(state):
289        """
290        Pull the fedids of the identifiers of each allocation from the
291        state.  Again, a dict dive that's best isolated.
292
293        Used by read_store and read state
294        """
295
296        return [ f['allocID']['fedid'] 
297                for f in state.get('federant',[]) \
298                    if f.has_key('allocID') and \
299                        f['allocID'].has_key('fedid')]
300
301    # Call while holding self.state_lock
302    def read_state(self):
303        """
304        Read a new copy of experiment state.  Old state is overwritten.
305
306        State format is a simple pickling of the state dictionary.
307        """
308       
309        def get_experiment_id(state):
310            """
311            Pull the fedid experimentID out of the saved state.  This is kind
312            of a gross walk through the dict.
313            """
314
315            if state.has_key('experimentID'):
316                for e in state['experimentID']:
317                    if e.has_key('fedid'):
318                        return e['fedid']
319                else:
320                    return None
321            else:
322                return None
323
324        try:
325            f = open(self.state_filename, "r")
326            self.state = pickle.load(f)
327            self.log.debug("[read_state]: Read state from %s" % \
328                    self.state_filename)
329        except EnvironmentError, e:
330            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
331                    % (self.state_filename, e))
332        except pickle.UnpicklingError, e:
333            self.log.warning(("[read_state]: No saved state: " + \
334                    "Unpickling failed: %s") % e)
335       
336        for s in self.state.values():
337            try:
338
339                eid = get_experiment_id(s)
340                if eid : 
341                    if self.auth_type == 'legacy':
342                        # XXX: legacy
343                        # Give the owner rights to the experiment
344                        self.auth.set_attribute(s['owner'], eid)
345                        # And holders of the eid as well
346                        self.auth.set_attribute(eid, eid)
347                        # allow overrides to control experiments as well
348                        for o in self.overrides:
349                            self.auth.set_attribute(o, eid)
350                        # Set permissions to allow reading of the software
351                        # repo, if any, as well.
352                        for a in self.get_alloc_ids(s):
353                            self.auth.set_attribute(a, 'repo/%s' % eid)
354                else:
355                    raise KeyError("No experiment id")
356            except KeyError, e:
357                self.log.warning("[read_state]: State ownership or identity " +\
358                        "misformatted in %s: %s" % (self.state_filename, e))
359
360
361    def read_accessdb(self, accessdb_file):
362        """
363        Read the mapping from fedids that can create experiments to their name
364        in the 3-level access namespace.  All will be asserted from this
365        testbed and can include the local username and porject that will be
366        asserted on their behalf by this fedd.  Each fedid is also added to the
367        authorization system with the "create" attribute.
368        """
369        self.accessdb = {}
370        # These are the regexps for parsing the db
371        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
372        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
374        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
375                "\s*->\s*(" + name_expr + ")\s*$")
376        lineno = 0
377
378        # Parse the mappings and store in self.authdb, a dict of
379        # fedid -> (proj, user)
380        try:
381            f = open(accessdb_file, "r")
382            for line in f:
383                lineno += 1
384                line = line.strip()
385                if len(line) == 0 or line.startswith('#'):
386                    continue
387                m = project_line.match(line)
388                if m:
389                    fid = fedid(hexstr=m.group(1))
390                    project, user = m.group(2,3)
391                    if not self.accessdb.has_key(fid):
392                        self.accessdb[fid] = []
393                    self.accessdb[fid].append((project, user))
394                    continue
395
396                m = user_line.match(line)
397                if m:
398                    fid = fedid(hexstr=m.group(1))
399                    project = None
400                    user = m.group(2)
401                    if not self.accessdb.has_key(fid):
402                        self.accessdb[fid] = []
403                    self.accessdb[fid].append((project, user))
404                    continue
405                self.log.warn("[experiment_control] Error parsing access " +\
406                        "db %s at line %d" %  (accessdb_file, lineno))
407        except EnvironmentError:
408            raise service_error(service_error.internal,
409                    ("Error opening/reading %s as experiment " +\
410                            "control accessdb") %  accessdb_file)
411        f.close()
412
413        # Initialize the authorization attributes
414        # XXX: legacy
415        if self.auth_type == 'legacy':
416            for fid in self.accessdb.keys():
417                self.auth.set_attribute(fid, 'create')
418                self.auth.set_attribute(fid, 'new')
419
420    def read_mapdb(self, file):
421        """
422        Read a simple colon separated list of mappings for the
423        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
424        """
425
426        self.tbmap = { }
427        lineno =0
428        try:
429            f = open(file, "r")
430            for line in f:
431                lineno += 1
432                line = line.strip()
433                if line.startswith('#') or len(line) == 0:
434                    continue
435                try:
436                    label, url = line.split(':', 1)
437                    self.tbmap[label] = url
438                except ValueError, e:
439                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
440                            "map db: %s %s" % (lineno, line, e))
441        except EnvironmentError, e:
442            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
443                    "open %s: %s" % (file, e))
444        else:
445            f.close()
446
447    def read_store(self):
448        try:
449            self.synch_store = synch_store()
450            self.synch_store.load(self.store_filename)
451            self.log.debug("[read_store]: Read store from %s" % \
452                    self.store_filename)
453        except EnvironmentError, e:
454            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
455                    % (self.state_filename, e))
456            self.synch_store = synch_store()
457
458        # Set the initial permissions on data in the store.  XXX: This ad hoc
459        # authorization attribute initialization is getting out of hand.
460        # XXX: legacy
461        if self.auth_type == 'legacy':
462            for k in self.synch_store.all_keys():
463                try:
464                    if k.startswith('fedid:'):
465                        fid = fedid(hexstr=k[6:46])
466                        if self.state.has_key(fid):
467                            for a in self.get_alloc_ids(self.state[fid]):
468                                self.auth.set_attribute(a, k)
469                except ValueError, e:
470                    self.log.warn("Cannot deduce permissions for %s" % k)
471
472
473    def write_store(self):
474        """
475        Write a new copy of synch_store after writing current state
476        to a backup.  We use the internal synch_store pickle method to avoid
477        incinsistent data.
478
479        State format is a simple pickling of the store.
480        """
481        if os.access(self.store_filename, os.W_OK):
482            copy_file(self.store_filename, \
483                    "%s.bak" % self.store_filename)
484        try:
485            self.synch_store.save(self.store_filename)
486        except EnvironmentError, e:
487            self.log.error("Can't write file %s: %s" % \
488                    (self.store_filename, e))
489        except TypeError, e:
490            self.log.error("Pickling problem (TypeError): %s" % e)
491
492
493    def remove_dirs(self, dir):
494        """
495        Remove the directory tree and all files rooted at dir.  Log any errors,
496        but continue.
497        """
498        self.log.debug("[removedirs]: removing %s" % dir)
499        try:
500            for path, dirs, files in os.walk(dir, topdown=False):
501                for f in files:
502                    os.remove(os.path.join(path, f))
503                for d in dirs:
504                    os.rmdir(os.path.join(path, d))
505            os.rmdir(dir)
506        except EnvironmentError, e:
507            self.log.error("Error deleting directory tree in %s" % e);
508
509    @staticmethod
510    def make_temp_certfile(expcert, tmpdir):
511        """
512        make a protected copy of the access certificate so the experiment
513        controller can act as the experiment principal.  mkstemp is the most
514        secure way to do that. The directory should be created by
515        mkdtemp.  Return the filename.
516        """
517        if expcert and tmpdir:
518            try:
519                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
520                f = os.fdopen(certf, 'w')
521                print >> f, expcert
522                f.close()
523            except EnvironmentError, e:
524                raise service_error(service_error.internal, 
525                        "Cannot create temp cert file?")
526            return certfn
527        else:
528            return None
529
530       
531    def generate_ssh_keys(self, dest, type="rsa" ):
532        """
533        Generate a set of keys for the gateways to use to talk.
534
535        Keys are of type type and are stored in the required dest file.
536        """
537        valid_types = ("rsa", "dsa")
538        t = type.lower();
539        if t not in valid_types: raise ValueError
540        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
541
542        try:
543            trace = open("/dev/null", "w")
544        except EnvironmentError:
545            raise service_error(service_error.internal,
546                    "Cannot open /dev/null??");
547
548        # May raise CalledProcessError
549        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
550        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
551        if rv != 0:
552            raise service_error(service_error.internal, 
553                    "Cannot generate nonce ssh keys.  %s return code %d" \
554                            % (self.ssh_keygen, rv))
555
556    def generate_seer_certs(self, destdir):
557        '''
558        Create a SEER ca cert and a node cert in destdir/ca.pem and
559        destdir/node.pem respectively.  These will be distributed throughout
560        the federated experiment.  This routine reports errors via
561        service_errors.
562        '''
563        openssl = '/usr/bin/openssl'
564        # All the filenames and parameters we need for openssl calls below
565        ca_serial =os.path.join(destdir, 'ca.serial') 
566        ca_key =os.path.join(destdir, 'ca.key') 
567        ca_pem = os.path.join(destdir, 'ca.pem')
568        node_key =os.path.join(destdir, 'node.key') 
569        node_pem = os.path.join(destdir, 'node.pem')
570        node_req = os.path.join(destdir, 'node.req')
571        node_signed = os.path.join(destdir, 'node.signed')
572        days = '%s' % (3600 * 24 * 365 * 10)
573
574        try:
575            # init a serial number for the openssl calls
576            f = open(ca_serial, 'w')
577            print >>f, '%s' % random.randint(0, 0xffffffff)
578            f.close()
579
580            # Sequence of calls to create a CA key, create a ca cert, create a
581            # node key, node signing request, and finally a signed node
582            # certificate.
583            sequence = (
584                    (openssl, 'genrsa', '-out', ca_key, '1024'),
585                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
586                        ca_pem, '-days', days, '-subj', 
587                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
588                    (openssl, 'genrsa', '-out', node_key, '1024'),
589                    (openssl, 'req', '-new', '-key', node_key, '-out', 
590                        node_req, '-days', days, '-subj', 
591                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
592                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
593                        '-CAserial', ca_serial, '-req', '-in', node_req, 
594                        '-out', node_signed, '-days', days),
595                )
596            # Do all that stuff; bail if there's an error, and push all the
597            # output to dev/null.
598            for cmd in sequence:
599                trace = open("/dev/null", "w")
600                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
601                if rv != 0:
602                    raise service_error(service_error.internal, 
603                            "Cannot generate SEER certs.  %s return code %d" \
604                                    % (' '.join(cmd), rv))
605            # Concatinate the node key and signed certificate into node.pem
606            f = open(node_pem, 'w')
607            for comp in (node_signed, node_key):
608                g = open(comp, 'r')
609                f.write(g.read))
610                g.close()
611            f.close()
612
613            # Throw out intermediaries.
614            for fn in (ca_serial, ca_key, node_key, node_req, node_signed):
615                os.unlink(fn)
616
617        except EnvironmentError, e:
618            # Any difficulties with the file system wind up here
619            raise service_error(service_error.internal,
620                    "File error on  %s while creating SEER certs: %s" % \
621                            (e.filename, e.strerror))
622
623
624
625    def gentopo(self, str):
626        """
627        Generate the topology data structure from the splitter's XML
628        representation of it.
629
630        The topology XML looks like:
631            <experiment>
632                <nodes>
633                    <node><vname></vname><ips>ip1:ip2</ips></node>
634                </nodes>
635                <lans>
636                    <lan>
637                        <vname></vname><vnode></vnode><ip></ip>
638                        <bandwidth></bandwidth><member>node:port</member>
639                    </lan>
640                </lans>
641        """
642        class topo_parse:
643            """
644            Parse the topology XML and create the dats structure.
645            """
646            def __init__(self):
647                # Typing of the subelements for data conversion
648                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
649                self.int_subelements = ( 'bandwidth',)
650                self.float_subelements = ( 'delay',)
651                # The final data structure
652                self.nodes = [ ]
653                self.lans =  [ ]
654                self.topo = { \
655                        'node': self.nodes,\
656                        'lan' : self.lans,\
657                    }
658                self.element = { }  # Current element being created
659                self.chars = ""     # Last text seen
660
661            def end_element(self, name):
662                # After each sub element the contents is added to the current
663                # element or to the appropriate list.
664                if name == 'node':
665                    self.nodes.append(self.element)
666                    self.element = { }
667                elif name == 'lan':
668                    self.lans.append(self.element)
669                    self.element = { }
670                elif name in self.str_subelements:
671                    self.element[name] = self.chars
672                    self.chars = ""
673                elif name in self.int_subelements:
674                    self.element[name] = int(self.chars)
675                    self.chars = ""
676                elif name in self.float_subelements:
677                    self.element[name] = float(self.chars)
678                    self.chars = ""
679
680            def found_chars(self, data):
681                self.chars += data.rstrip()
682
683
684        tp = topo_parse();
685        parser = xml.parsers.expat.ParserCreate()
686        parser.EndElementHandler = tp.end_element
687        parser.CharacterDataHandler = tp.found_chars
688
689        parser.Parse(str)
690
691        return tp.topo
692       
693
694    def genviz(self, topo):
695        """
696        Generate the visualization the virtual topology
697        """
698
699        neato = "/usr/local/bin/neato"
700        # These are used to parse neato output and to create the visualization
701        # file.
702        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
703        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
704                "%s</type></node>"
705
706        try:
707            # Node names
708            nodes = [ n['vname'] for n in topo['node'] ]
709            topo_lans = topo['lan']
710        except KeyError, e:
711            raise service_error(service_error.internal, "Bad topology: %s" %e)
712
713        lans = { }
714        links = { }
715
716        # Walk through the virtual topology, organizing the connections into
717        # 2-node connections (links) and more-than-2-node connections (lans).
718        # When a lan is created, it's added to the list of nodes (there's a
719        # node in the visualization for the lan).
720        for l in topo_lans:
721            if links.has_key(l['vname']):
722                if len(links[l['vname']]) < 2:
723                    links[l['vname']].append(l['vnode'])
724                else:
725                    nodes.append(l['vname'])
726                    lans[l['vname']] = links[l['vname']]
727                    del links[l['vname']]
728                    lans[l['vname']].append(l['vnode'])
729            elif lans.has_key(l['vname']):
730                lans[l['vname']].append(l['vnode'])
731            else:
732                links[l['vname']] = [ l['vnode'] ]
733
734
735        # Open up a temporary file for dot to turn into a visualization
736        try:
737            df, dotname = tempfile.mkstemp()
738            dotfile = os.fdopen(df, 'w')
739        except EnvironmentError:
740            raise service_error(service_error.internal,
741                    "Failed to open file in genviz")
742
743        try:
744            dnull = open('/dev/null', 'w')
745        except EnvironmentError:
746            service_error(service_error.internal,
747                    "Failed to open /dev/null in genviz")
748
749        # Generate a dot/neato input file from the links, nodes and lans
750        try:
751            print >>dotfile, "graph G {"
752            for n in nodes:
753                print >>dotfile, '\t"%s"' % n
754            for l in links.keys():
755                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
756            for l in lans.keys():
757                for n in lans[l]:
758                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
759            print >>dotfile, "}"
760            dotfile.close()
761        except TypeError:
762            raise service_error(service_error.internal,
763                    "Single endpoint link in vtopo")
764        except EnvironmentError:
765            raise service_error(service_error.internal, "Cannot write dot file")
766
767        # Use dot to create a visualization
768        try:
769            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
770                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
771                stderr=dnull, close_fds=True)
772        except EnvironmentError:
773            raise service_error(service_error.internal, 
774                    "Cannot generate visualization: is graphviz available?")
775        dnull.close()
776
777        # Translate dot to vis format
778        vis_nodes = [ ]
779        vis = { 'node': vis_nodes }
780        for line in dot.stdout:
781            m = vis_re.match(line)
782            if m:
783                vn = m.group(1)
784                vis_node = {'name': vn, \
785                        'x': float(m.group(2)),\
786                        'y' : float(m.group(3)),\
787                    }
788                if vn in links.keys() or vn in lans.keys():
789                    vis_node['type'] = 'lan'
790                else:
791                    vis_node['type'] = 'node'
792                vis_nodes.append(vis_node)
793        rv = dot.wait()
794
795        os.remove(dotname)
796        if rv == 0 : return vis
797        else: return None
798
799
800    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
801            cert_pwd=None):
802        """
803        Release access to testbed through fedd
804        """
805
806        if not uri and tbmap:
807            uri = tbmap.get(tb, None)
808        if not uri:
809            raise service_error(service_error.server_config, 
810                    "Unknown testbed: %s" % tb)
811
812        if self.local_access.has_key(uri):
813            resp = self.local_access[uri].ReleaseAccess(\
814                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
815                    fedid(file=cert_file))
816            resp = { 'ReleaseAccessResponseBody': resp } 
817        else:
818            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
819                    cert_file, cert_pwd, self.trusted_certs)
820
821        # better error coding
822
823    def remote_ns2topdl(self, uri, desc):
824
825        req = {
826                'description' : { 'ns2description': desc },
827            }
828
829        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
830                self.trusted_certs)
831
832        if r.has_key('Ns2TopdlResponseBody'):
833            r = r['Ns2TopdlResponseBody']
834            ed = r.get('experimentdescription', None)
835            if ed.has_key('topdldescription'):
836                return topdl.Topology(**ed['topdldescription'])
837            else:
838                raise service_error(service_error.protocol, 
839                        "Bad splitter response (no output)")
840        else:
841            raise service_error(service_error.protocol, "Bad splitter response")
842
843    class start_segment:
844        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
845                cert_pwd=None, trusted_certs=None, caller=None,
846                log_collector=None):
847            self.log = log
848            self.debug = debug
849            self.cert_file = cert_file
850            self.cert_pwd = cert_pwd
851            self.trusted_certs = None
852            self.caller = caller
853            self.testbed = testbed
854            self.log_collector = log_collector
855            self.response = None
856            self.node = { }
857            self.proof = None
858
859        def make_map(self, resp):
860            for e in resp.get('embedding', []):
861                if 'toponame' in e and 'physname' in e:
862                    self.node[e['toponame']] = e['physname'][0]
863
864        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
865            req = {
866                    'allocID': { 'fedid' : aid }, 
867                    'segmentdescription': { 
868                        'topdldescription': topo.to_dict(),
869                    },
870                }
871
872            if connInfo:
873                req['connection'] = connInfo
874
875            import_svcs = [ s for m in masters.values() \
876                    for s in m if self.testbed in s.importers]
877
878            if import_svcs or self.testbed in masters:
879                req['service'] = []
880
881            for s in import_svcs:
882                for r in s.reqs:
883                    sr = copy.deepcopy(r)
884                    sr['visibility'] = 'import';
885                    req['service'].append(sr)
886
887            for s in masters.get(self.testbed, []):
888                for r in s.reqs:
889                    sr = copy.deepcopy(r)
890                    sr['visibility'] = 'export';
891                    req['service'].append(sr)
892
893            if attrs:
894                req['fedAttr'] = attrs
895
896            try:
897                self.log.debug("Calling StartSegment at %s " % uri)
898                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
899                        self.trusted_certs)
900                if r.has_key('StartSegmentResponseBody'):
901                    lval = r['StartSegmentResponseBody'].get('allocationLog',
902                            None)
903                    if lval and self.log_collector:
904                        for line in  lval.splitlines(True):
905                            self.log_collector.write(line)
906                    self.make_map(r['StartSegmentResponseBody'])
907                    if 'proof' in r: self.proof = r['proof']
908                    self.response = r
909                else:
910                    raise service_error(service_error.internal, 
911                            "Bad response!?: %s" %r)
912                return True
913            except service_error, e:
914                self.log.error("Start segment failed on %s: %s" % \
915                        (self.testbed, e))
916                return False
917
918
919
920    class terminate_segment:
921        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
922                cert_pwd=None, trusted_certs=None, caller=None):
923            self.log = log
924            self.debug = debug
925            self.cert_file = cert_file
926            self.cert_pwd = cert_pwd
927            self.trusted_certs = None
928            self.caller = caller
929            self.testbed = testbed
930
931        def __call__(self, uri, aid ):
932            req = {
933                    'allocID': aid , 
934                }
935            try:
936                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
937                        self.trusted_certs)
938                return True
939            except service_error, e:
940                self.log.error("Terminate segment failed on %s: %s" % \
941                        (self.testbed, e))
942                return False
943
944    def allocate_resources(self, allocated, masters, eid, expid, 
945            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
946            attrs=None, connInfo={}, tbmap=None, expcert=None):
947
948        started = { }           # Testbeds where a sub-experiment started
949                                # successfully
950
951        # XXX
952        fail_soft = False
953
954        if tbmap is None: tbmap = { }
955
956        log = alloc_log or self.log
957
958        tp = thread_pool(self.nthreads)
959        threads = [ ]
960        starters = [ ]
961
962        if expcert:
963            cert = expcert
964            pw = None
965        else:
966            cert = self.cert_file
967            pw = self.cert_pwd
968
969        for tb in allocated.keys():
970            # Create and start a thread to start the segment, and save it
971            # to get the return value later
972            tb_attrs = copy.copy(attrs)
973            tp.wait_for_slot()
974            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
975            base, suffix = split_testbed(tb)
976            if suffix:
977                tb_attrs.append({'attribute': 'experiment_name', 
978                    'value': "%s-%s" % (eid, suffix)})
979            else:
980                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
981            if not uri:
982                raise service_error(service_error.internal, 
983                        "Unknown testbed %s !?" % tb)
984
985            if tbparams[tb].has_key('allocID') and \
986                    tbparams[tb]['allocID'].has_key('fedid'):
987                aid = tbparams[tb]['allocID']['fedid']
988            else:
989                raise service_error(service_error.internal, 
990                        "No alloc id for testbed %s !?" % tb)
991
992            s = self.start_segment(log=log, debug=self.debug,
993                    testbed=tb, cert_file=cert,
994                    cert_pwd=pw, trusted_certs=self.trusted_certs,
995                    caller=self.call_StartSegment,
996                    log_collector=log_collector)
997            starters.append(s)
998            t  = pooled_thread(\
999                    target=s, name=tb,
1000                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1001                    pdata=tp, trace_file=self.trace_file)
1002            threads.append(t)
1003            t.start()
1004
1005        # Wait until all finish (keep pinging the log, though)
1006        mins = 0
1007        revoked = False
1008        while not tp.wait_for_all_done(60.0):
1009            mins += 1
1010            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1011                    % mins)
1012            if not revoked and \
1013                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1014                # a testbed has failed.  Revoke this experiment's
1015                # synchronizarion values so that sub experiments will not
1016                # deadlock waiting for synchronization that will never happen
1017                self.log.info("A subexperiment has failed to swap in, " + \
1018                        "revoking synch keys")
1019                var_key = "fedid:%s" % expid
1020                for k in self.synch_store.all_keys():
1021                    if len(k) > 45 and k[0:46] == var_key:
1022                        self.synch_store.revoke_key(k)
1023                revoked = True
1024
1025        failed = [ t.getName() for t in threads if not t.rv ]
1026        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1027
1028        # If one failed clean up, unless fail_soft is set
1029        if failed:
1030            if not fail_soft:
1031                tp.clear()
1032                for tb in succeeded:
1033                    # Create and start a thread to stop the segment
1034                    tp.wait_for_slot()
1035                    uri = tbparams[tb]['uri']
1036                    t  = pooled_thread(\
1037                            target=self.terminate_segment(log=log,
1038                                testbed=tb,
1039                                cert_file=cert, 
1040                                cert_pwd=pw,
1041                                trusted_certs=self.trusted_certs,
1042                                caller=self.call_TerminateSegment),
1043                            args=(uri, tbparams[tb]['federant']['allocID']),
1044                            name=tb,
1045                            pdata=tp, trace_file=self.trace_file)
1046                    t.start()
1047                # Wait until all finish (if any are being stopped)
1048                if succeeded:
1049                    tp.wait_for_all_done()
1050
1051                # release the allocations
1052                for tb in tbparams.keys():
1053                    try:
1054                        self.release_access(tb, tbparams[tb]['allocID'], 
1055                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1056                                cert_file=cert, cert_pwd=pw)
1057                    except service_error, e:
1058                        self.log.warn("Error releasing access: %s" % e.desc)
1059                # Remove the placeholder
1060                self.state_lock.acquire()
1061                self.state[eid]['experimentStatus'] = 'failed'
1062                if self.state_filename: self.write_state()
1063                self.state_lock.release()
1064                # Remove the repo dir
1065                self.remove_dirs("%s/%s" %(self.repodir, expid))
1066                # Walk up tmpdir, deleting as we go
1067                if self.cleanup:
1068                    self.remove_dirs(tmpdir)
1069                else:
1070                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1071
1072
1073                log.error("Swap in failed on %s" % ",".join(failed))
1074                return
1075        else:
1076            # Walk through the successes and gather the virtual to physical
1077            # mapping.
1078            embedding = [ ]
1079            proofs = { }
1080            for s in starters:
1081                for k, v in s.node.items():
1082                    embedding.append({
1083                        'toponame': k, 
1084                        'physname': [ v],
1085                        'testbed': s.testbed
1086                        })
1087                if s.proof:
1088                    proofs[s.testbed] = s.proof
1089            log.info("[start_segment]: Experiment %s active" % eid)
1090
1091
1092        # Walk up tmpdir, deleting as we go
1093        if self.cleanup:
1094            self.remove_dirs(tmpdir)
1095        else:
1096            log.debug("[start_experiment]: not removing %s" % tmpdir)
1097
1098        # Insert the experiment into our state and update the disk copy.
1099        self.state_lock.acquire()
1100        self.state[expid]['experimentStatus'] = 'active'
1101        self.state[eid] = self.state[expid]
1102        self.state[eid]['experimentdescription']['topdldescription'] = \
1103                top.to_dict()
1104        self.state[eid]['embedding'] = embedding
1105        # Append startup proofs
1106        for f in self.state[eid]['federant']:
1107            if 'name' in f and 'localname' in f['name']:
1108                if f['name']['localname'] in proofs:
1109                    f['proof'].append(proofs[f['name']['localname']])
1110
1111        if self.state_filename: self.write_state()
1112        self.state_lock.release()
1113        return
1114
1115
1116    def add_kit(self, e, kit):
1117        """
1118        Add a Software object created from the list of (install, location)
1119        tuples passed as kit  to the software attribute of an object e.  We
1120        do this enough to break out the code, but it's kind of a hack to
1121        avoid changing the old tuple rep.
1122        """
1123
1124        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1125
1126        if isinstance(e.software, list): e.software.extend(s)
1127        else: e.software = s
1128
1129    def append_experiment_authorization(self, expid, attrs, 
1130            need_state_lock=True):
1131        """
1132        Append the authorization information to system state
1133        """
1134
1135        for p, a in attrs:
1136            self.auth.set_attribute(p, a)
1137        self.auth.save()
1138
1139        if need_state_lock: self.state_lock.acquire()
1140        self.state[expid]['auth'].update(attrs)
1141        if self.state_filename: self.write_state()
1142        if need_state_lock: self.state_lock.release()
1143
1144    def clear_experiment_authorization(self, expid, need_state_lock=True):
1145        """
1146        Attrs is a set of attribute principal pairs that need to be removed
1147        from the authenticator.  Remove them and save the authenticator.
1148        """
1149
1150        if need_state_lock: self.state_lock.acquire()
1151        if expid in self.state and 'auth' in self.state[expid]:
1152            for p, a in self.state[expid]['auth']:
1153                self.auth.unset_attribute(p, a)
1154            self.state[expid]['auth'] = set()
1155        if self.state_filename: self.write_state()
1156        if need_state_lock: self.state_lock.release()
1157        self.auth.save()
1158
1159
1160    def create_experiment_state(self, fid, req, expid, expcert,
1161            state='starting'):
1162        """
1163        Create the initial entry in the experiment's state.  The expid and
1164        expcert are the experiment's fedid and certifacte that represents that
1165        ID, which are installed in the experiment state.  If the request
1166        includes a suggested local name that is used if possible.  If the local
1167        name is already taken by an experiment owned by this user that has
1168        failed, it is overwritten.  Otherwise new letters are added until a
1169        valid localname is found.  The generated local name is returned.
1170        """
1171
1172        if req.has_key('experimentID') and \
1173                req['experimentID'].has_key('localname'):
1174            overwrite = False
1175            eid = req['experimentID']['localname']
1176            # If there's an old failed experiment here with the same local name
1177            # and accessible by this user, we'll overwrite it, otherwise we'll
1178            # fall through and do the collision avoidance.
1179            old_expid = self.get_experiment_fedid(eid)
1180            if old_expid:
1181                users_experiment = True
1182                try:
1183                    self.check_experiment_access(fid, old_expid)
1184                except service_error, e:
1185                    if e.code == service_error.access: users_experiment = False
1186                    else: raise e
1187                if users_experiment:
1188                    self.state_lock.acquire()
1189                    status = self.state[eid].get('experimentStatus', None)
1190                    if status and status == 'failed':
1191                        # remove the old access attributes
1192                        self.clear_experiment_authorization(eid,
1193                                need_state_lock=False)
1194                        overwrite = True
1195                        del self.state[eid]
1196                        del self.state[old_expid]
1197                    self.state_lock.release()
1198                else:
1199                    self.log.info('Experiment %s exists, ' % eid + \
1200                            'but this user cannot access it')
1201            self.state_lock.acquire()
1202            while (self.state.has_key(eid) and not overwrite):
1203                eid += random.choice(string.ascii_letters)
1204            # Initial state
1205            self.state[eid] = {
1206                    'experimentID' : \
1207                            [ { 'localname' : eid }, {'fedid': expid } ],
1208                    'experimentStatus': state,
1209                    'experimentAccess': { 'X509' : expcert },
1210                    'owner': fid,
1211                    'log' : [],
1212                    'auth': set(),
1213                }
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217        else:
1218            eid = self.exp_stem
1219            for i in range(0,5):
1220                eid += random.choice(string.ascii_letters)
1221            self.state_lock.acquire()
1222            while (self.state.has_key(eid)):
1223                eid = self.exp_stem
1224                for i in range(0,5):
1225                    eid += random.choice(string.ascii_letters)
1226            # Initial state
1227            self.state[eid] = {
1228                    'experimentID' : \
1229                            [ { 'localname' : eid }, {'fedid': expid } ],
1230                    'experimentStatus': state,
1231                    'experimentAccess': { 'X509' : expcert },
1232                    'owner': fid,
1233                    'log' : [],
1234                    'auth': set(),
1235                }
1236            self.state[expid] = self.state[eid]
1237            if self.state_filename: self.write_state()
1238            self.state_lock.release()
1239
1240        # Let users touch the state.  Authorize this fid and the expid itself
1241        # to touch the experiment, as well as allowing th eoverrides.
1242        self.append_experiment_authorization(eid, 
1243                set([(fid, expid), (expid,expid)] + \
1244                        [ (o, expid) for o in self.overrides]))
1245
1246        return eid
1247
1248
1249    def allocate_ips_to_topo(self, top):
1250        """
1251        Add an ip4_address attribute to all the hosts in the topology, based on
1252        the shared substrates on which they sit.  An /etc/hosts file is also
1253        created and returned as a list of hostfiles entries.  We also return
1254        the allocator, because we may need to allocate IPs to portals
1255        (specifically DRAGON portals).
1256        """
1257        subs = sorted(top.substrates, 
1258                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1259                reverse=True)
1260        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1261        ifs = { }
1262        hosts = [ ]
1263
1264        for idx, s in enumerate(subs):
1265            net_size = len(s.interfaces)+2
1266
1267            a = ips.allocate(net_size)
1268            if a :
1269                base, num = a
1270                if num < net_size: 
1271                    raise service_error(service_error.internal,
1272                            "Allocator returned wrong number of IPs??")
1273            else:
1274                raise service_error(service_error.req, 
1275                        "Cannot allocate IP addresses")
1276            mask = ips.min_alloc
1277            while mask < net_size:
1278                mask *= 2
1279
1280            netmask = ((2**32-1) ^ (mask-1))
1281
1282            base += 1
1283            for i in s.interfaces:
1284                i.attribute.append(
1285                        topdl.Attribute('ip4_address', 
1286                            "%s" % ip_addr(base)))
1287                i.attribute.append(
1288                        topdl.Attribute('ip4_netmask', 
1289                            "%s" % ip_addr(int(netmask))))
1290
1291                hname = i.element.name
1292                if ifs.has_key(hname):
1293                    hosts.append("%s\t%s-%s %s-%d" % \
1294                            (ip_addr(base), hname, s.name, hname,
1295                                ifs[hname]))
1296                else:
1297                    ifs[hname] = 0
1298                    hosts.append("%s\t%s-%s %s-%d %s" % \
1299                            (ip_addr(base), hname, s.name, hname,
1300                                ifs[hname], hname))
1301
1302                ifs[hname] += 1
1303                base += 1
1304        return hosts, ips
1305
1306    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1307            tbparam, masters, tbmap, expid=None, expcert=None):
1308        for tb in testbeds:
1309            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1310                    expcert)
1311            allocated[tb] = 1
1312
1313    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1314            expcert=None):
1315        """
1316        Get access to testbed through fedd and set the parameters for that tb
1317        """
1318        def get_export_project(svcs):
1319            """
1320            Look through for the list of federated_service for this testbed
1321            objects for a project_export service, and extract the project
1322            parameter.
1323            """
1324
1325            pe = [s for s in svcs if s.name=='project_export']
1326            if len(pe) == 1:
1327                return pe[0].params.get('project', None)
1328            elif len(pe) == 0:
1329                return None
1330            else:
1331                raise service_error(service_error.req,
1332                        "More than one project export is not supported")
1333
1334        def add_services(svcs, type, slist, keys):
1335            """
1336            Add the given services to slist.  type is import or export.  Also
1337            add a mapping entry from the assigned id to the original service
1338            record.
1339            """
1340            for i, s in enumerate(svcs):
1341                idx = '%s%d' % (type, i)
1342                keys[idx] = s
1343                sr = {'id': idx, 'name': s.name, 'visibility': type }
1344                if s.params:
1345                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1346                            for k, v in s.params.items()]
1347                slist.append(sr)
1348
1349        uri = tbmap.get(testbed_base(tb), None)
1350        if not uri:
1351            raise service_error(service_error.server_config, 
1352                    "Unknown testbed: %s" % tb)
1353
1354        export_svcs = masters.get(tb,[])
1355        import_svcs = [ s for m in masters.values() \
1356                for s in m \
1357                    if tb in s.importers ]
1358
1359        export_project = get_export_project(export_svcs)
1360        # Compose the credential list so that IDs come before attributes
1361        creds = set()
1362        keys = set()
1363        certs = self.auth.get_creds_for_principal(fid)
1364        if expid:
1365            certs.update(self.auth.get_creds_for_principal(expid))
1366        for c in certs:
1367            keys.add(c.issuer_cert())
1368            creds.add(c.attribute_cert())
1369        creds = list(keys) + list(creds)
1370
1371        if expcert: cert, pw = expcert, None
1372        else: cert, pw = self.cert_file, self.cert_pw
1373
1374        # Request credentials
1375        req = {
1376                'abac_credential': creds,
1377            }
1378        # Make the service request from the services we're importing and
1379        # exporting.  Keep track of the export request ids so we can
1380        # collect the resulting info from the access response.
1381        e_keys = { }
1382        if import_svcs or export_svcs:
1383            slist = []
1384            add_services(import_svcs, 'import', slist, e_keys)
1385            add_services(export_svcs, 'export', slist, e_keys)
1386            req['service'] = slist
1387
1388        if self.local_access.has_key(uri):
1389            # Local access call
1390            req = { 'RequestAccessRequestBody' : req }
1391            r = self.local_access[uri].RequestAccess(req, 
1392                    fedid(file=self.cert_file))
1393            r = { 'RequestAccessResponseBody' : r }
1394        else:
1395            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1396
1397        if r.has_key('RequestAccessResponseBody'):
1398            # Through to here we have a valid response, not a fault.
1399            # Access denied is a fault, so something better or worse than
1400            # access denied has happened.
1401            r = r['RequestAccessResponseBody']
1402            self.log.debug("[get_access] Access granted")
1403        else:
1404            raise service_error(service_error.protocol,
1405                        "Bad proxy response")
1406        if 'proof' not in r:
1407            raise service_error(service_error.protocol,
1408                        "Bad access response (no access proof)")
1409       
1410        tbparam[tb] = { 
1411                "allocID" : r['allocID'],
1412                "uri": uri,
1413                "proof": [ r['proof'] ],
1414                }
1415
1416        # Collect the responses corresponding to the services this testbed
1417        # exports.  These will be the service requests that we will include in
1418        # the start segment requests (with appropriate visibility values) to
1419        # import and export the segments.
1420        for s in r.get('service', []):
1421            id = s.get('id', None)
1422            if id and id in e_keys:
1423                e_keys[id].reqs.append(s)
1424
1425        # Add attributes to parameter space.  We don't allow attributes to
1426        # overlay any parameters already installed.
1427        for a in r.get('fedAttr', []):
1428            try:
1429                if a['attribute'] and \
1430                        isinstance(a['attribute'], basestring)\
1431                        and not tbparam[tb].has_key(a['attribute'].lower()):
1432                    tbparam[tb][a['attribute'].lower()] = a['value']
1433            except KeyError:
1434                self.log.error("Bad attribute in response: %s" % a)
1435
1436
1437    def split_topology(self, top, topo, testbeds):
1438        """
1439        Create the sub-topologies that are needed for experiment instantiation.
1440        """
1441        for tb in testbeds:
1442            topo[tb] = top.clone()
1443            # copy in for loop allows deletions from the original
1444            for e in [ e for e in topo[tb].elements]:
1445                etb = e.get_attribute('testbed')
1446                # NB: elements without a testbed attribute won't appear in any
1447                # sub topologies. 
1448                if not etb or etb != tb:
1449                    for i in e.interface:
1450                        for s in i.subs:
1451                            try:
1452                                s.interfaces.remove(i)
1453                            except ValueError:
1454                                raise service_error(service_error.internal,
1455                                        "Can't remove interface??")
1456                    topo[tb].elements.remove(e)
1457            topo[tb].make_indices()
1458
1459    def confirm_software(self, top):
1460        """
1461        Make sure that the software to be loaded in the topo is all available
1462        before we begin making access requests, etc.  This is a subset of
1463        wrangle_software.
1464        """
1465        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1466        pkgs.update([x.location for e in top.elements for x in e.software])
1467
1468        for pkg in pkgs:
1469            loc = pkg
1470
1471            scheme, host, path = urlparse(loc)[0:3]
1472            dest = os.path.basename(path)
1473            if not scheme:
1474                if not loc.startswith('/'):
1475                    loc = "/%s" % loc
1476                loc = "file://%s" %loc
1477            # NB: if scheme was found, loc == pkg
1478            try:
1479                u = urlopen(loc)
1480                u.close()
1481            except Exception, e:
1482                raise service_error(service_error.req, 
1483                        "Cannot open %s: %s" % (loc, e))
1484        return True
1485
1486    def wrangle_software(self, expid, top, topo, tbparams):
1487        """
1488        Copy software out to the repository directory, allocate permissions and
1489        rewrite the segment topologies to look for the software in local
1490        places.
1491        """
1492
1493        # Copy the rpms and tarfiles to a distribution directory from
1494        # which the federants can retrieve them
1495        linkpath = "%s/software" %  expid
1496        softdir ="%s/%s" % ( self.repodir, linkpath)
1497        softmap = { }
1498
1499        # self.fedkit and self.gateway kit are lists of tuples of
1500        # (install_location, download_location) this extracts the download
1501        # locations.
1502        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1503        pkgs.update([x.location for e in top.elements for x in e.software])
1504        try:
1505            os.makedirs(softdir)
1506        except EnvironmentError, e:
1507            raise service_error(
1508                    "Cannot create software directory: %s" % e)
1509        # The actual copying.  Everything's converted into a url for copying.
1510        auth_attrs = set()
1511        for pkg in pkgs:
1512            loc = pkg
1513
1514            scheme, host, path = urlparse(loc)[0:3]
1515            dest = os.path.basename(path)
1516            if not scheme:
1517                if not loc.startswith('/'):
1518                    loc = "/%s" % loc
1519                loc = "file://%s" %loc
1520            # NB: if scheme was found, loc == pkg
1521            try:
1522                u = urlopen(loc)
1523            except Exception, e:
1524                raise service_error(service_error.req, 
1525                        "Cannot open %s: %s" % (loc, e))
1526            try:
1527                f = open("%s/%s" % (softdir, dest) , "w")
1528                self.log.debug("Writing %s/%s" % (softdir,dest) )
1529                data = u.read(4096)
1530                while data:
1531                    f.write(data)
1532                    data = u.read(4096)
1533                f.close()
1534                u.close()
1535            except Exception, e:
1536                raise service_error(service_error.internal,
1537                        "Could not copy %s: %s" % (loc, e))
1538            path = re.sub("/tmp", "", linkpath)
1539            # XXX
1540            softmap[pkg] = \
1541                    "%s/%s/%s" %\
1542                    ( self.repo_url, path, dest)
1543
1544            # Allow the individual segments to access the software by assigning
1545            # an attribute to each testbed allocation that encodes the data to
1546            # be released.  This expression collects the data for each run of
1547            # the loop.
1548            auth_attrs.update([
1549                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1550                        for tb in tbparams.keys()])
1551
1552        self.append_experiment_authorization(expid, auth_attrs)
1553
1554        # Convert the software locations in the segments into the local
1555        # copies on this host
1556        for soft in [ s for tb in topo.values() \
1557                for e in tb.elements \
1558                    if getattr(e, 'software', False) \
1559                        for s in e.software ]:
1560            if softmap.has_key(soft.location):
1561                soft.location = softmap[soft.location]
1562
1563
1564    def new_experiment(self, req, fid):
1565        """
1566        The external interface to empty initial experiment creation called from
1567        the dispatcher.
1568
1569        Creates a working directory, splits the incoming description using the
1570        splitter script and parses out the avrious subsections using the
1571        lcasses above.  Once each sub-experiment is created, use pooled threads
1572        to instantiate them and start it all up.
1573        """
1574        req = req.get('NewRequestBody', None)
1575        if not req:
1576            raise service_error(service_error.req,
1577                    "Bad request format (no NewRequestBody)")
1578
1579        if self.auth.import_credentials(data_list=req.get('credential', [])):
1580            self.auth.save()
1581
1582        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1583                with_proof=True)
1584
1585        if not access_ok:
1586            raise service_error(service_error.access, "New access denied",
1587                    proof=[proof])
1588
1589        try:
1590            tmpdir = tempfile.mkdtemp(prefix="split-")
1591        except EnvironmentError:
1592            raise service_error(service_error.internal, "Cannot create tmp dir")
1593
1594        try:
1595            access_user = self.accessdb[fid]
1596        except KeyError:
1597            raise service_error(service_error.internal,
1598                    "Access map and authorizer out of sync in " + \
1599                            "new_experiment for fedid %s"  % fid)
1600
1601        # Generate an ID for the experiment (slice) and a certificate that the
1602        # allocator can use to prove they own it.  We'll ship it back through
1603        # the encrypted connection.  If the requester supplied one, use it.
1604        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1605            expcert = req['experimentAccess']['X509']
1606            expid = fedid(certstr=expcert)
1607            self.state_lock.acquire()
1608            if expid in self.state:
1609                self.state_lock.release()
1610                raise service_error(service_error.req, 
1611                        'fedid %s identifies an existing experiment' % expid)
1612            self.state_lock.release()
1613        else:
1614            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1615
1616        #now we're done with the tmpdir, and it should be empty
1617        if self.cleanup:
1618            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1619            os.rmdir(tmpdir)
1620        else:
1621            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1622
1623        eid = self.create_experiment_state(fid, req, expid, expcert, 
1624                state='empty')
1625
1626        rv = {
1627                'experimentID': [
1628                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1629                ],
1630                'experimentStatus': 'empty',
1631                'experimentAccess': { 'X509' : expcert },
1632                'proof': proof.to_dict(),
1633            }
1634
1635        return rv
1636
1637    # create_experiment sub-functions
1638
1639    @staticmethod
1640    def get_experiment_key(req, field='experimentID'):
1641        """
1642        Parse the experiment identifiers out of the request (the request body
1643        tag has been removed).  Specifically this pulls either the fedid or the
1644        localname out of the experimentID field.  A fedid is preferred.  If
1645        neither is present or the request does not contain the fields,
1646        service_errors are raised.
1647        """
1648        # Get the experiment access
1649        exp = req.get(field, None)
1650        if exp:
1651            if exp.has_key('fedid'):
1652                key = exp['fedid']
1653            elif exp.has_key('localname'):
1654                key = exp['localname']
1655            else:
1656                raise service_error(service_error.req, "Unknown lookup type")
1657        else:
1658            raise service_error(service_error.req, "No request?")
1659
1660        return key
1661
1662    def get_experiment_ids_and_start(self, key, tmpdir):
1663        """
1664        Get the experiment name, id and access certificate from the state, and
1665        set the experiment state to 'starting'.  returns a triple (fedid,
1666        localname, access_cert_file). The access_cert_file is a copy of the
1667        contents of the access certificate, created in the tempdir with
1668        restricted permissions.  If things are confused, raise an exception.
1669        """
1670
1671        expid = eid = None
1672        self.state_lock.acquire()
1673        if self.state.has_key(key):
1674            self.state[key]['experimentStatus'] = "starting"
1675            for e in self.state[key].get('experimentID',[]):
1676                if not expid and e.has_key('fedid'):
1677                    expid = e['fedid']
1678                elif not eid and e.has_key('localname'):
1679                    eid = e['localname']
1680            if 'experimentAccess' in self.state[key] and \
1681                    'X509' in self.state[key]['experimentAccess']:
1682                expcert = self.state[key]['experimentAccess']['X509']
1683            else:
1684                expcert = None
1685        self.state_lock.release()
1686
1687        # make a protected copy of the access certificate so the experiment
1688        # controller can act as the experiment principal.
1689        if expcert:
1690            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1691            if not expcert_file:
1692                raise service_error(service_error.internal, 
1693                        "Cannot create temp cert file?")
1694        else:
1695            expcert_file = None
1696
1697        return (eid, expid, expcert_file)
1698
1699    def get_topology(self, req, tmpdir):
1700        """
1701        Get the ns2 content and put it into a file for parsing.  Call the local
1702        or remote parser and return the topdl.Topology.  Errors result in
1703        exceptions.  req is the request and tmpdir is a work directory.
1704        """
1705
1706        # The tcl parser needs to read a file so put the content into that file
1707        descr=req.get('experimentdescription', None)
1708        if descr:
1709            if 'ns2description' in descr:
1710                file_content=descr['ns2description']
1711            elif 'topdldescription' in descr:
1712                return topdl.Topology(**descr['topdldescription'])
1713            else:
1714                raise service_error(service_error.req, 
1715                        'Unknown experiment description type')
1716        else:
1717            raise service_error(service_error.req, "No experiment description")
1718
1719
1720        if self.splitter_url:
1721            self.log.debug("Calling remote topdl translator at %s" % \
1722                    self.splitter_url)
1723            top = self.remote_ns2topdl(self.splitter_url, file_content)
1724        else:
1725            tclfile = os.path.join(tmpdir, "experiment.tcl")
1726            if file_content:
1727                try:
1728                    f = open(tclfile, 'w')
1729                    f.write(file_content)
1730                    f.close()
1731                except EnvironmentError:
1732                    raise service_error(service_error.internal,
1733                            "Cannot write temp experiment description")
1734            else:
1735                raise service_error(service_error.req, 
1736                        "Only ns2descriptions supported")
1737            pid = "dummy"
1738            gid = "dummy"
1739            eid = "dummy"
1740
1741            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1742                str(self.muxmax), '-m', 'dummy']
1743
1744            tclcmd.extend([pid, gid, eid, tclfile])
1745
1746            self.log.debug("running local splitter %s", " ".join(tclcmd))
1747            # This is just fantastic.  As a side effect the parser copies
1748            # tb_compat.tcl into the current directory, so that directory
1749            # must be writable by the fedd user.  Doing this in the
1750            # temporary subdir ensures this is the case.
1751            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1752                    cwd=tmpdir)
1753            split_data = tclparser.stdout
1754
1755            top = topdl.topology_from_xml(file=split_data, top="experiment")
1756            os.remove(tclfile)
1757
1758        return top
1759
1760    def get_testbed_services(self, req, testbeds):
1761        """
1762        Parse the services section of the request into into two dicts mapping
1763        testbed to lists of federated_service objects.  The first lists all
1764        exporters of services, and the second all exporters of services that
1765        need control portals in the experiment.
1766        """
1767        masters = { }
1768        pmasters = { }
1769        for s in req.get('service', []):
1770            # If this is a service request with the importall field
1771            # set, fill it out.
1772
1773            if s.get('importall', False):
1774                s['import'] = [ tb for tb in testbeds \
1775                        if tb not in s.get('export',[])]
1776                del s['importall']
1777
1778            # Add the service to masters
1779            for tb in s.get('export', []):
1780                if s.get('name', None):
1781
1782                    params = { }
1783                    for a in s.get('fedAttr', []):
1784                        params[a.get('attribute', '')] = a.get('value','')
1785
1786                    fser = federated_service(name=s['name'],
1787                            exporter=tb, importers=s.get('import',[]),
1788                            params=params)
1789                    if fser.name == 'hide_hosts' \
1790                            and 'hosts' not in fser.params:
1791                        fser.params['hosts'] = \
1792                                ",".join(tb_hosts.get(fser.exporter, []))
1793                    if tb in masters: masters[tb].append(fser)
1794                    else: masters[tb] = [fser]
1795
1796                    if fser.portal:
1797                        if tb not in pmasters: pmasters[tb] = [ fser ]
1798                        else: pmasters[tb].append(fser)
1799                else:
1800                    self.log.error('Testbed service does not have name " + \
1801                            "and importers')
1802        return masters, pmasters
1803
1804    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1805        """
1806        Create the ssh keys necessary for interconnecting the portal nodes and
1807        the global hosts file for letting each segment know about the IP
1808        addresses in play.  Save these into the repo.  Add attributes to the
1809        autorizer allowing access controllers to download them and return a set
1810        of attributes that inform the segments where to find this stuff.  May
1811        raise service_errors in if there are problems.
1812        """
1813        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1814        gw_secretkey_base = "fed.%s" % self.ssh_type
1815        keydir = os.path.join(tmpdir, 'keys')
1816        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1817        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1818
1819        try:
1820            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1821        except ValueError:
1822            raise service_error(service_error.server_config, 
1823                    "Bad key type (%s)" % self.ssh_type)
1824
1825        self.generate_seer_certs(keydir)
1826
1827        # Copy configuration files into the remote file store
1828        # The config urlpath
1829        configpath = "/%s/config" % expid
1830        # The config file system location
1831        configdir ="%s%s" % ( self.repodir, configpath)
1832        try:
1833            os.makedirs(configdir)
1834        except EnvironmentError, e:
1835            raise service_error(service_error.internal,
1836                    "Cannot create config directory: %s" % e)
1837        try:
1838            f = open("%s/hosts" % configdir, "w")
1839            print >> f, string.join(hosts, '\n')
1840            f.close()
1841        except EnvironmentError, e:
1842            raise service_error(service_error.internal, 
1843                    "Cannot write hosts file: %s" % e)
1844        try:
1845            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1846            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1847            copy_file(os.path.join(keydir, 'ca.pem'), 
1848                    os.path.join(configdir, 'ca.pem'))
1849            copy_file(os.path.join(keydir, 'node.pem'), 
1850                    os.path.join(configdir, 'node.pem'))
1851        except EnvironmentError, e:
1852            raise service_error(service_error.internal, 
1853                    "Cannot copy keyfiles: %s" % e)
1854
1855        # Allow the individual testbeds to access the configuration files,
1856        # again by setting an attribute for the relevant pathnames on each
1857        # allocation principal.  Yeah, that's a long list comprehension.
1858        self.append_experiment_authorization(expid, set([
1859            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1860                    for tb in tbparams.keys() \
1861                        for f in ("hosts", 'ca.pem', 'node.pem', 
1862                            gw_secretkey_base, gw_pubkey_base)]))
1863
1864        attrs = [ 
1865                {
1866                    'attribute': 'ssh_pubkey', 
1867                    'value': '%s/%s/config/%s' % \
1868                            (self.repo_url, expid, gw_pubkey_base)
1869                },
1870                {
1871                    'attribute': 'ssh_secretkey', 
1872                    'value': '%s/%s/config/%s' % \
1873                            (self.repo_url, expid, gw_secretkey_base)
1874                },
1875                {
1876                    'attribute': 'hosts', 
1877                    'value': '%s/%s/config/hosts' % \
1878                            (self.repo_url, expid)
1879                },
1880                {
1881                    'attribute': 'seer_ca_pem', 
1882                    'value': '%s/%s/config/%s' % \
1883                            (self.repo_url, expid, 'ca.pem')
1884                },
1885                {
1886                    'attribute': 'seer_node_pem', 
1887                    'value': '%s/%s/config/%s' % \
1888                            (self.repo_url, expid, 'node.pem')
1889                },
1890            ]
1891        return attrs
1892
1893
1894    def get_vtopo(self, req, fid):
1895        """
1896        Return the stored virtual topology for this experiment
1897        """
1898        rv = None
1899        state = None
1900
1901        req = req.get('VtopoRequestBody', None)
1902        if not req:
1903            raise service_error(service_error.req,
1904                    "Bad request format (no VtopoRequestBody)")
1905        exp = req.get('experiment', None)
1906        if exp:
1907            if exp.has_key('fedid'):
1908                key = exp['fedid']
1909                keytype = "fedid"
1910            elif exp.has_key('localname'):
1911                key = exp['localname']
1912                keytype = "localname"
1913            else:
1914                raise service_error(service_error.req, "Unknown lookup type")
1915        else:
1916            raise service_error(service_error.req, "No request?")
1917
1918        proof = self.check_experiment_access(fid, key)
1919
1920        self.state_lock.acquire()
1921        if self.state.has_key(key):
1922            if self.state[key].has_key('vtopo'):
1923                rv = { 'experiment' : {keytype: key },
1924                        'vtopo': self.state[key]['vtopo'],
1925                        'proof': proof.to_dict(), 
1926                    }
1927            else:
1928                state = self.state[key]['experimentStatus']
1929        self.state_lock.release()
1930
1931        if rv: return rv
1932        else: 
1933            if state:
1934                raise service_error(service_error.partial, 
1935                        "Not ready: %s" % state)
1936            else:
1937                raise service_error(service_error.req, "No such experiment")
1938
1939    def get_vis(self, req, fid):
1940        """
1941        Return the stored visualization for this experiment
1942        """
1943        rv = None
1944        state = None
1945
1946        req = req.get('VisRequestBody', None)
1947        if not req:
1948            raise service_error(service_error.req,
1949                    "Bad request format (no VisRequestBody)")
1950        exp = req.get('experiment', None)
1951        if exp:
1952            if exp.has_key('fedid'):
1953                key = exp['fedid']
1954                keytype = "fedid"
1955            elif exp.has_key('localname'):
1956                key = exp['localname']
1957                keytype = "localname"
1958            else:
1959                raise service_error(service_error.req, "Unknown lookup type")
1960        else:
1961            raise service_error(service_error.req, "No request?")
1962
1963        proof = self.check_experiment_access(fid, key)
1964
1965        self.state_lock.acquire()
1966        if self.state.has_key(key):
1967            if self.state[key].has_key('vis'):
1968                rv =  { 'experiment' : {keytype: key },
1969                        'vis': self.state[key]['vis'],
1970                        'proof': proof.to_dict(), 
1971                        }
1972            else:
1973                state = self.state[key]['experimentStatus']
1974        self.state_lock.release()
1975
1976        if rv: return rv
1977        else:
1978            if state:
1979                raise service_error(service_error.partial, 
1980                        "Not ready: %s" % state)
1981            else:
1982                raise service_error(service_error.req, "No such experiment")
1983
1984   
1985    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1986            top):
1987        """
1988        Store the various data that have changed in the experiment state
1989        between when it was started and the beginning of resource allocation.
1990        This is basically the information about each local allocation.  This
1991        fills in the values of the placeholder allocation in the state.  It
1992        also collects the access proofs and returns them as dicts for a
1993        response message.
1994        """
1995        # save federant information
1996        for k in allocated.keys():
1997            tbparams[k]['federant'] = {
1998                    'name': [ { 'localname' : eid} ],
1999                    'allocID' : tbparams[k]['allocID'],
2000                    'uri': tbparams[k]['uri'],
2001                    'proof': tbparams[k]['proof'],
2002                }
2003
2004        self.state_lock.acquire()
2005        self.state[eid]['vtopo'] = vtopo
2006        self.state[eid]['vis'] = vis
2007        self.state[eid]['experimentdescription'] = \
2008                { 'topdldescription': top.to_dict() }
2009        self.state[eid]['federant'] = \
2010                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2011                    if tbparams[tb].has_key('federant') ]
2012        # Access proofs for the response message
2013        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2014                    for p in tbparams[k]['federant']['proof']]
2015        if self.state_filename: 
2016            self.write_state()
2017        self.state_lock.release()
2018        return proofs
2019
2020    def clear_placeholder(self, eid, expid, tmpdir):
2021        """
2022        Clear the placeholder and remove any allocated temporary dir.
2023        """
2024
2025        self.state_lock.acquire()
2026        del self.state[eid]
2027        del self.state[expid]
2028        if self.state_filename: self.write_state()
2029        self.state_lock.release()
2030        if tmpdir and self.cleanup:
2031            self.remove_dirs(tmpdir)
2032
2033    # end of create_experiment sub-functions
2034
2035    def create_experiment(self, req, fid):
2036        """
2037        The external interface to experiment creation called from the
2038        dispatcher.
2039
2040        Creates a working directory, splits the incoming description using the
2041        splitter script and parses out the various subsections using the
2042        classes above.  Once each sub-experiment is created, use pooled threads
2043        to instantiate them and start it all up.
2044        """
2045
2046        req = req.get('CreateRequestBody', None)
2047        if req:
2048            key = self.get_experiment_key(req)
2049        else:
2050            raise service_error(service_error.req,
2051                    "Bad request format (no CreateRequestBody)")
2052
2053        # Import information from the requester
2054        if self.auth.import_credentials(data_list=req.get('credential', [])):
2055            self.auth.save()
2056
2057        # Make sure that the caller can talk to us
2058        proof = self.check_experiment_access(fid, key)
2059
2060        # Install the testbed map entries supplied with the request into a copy
2061        # of the testbed map.
2062        tbmap = dict(self.tbmap)
2063        for m in req.get('testbedmap', []):
2064            if 'testbed' in m and 'uri' in m:
2065                tbmap[m['testbed']] = m['uri']
2066
2067        # a place to work
2068        try:
2069            tmpdir = tempfile.mkdtemp(prefix="split-")
2070            os.mkdir(tmpdir+"/keys")
2071        except EnvironmentError:
2072            raise service_error(service_error.internal, "Cannot create tmp dir")
2073
2074        tbparams = { }
2075
2076        eid, expid, expcert_file = \
2077                self.get_experiment_ids_and_start(key, tmpdir)
2078
2079        # This catches exceptions to clear the placeholder if necessary
2080        try: 
2081            if not (eid and expid):
2082                raise service_error(service_error.internal, 
2083                        "Cannot find local experiment info!?")
2084
2085            top = self.get_topology(req, tmpdir)
2086            self.confirm_software(top)
2087            # Assign the IPs
2088            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2089            # Find the testbeds to look up
2090            tb_hosts = { }
2091            testbeds = [ ]
2092            for e in top.elements:
2093                if isinstance(e, topdl.Computer):
2094                    tb = e.get_attribute('testbed') or 'default'
2095                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2096                    else: 
2097                        tb_hosts[tb] = [ e.name ]
2098                        testbeds.append(tb)
2099
2100            masters, pmasters = self.get_testbed_services(req, testbeds)
2101            allocated = { }         # Testbeds we can access
2102            topo ={ }               # Sub topologies
2103            connInfo = { }          # Connection information
2104
2105            self.split_topology(top, topo, testbeds)
2106
2107            self.get_access_to_testbeds(testbeds, fid, allocated, 
2108                    tbparams, masters, tbmap, expid, expcert_file)
2109
2110            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2111
2112            part = experiment_partition(self.auth, self.store_url, tbmap,
2113                    self.muxmax, self.direct_transit)
2114            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2115                    connInfo, expid)
2116
2117            auth_attrs = set()
2118            # Now get access to the dynamic testbeds (those added above)
2119            for tb in [ t for t in topo if t not in allocated]:
2120                self.get_access(tb, tbparams, fid, masters, tbmap, 
2121                        expid, expcert_file)
2122                allocated[tb] = 1
2123                store_keys = topo[tb].get_attribute('store_keys')
2124                # Give the testbed access to keys it exports or imports
2125                if store_keys:
2126                    auth_attrs.update(set([
2127                        (tbparams[tb]['allocID']['fedid'], sk) \
2128                                for sk in store_keys.split(" ")]))
2129
2130            if auth_attrs:
2131                self.append_experiment_authorization(expid, auth_attrs)
2132
2133            # transit and disconnected testbeds may not have a connInfo entry.
2134            # Fill in the blanks.
2135            for t in allocated.keys():
2136                if not connInfo.has_key(t):
2137                    connInfo[t] = { }
2138
2139            self.wrangle_software(expid, top, topo, tbparams)
2140
2141            vtopo = topdl.topology_to_vtopo(top)
2142            vis = self.genviz(vtopo)
2143            proofs = self.save_federant_information(allocated, tbparams, 
2144                    eid, vtopo, vis, top)
2145        except service_error, e:
2146            # If something goes wrong in the parse (usually an access error)
2147            # clear the placeholder state.  From here on out the code delays
2148            # exceptions.  Failing at this point returns a fault to the remote
2149            # caller.
2150            self.clear_placeholder(eid, expid, tmpdir)
2151            raise e
2152
2153        # Start the background swapper and return the starting state.  From
2154        # here on out, the state will stick around a while.
2155
2156        # Create a logger that logs to the experiment's state object as well as
2157        # to the main log file.
2158        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2159        alloc_collector = self.list_log(self.state[eid]['log'])
2160        h = logging.StreamHandler(alloc_collector)
2161        # XXX: there should be a global one of these rather than repeating the
2162        # code.
2163        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2164                    '%d %b %y %H:%M:%S'))
2165        alloc_log.addHandler(h)
2166
2167        # Start a thread to do the resource allocation
2168        t  = Thread(target=self.allocate_resources,
2169                args=(allocated, masters, eid, expid, tbparams, 
2170                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2171                    connInfo, tbmap, expcert_file),
2172                name=eid)
2173        t.start()
2174
2175        rv = {
2176                'experimentID': [
2177                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2178                ],
2179                'experimentStatus': 'starting',
2180                'proof': [ proof.to_dict() ] + proofs,
2181            }
2182
2183        return rv
2184   
2185    def get_experiment_fedid(self, key):
2186        """
2187        find the fedid associated with the localname key in the state database.
2188        """
2189
2190        rv = None
2191        self.state_lock.acquire()
2192        if self.state.has_key(key):
2193            if isinstance(self.state[key], dict):
2194                try:
2195                    kl = [ f['fedid'] for f in \
2196                            self.state[key]['experimentID']\
2197                                if f.has_key('fedid') ]
2198                except KeyError:
2199                    self.state_lock.release()
2200                    raise service_error(service_error.internal, 
2201                            "No fedid for experiment %s when getting "+\
2202                                    "fedid(!?)" % key)
2203                if len(kl) == 1:
2204                    rv = kl[0]
2205                else:
2206                    self.state_lock.release()
2207                    raise service_error(service_error.internal, 
2208                            "multiple fedids for experiment %s when " +\
2209                                    "getting fedid(!?)" % key)
2210            else:
2211                self.state_lock.release()
2212                raise service_error(service_error.internal, 
2213                        "Unexpected state for %s" % key)
2214        self.state_lock.release()
2215        return rv
2216
2217    def check_experiment_access(self, fid, key):
2218        """
2219        Confirm that the fid has access to the experiment.  Though a request
2220        may be made in terms of a local name, the access attribute is always
2221        the experiment's fedid.
2222        """
2223        if not isinstance(key, fedid):
2224            key = self.get_experiment_fedid(key)
2225
2226        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2227
2228        if access_ok:
2229            return proof
2230        else:
2231            raise service_error(service_error.access, "Access Denied",
2232                proof)
2233
2234
2235    def get_handler(self, path, fid):
2236        """
2237        Perhaps surprisingly named, this function handles HTTP GET requests to
2238        this server (SOAP requests are POSTs).
2239        """
2240        self.log.info("Get handler %s %s" % (path, fid))
2241        # XXX: log proofs?
2242        if self.auth.check_attribute(fid, path):
2243            return ("%s/%s" % (self.repodir, path), "application/binary")
2244        else:
2245            return (None, None)
2246
2247    def clean_info_response(self, rv, proof):
2248        """
2249        Remove the information in the experiment's state object that is not in
2250        the info response.
2251        """
2252        # Remove the owner info (should always be there, but...)
2253        if rv.has_key('owner'): del rv['owner']
2254        if 'auth' in rv: del rv['auth']
2255
2256        # Convert the log into the allocationLog parameter and remove the
2257        # log entry (with defensive programming)
2258        if rv.has_key('log'):
2259            rv['allocationLog'] = "".join(rv['log'])
2260            del rv['log']
2261        else:
2262            rv['allocationLog'] = ""
2263
2264        if rv['experimentStatus'] != 'active':
2265            if rv.has_key('federant'): del rv['federant']
2266        else:
2267            # remove the allocationID and uri info from each federant
2268            for f in rv.get('federant', []):
2269                if f.has_key('allocID'): del f['allocID']
2270                if f.has_key('uri'): del f['uri']
2271        rv['proof'] = proof.to_dict()
2272
2273        return rv
2274
2275    def get_info(self, req, fid):
2276        """
2277        Return all the stored info about this experiment
2278        """
2279        rv = None
2280
2281        req = req.get('InfoRequestBody', None)
2282        if not req:
2283            raise service_error(service_error.req,
2284                    "Bad request format (no InfoRequestBody)")
2285        exp = req.get('experiment', None)
2286        if exp:
2287            if exp.has_key('fedid'):
2288                key = exp['fedid']
2289                keytype = "fedid"
2290            elif exp.has_key('localname'):
2291                key = exp['localname']
2292                keytype = "localname"
2293            else:
2294                raise service_error(service_error.req, "Unknown lookup type")
2295        else:
2296            raise service_error(service_error.req, "No request?")
2297
2298        proof = self.check_experiment_access(fid, key)
2299
2300        # The state may be massaged by the service function that called
2301        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2302        # state.
2303        self.state_lock.acquire()
2304        if self.state.has_key(key):
2305            rv = copy.deepcopy(self.state[key])
2306        self.state_lock.release()
2307
2308        if rv:
2309            return self.clean_info_response(rv, proof)
2310        else:
2311            raise service_error(service_error.req, "No such experiment")
2312
2313    def get_multi_info(self, req, fid):
2314        """
2315        Return all the stored info that this fedid can access
2316        """
2317        rv = { 'info': [ ], 'proof': [ ] }
2318
2319        self.state_lock.acquire()
2320        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2321            try:
2322                proof = self.check_experiment_access(fid, key)
2323            except service_error, e:
2324                if e.code == service_error.access:
2325                    continue
2326                else:
2327                    self.state_lock.release()
2328                    raise e
2329
2330            if self.state.has_key(key):
2331                e = copy.deepcopy(self.state[key])
2332                e = self.clean_info_response(e, proof)
2333                rv['info'].append(e)
2334                rv['proof'].append(proof.to_dict())
2335        self.state_lock.release()
2336        return rv
2337
2338    def check_termination_status(self, fed_exp, force):
2339        """
2340        Confirm that the experiment is sin a valid state to stop (or force it)
2341        return the state - invalid states for deletion and force settings cause
2342        exceptions.
2343        """
2344        self.state_lock.acquire()
2345        status = fed_exp.get('experimentStatus', None)
2346
2347        if status:
2348            if status in ('starting', 'terminating'):
2349                if not force:
2350                    self.state_lock.release()
2351                    raise service_error(service_error.partial, 
2352                            'Experiment still being created or destroyed')
2353                else:
2354                    self.log.warning('Experiment in %s state ' % status + \
2355                            'being terminated by force.')
2356            self.state_lock.release()
2357            return status
2358        else:
2359            # No status??? trouble
2360            self.state_lock.release()
2361            raise service_error(service_error.internal,
2362                    "Experiment has no status!?")
2363
2364
2365    def get_termination_info(self, fed_exp):
2366        ids = []
2367        term_params = { }
2368        self.state_lock.acquire()
2369        #  experimentID is a list of dicts that are self-describing
2370        #  identifiers.  This finds all the fedids and localnames - the
2371        #  keys of self.state - and puts them into ids, which is used to delete
2372        #  the state after everything is swapped out.
2373        for id in fed_exp.get('experimentID', []):
2374            if 'fedid' in id: 
2375                ids.append(id['fedid'])
2376                repo = "%s" % id['fedid']
2377            if 'localname' in id: ids.append(id['localname'])
2378
2379        # Get the experimentAccess - the principal for this experiment.  It
2380        # is this principal to which credentials have been delegated, and
2381        # as which the experiment controller must act.
2382        if 'experimentAccess' in fed_exp and \
2383                'X509' in fed_exp['experimentAccess']:
2384            expcert = fed_exp['experimentAccess']['X509']
2385        else:
2386            expcert = None
2387
2388        # Collect the allocation/segment ids into a dict keyed by the fedid
2389        # of the allocation (or a monotonically increasing integer) that
2390        # contains a tuple of uri, aid (which is a dict...)
2391        for i, fed in enumerate(fed_exp.get('federant', [])):
2392            try:
2393                uri = fed['uri']
2394                aid = fed['allocID']
2395                k = fed['allocID'].get('fedid', i)
2396            except KeyError, e:
2397                continue
2398            term_params[k] = (uri, aid)
2399        # Change the experiment state
2400        fed_exp['experimentStatus'] = 'terminating'
2401        if self.state_filename: self.write_state()
2402        self.state_lock.release()
2403
2404        return ids, term_params, expcert, repo
2405
2406
2407    def deallocate_resources(self, term_params, expcert, status, force, 
2408            dealloc_log):
2409        tmpdir = None
2410        # This try block makes sure the tempdir is cleared
2411        try:
2412            # If no expcert, try the deallocation as the experiment
2413            # controller instance.
2414            if expcert and self.auth_type != 'legacy': 
2415                try:
2416                    tmpdir = tempfile.mkdtemp(prefix="term-")
2417                except EnvironmentError:
2418                    raise service_error(service_error.internal, 
2419                            "Cannot create tmp dir")
2420                cert_file = self.make_temp_certfile(expcert, tmpdir)
2421                pw = None
2422            else: 
2423                cert_file = self.cert_file
2424                pw = self.cert_pwd
2425
2426            # Stop everyone.  NB, wait_for_all waits until a thread starts
2427            # and then completes, so we can't wait if nothing starts.  So,
2428            # no tbparams, no start.
2429            if len(term_params) > 0:
2430                tp = thread_pool(self.nthreads)
2431                for k, (uri, aid) in term_params.items():
2432                    # Create and start a thread to stop the segment
2433                    tp.wait_for_slot()
2434                    t  = pooled_thread(\
2435                            target=self.terminate_segment(log=dealloc_log,
2436                                testbed=uri,
2437                                cert_file=cert_file, 
2438                                cert_pwd=pw,
2439                                trusted_certs=self.trusted_certs,
2440                                caller=self.call_TerminateSegment),
2441                            args=(uri, aid), name=k,
2442                            pdata=tp, trace_file=self.trace_file)
2443                    t.start()
2444                # Wait for completions
2445                tp.wait_for_all_done()
2446
2447            # release the allocations (failed experiments have done this
2448            # already, and starting experiments may be in odd states, so we
2449            # ignore errors releasing those allocations
2450            try: 
2451                for k, (uri, aid)  in term_params.items():
2452                    self.release_access(None, aid, uri=uri,
2453                            cert_file=cert_file, cert_pwd=pw)
2454            except service_error, e:
2455                if status != 'failed' and not force:
2456                    raise e
2457
2458        # Clean up the tmpdir no matter what
2459        finally:
2460            if tmpdir: self.remove_dirs(tmpdir)
2461
2462    def terminate_experiment(self, req, fid):
2463        """
2464        Swap this experiment out on the federants and delete the shared
2465        information
2466        """
2467        tbparams = { }
2468        req = req.get('TerminateRequestBody', None)
2469        if not req:
2470            raise service_error(service_error.req,
2471                    "Bad request format (no TerminateRequestBody)")
2472
2473        key = self.get_experiment_key(req, 'experiment')
2474        proof = self.check_experiment_access(fid, key)
2475        exp = req.get('experiment', False)
2476        force = req.get('force', False)
2477
2478        dealloc_list = [ ]
2479
2480
2481        # Create a logger that logs to the dealloc_list as well as to the main
2482        # log file.
2483        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2484        h = logging.StreamHandler(self.list_log(dealloc_list))
2485        # XXX: there should be a global one of these rather than repeating the
2486        # code.
2487        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2488                    '%d %b %y %H:%M:%S'))
2489        dealloc_log.addHandler(h)
2490
2491        self.state_lock.acquire()
2492        fed_exp = self.state.get(key, None)
2493        self.state_lock.release()
2494        repo = None
2495
2496        if fed_exp:
2497            status = self.check_termination_status(fed_exp, force)
2498            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2499            self.deallocate_resources(term_params, expcert, status, force, 
2500                    dealloc_log)
2501
2502            # Remove the terminated experiment
2503            self.state_lock.acquire()
2504            for id in ids:
2505                self.clear_experiment_authorization(id, need_state_lock=False)
2506                if id in self.state: del self.state[id]
2507
2508            if self.state_filename: self.write_state()
2509            self.state_lock.release()
2510
2511            # Delete any synch points associated with this experiment.  All
2512            # synch points begin with the fedid of the experiment.
2513            fedid_keys = set(["fedid:%s" % f for f in ids \
2514                    if isinstance(f, fedid)])
2515            for k in self.synch_store.all_keys():
2516                try:
2517                    if len(k) > 45 and k[0:46] in fedid_keys:
2518                        self.synch_store.del_value(k)
2519                except synch_store.BadDeletionError:
2520                    pass
2521            self.write_store()
2522
2523            # Remove software and other cached stuff from the filesystem.
2524            if repo:
2525                self.remove_dirs("%s/%s" % (self.repodir, repo))
2526       
2527            return { 
2528                    'experiment': exp , 
2529                    'deallocationLog': string.join(dealloc_list, ''),
2530                    'proof': [proof.to_dict()],
2531                    }
2532        else:
2533            raise service_error(service_error.req, "No saved state")
2534
2535
2536    def GetValue(self, req, fid):
2537        """
2538        Get a value from the synchronized store
2539        """
2540        req = req.get('GetValueRequestBody', None)
2541        if not req:
2542            raise service_error(service_error.req,
2543                    "Bad request format (no GetValueRequestBody)")
2544       
2545        name = req.get('name', None)
2546        wait = req.get('wait', False)
2547        rv = { 'name': name }
2548
2549        if not name:
2550            raise service_error(service_error.req, "No name?")
2551
2552        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2553
2554        if access_ok:
2555            self.log.debug("[GetValue] asking for %s " % name)
2556            try:
2557                v = self.synch_store.get_value(name, wait)
2558            except synch_store.RevokedKeyError:
2559                # No more synch on this key
2560                raise service_error(service_error.federant, 
2561                        "Synch key %s revoked" % name)
2562            if v is not None:
2563                rv['value'] = v
2564            rv['proof'] = proof.to_dict()
2565            self.log.debug("[GetValue] got %s from %s" % (v, name))
2566            return rv
2567        else:
2568            raise service_error(service_error.access, "Access Denied",
2569                    proof=proof)
2570       
2571
2572    def SetValue(self, req, fid):
2573        """
2574        Set a value in the synchronized store
2575        """
2576        req = req.get('SetValueRequestBody', None)
2577        if not req:
2578            raise service_error(service_error.req,
2579                    "Bad request format (no SetValueRequestBody)")
2580       
2581        name = req.get('name', None)
2582        v = req.get('value', '')
2583
2584        if not name:
2585            raise service_error(service_error.req, "No name?")
2586
2587        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2588
2589        if access_ok:
2590            try:
2591                self.synch_store.set_value(name, v)
2592                self.write_store()
2593                self.log.debug("[SetValue] set %s to %s" % (name, v))
2594            except synch_store.CollisionError:
2595                # Translate into a service_error
2596                raise service_error(service_error.req,
2597                        "Value already set: %s" %name)
2598            except synch_store.RevokedKeyError:
2599                # No more synch on this key
2600                raise service_error(service_error.federant, 
2601                        "Synch key %s revoked" % name)
2602                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2603        else:
2604            raise service_error(service_error.access, "Access Denied",
2605                    proof=proof)
Note: See TracBrowser for help on using the repository browser.