source: fedd/federation/experiment_control.py @ db974ed

compt_changesinfo-ops
Last change on this file since db974ed was db974ed, checked in by Ted Faber <faber@…>, 12 years ago

Remove debugging (from #35)

  • Property mode set to 100644
File size: 87.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38
39import topdl
40import list_log
41from ip_allocator import ip_allocator
42from ip_addr import ip_addr
43
44
45class nullHandler(logging.Handler):
46    def emit(self, record): pass
47
48fl = logging.getLogger("fedd.experiment_control")
49fl.addHandler(nullHandler())
50
51
52# Right now, no support for composition.
53class federated_service:
54    def __init__(self, name, exporter=None, importers=None, params=None, 
55            reqs=None, portal=None):
56        self.name=name
57        self.exporter=exporter
58        if importers is None: self.importers = []
59        else: self.importers=importers
60        if params is None: self.params = { }
61        else: self.params = params
62        if reqs is None: self.reqs = []
63        else: self.reqs = reqs
64
65        if portal is not None:
66            self.portal = portal
67        else:
68            self.portal = (name in federated_service.needs_portal)
69
70    def __str__(self):
71        return "name %s export %s import %s params %s reqs %s" % \
72                (self.name, self.exporter, self.importers, self.params,
73                        [ (r['name'], r['visibility']) for r in self.reqs] )
74
75    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
76
77class experiment_control_local(experiment_control_legacy):
78    """
79    Control of experiments that this system can directly access.
80
81    Includes experiment creation, termination and information dissemination.
82    Thred safe.
83    """
84
85    class ssh_cmd_timeout(RuntimeError): pass
86   
87    call_RequestAccess = service_caller('RequestAccess')
88    call_ReleaseAccess = service_caller('ReleaseAccess')
89    call_StartSegment = service_caller('StartSegment')
90    call_TerminateSegment = service_caller('TerminateSegment')
91    call_Ns2Topdl = service_caller('Ns2Topdl')
92
93    def __init__(self, config=None, auth=None):
94        """
95        Intialize the various attributes, most from the config object
96        """
97
98        def parse_tarfile_list(tf):
99            """
100            Parse a tarfile list from the configuration.  This is a set of
101            paths and tarfiles separated by spaces.
102            """
103            rv = [ ]
104            if tf is not None:
105                tl = tf.split()
106                while len(tl) > 1:
107                    p, t = tl[0:2]
108                    del tl[0:2]
109                    rv.append((p, t))
110            return rv
111
112        self.list_log = list_log.list_log
113
114        self.cert_file = config.get("experiment_control", "cert_file")
115        if self.cert_file:
116            self.cert_pwd = config.get("experiment_control", "cert_pwd")
117        else:
118            self.cert_file = config.get("globals", "cert_file")
119            self.cert_pwd = config.get("globals", "cert_pwd")
120
121        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
122                or config.get("globals", "trusted_certs")
123
124        self.repodir = config.get("experiment_control", "repodir")
125        self.repo_url = config.get("experiment_control", "repo_url", 
126                "https://users.isi.deterlab.net:23235");
127
128        self.exp_stem = "fed-stem"
129        self.log = logging.getLogger("fedd.experiment_control")
130        set_log_level(config, "experiment_control", self.log)
131        self.muxmax = 2
132        self.nthreads = 10
133        self.randomize_experiments = False
134
135        self.splitter = None
136        self.ssh_keygen = "/usr/bin/ssh-keygen"
137        self.ssh_identity_file = None
138
139
140        self.debug = config.getboolean("experiment_control", "create_debug")
141        self.cleanup = not config.getboolean("experiment_control", 
142                "leave_tmpfiles")
143        self.state_filename = config.get("experiment_control", 
144                "experiment_state")
145        self.store_filename = config.get("experiment_control", 
146                "synch_store")
147        self.store_url = config.get("experiment_control", "store_url")
148        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
149        self.fedkit = parse_tarfile_list(\
150                config.get("experiment_control", "fedkit"))
151        self.gatewaykit = parse_tarfile_list(\
152                config.get("experiment_control", "gatewaykit"))
153        accessdb_file = config.get("experiment_control", "accessdb")
154
155        dt = config.get("experiment_control", "direct_transit")
156        self.auth_type = config.get('experiment_control', 'auth_type') \
157                or 'legacy'
158        self.auth_dir = config.get('experiment_control', 'auth_dir')
159        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
160        else: self.direct_transit = [ ]
161        # NB for internal master/slave ops, not experiment setup
162        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
163
164        self.overrides = set([])
165        ovr = config.get('experiment_control', 'overrides')
166        if ovr:
167            for o in ovr.split(","):
168                o = o.strip()
169                if o.startswith('fedid:'): o = o[len('fedid:'):]
170                self.overrides.add(fedid(hexstr=o))
171
172        self.state = { }
173        self.state_lock = Lock()
174        self.tclsh = "/usr/local/bin/otclsh"
175        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
176                config.get("experiment_control", "tcl_splitter",
177                        "/usr/testbed/lib/ns2ir/parse.tcl")
178        mapdb_file = config.get("experiment_control", "mapdb")
179        self.trace_file = sys.stderr
180
181        self.def_expstart = \
182                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
183                "/tmp/federate";
184        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
185                "FEDDIR/hosts";
186        self.def_gwstart = \
187                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
188                "/tmp/bridge.log";
189        self.def_mgwstart = \
190                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
191                "/tmp/bridge.log";
192        self.def_gwimage = "FBSD61-TUNNEL2";
193        self.def_gwtype = "pc";
194        self.local_access = { }
195
196        if self.auth_type == 'legacy':
197            if auth:
198                self.auth = auth
199            else:
200                self.log.error( "[access]: No authorizer initialized, " +\
201                        "creating local one.")
202                auth = authorizer()
203            self.get_access = self.legacy_get_access
204        elif self.auth_type == 'abac':
205            self.auth = abac_authorizer(load=self.auth_dir)
206        else:
207            raise service_error(service_error.internal, 
208                    "Unknown auth_type: %s" % self.auth_type)
209
210        if mapdb_file:
211            self.read_mapdb(mapdb_file)
212        else:
213            self.log.warn("[experiment_control] No testbed map, using defaults")
214            self.tbmap = { 
215                    'deter':'https://users.isi.deterlab.net:23235',
216                    'emulab':'https://users.isi.deterlab.net:23236',
217                    'ucb':'https://users.isi.deterlab.net:23237',
218                    }
219
220        if accessdb_file:
221                self.read_accessdb(accessdb_file)
222        else:
223            raise service_error(service_error.internal,
224                    "No accessdb specified in config")
225
226        # Grab saved state.  OK to do this w/o locking because it's read only
227        # and only one thread should be in existence that can see self.state at
228        # this point.
229        if self.state_filename:
230            self.read_state()
231
232        if self.store_filename:
233            self.read_store()
234        else:
235            self.log.warning("No saved synch store")
236            self.synch_store = synch_store
237
238        # Dispatch tables
239        self.soap_services = {\
240                'New': soap_handler('New', self.new_experiment),
241                'Create': soap_handler('Create', self.create_experiment),
242                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
243                'Vis': soap_handler('Vis', self.get_vis),
244                'Info': soap_handler('Info', self.get_info),
245                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
246                'Terminate': soap_handler('Terminate', 
247                    self.terminate_experiment),
248                'GetValue': soap_handler('GetValue', self.GetValue),
249                'SetValue': soap_handler('SetValue', self.SetValue),
250        }
251
252        self.xmlrpc_services = {\
253                'New': xmlrpc_handler('New', self.new_experiment),
254                'Create': xmlrpc_handler('Create', self.create_experiment),
255                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
256                'Vis': xmlrpc_handler('Vis', self.get_vis),
257                'Info': xmlrpc_handler('Info', self.get_info),
258                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
259                'Terminate': xmlrpc_handler('Terminate',
260                    self.terminate_experiment),
261                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
262                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
263        }
264
265    # Call while holding self.state_lock
266    def write_state(self):
267        """
268        Write a new copy of experiment state after copying the existing state
269        to a backup.
270
271        State format is a simple pickling of the state dictionary.
272        """
273        if os.access(self.state_filename, os.W_OK):
274            copy_file(self.state_filename, \
275                    "%s.bak" % self.state_filename)
276        try:
277            f = open(self.state_filename, 'w')
278            pickle.dump(self.state, f)
279        except EnvironmentError, e:
280            self.log.error("Can't write file %s: %s" % \
281                    (self.state_filename, e))
282        except pickle.PicklingError, e:
283            self.log.error("Pickling problem: %s" % e)
284        except TypeError, e:
285            self.log.error("Pickling problem (TypeError): %s" % e)
286
287    @staticmethod
288    def get_alloc_ids(state):
289        """
290        Pull the fedids of the identifiers of each allocation from the
291        state.  Again, a dict dive that's best isolated.
292
293        Used by read_store and read state
294        """
295
296        return [ f['allocID']['fedid'] 
297                for f in state.get('federant',[]) \
298                    if f.has_key('allocID') and \
299                        f['allocID'].has_key('fedid')]
300
301    # Call while holding self.state_lock
302    def read_state(self):
303        """
304        Read a new copy of experiment state.  Old state is overwritten.
305
306        State format is a simple pickling of the state dictionary.
307        """
308       
309        def get_experiment_id(state):
310            """
311            Pull the fedid experimentID out of the saved state.  This is kind
312            of a gross walk through the dict.
313            """
314
315            if state.has_key('experimentID'):
316                for e in state['experimentID']:
317                    if e.has_key('fedid'):
318                        return e['fedid']
319                else:
320                    return None
321            else:
322                return None
323
324        try:
325            f = open(self.state_filename, "r")
326            self.state = pickle.load(f)
327            self.log.debug("[read_state]: Read state from %s" % \
328                    self.state_filename)
329        except EnvironmentError, e:
330            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
331                    % (self.state_filename, e))
332        except pickle.UnpicklingError, e:
333            self.log.warning(("[read_state]: No saved state: " + \
334                    "Unpickling failed: %s") % e)
335       
336        for s in self.state.values():
337            try:
338
339                eid = get_experiment_id(s)
340                if eid : 
341                    if self.auth_type == 'legacy':
342                        # XXX: legacy
343                        # Give the owner rights to the experiment
344                        self.auth.set_attribute(s['owner'], eid)
345                        # And holders of the eid as well
346                        self.auth.set_attribute(eid, eid)
347                        # allow overrides to control experiments as well
348                        for o in self.overrides:
349                            self.auth.set_attribute(o, eid)
350                        # Set permissions to allow reading of the software
351                        # repo, if any, as well.
352                        for a in self.get_alloc_ids(s):
353                            self.auth.set_attribute(a, 'repo/%s' % eid)
354                else:
355                    raise KeyError("No experiment id")
356            except KeyError, e:
357                self.log.warning("[read_state]: State ownership or identity " +\
358                        "misformatted in %s: %s" % (self.state_filename, e))
359
360
361    def read_accessdb(self, accessdb_file):
362        """
363        Read the mapping from fedids that can create experiments to their name
364        in the 3-level access namespace.  All will be asserted from this
365        testbed and can include the local username and porject that will be
366        asserted on their behalf by this fedd.  Each fedid is also added to the
367        authorization system with the "create" attribute.
368        """
369        self.accessdb = {}
370        # These are the regexps for parsing the db
371        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
372        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
374        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
375                "\s*->\s*(" + name_expr + ")\s*$")
376        lineno = 0
377
378        # Parse the mappings and store in self.authdb, a dict of
379        # fedid -> (proj, user)
380        try:
381            f = open(accessdb_file, "r")
382            for line in f:
383                lineno += 1
384                line = line.strip()
385                if len(line) == 0 or line.startswith('#'):
386                    continue
387                m = project_line.match(line)
388                if m:
389                    fid = fedid(hexstr=m.group(1))
390                    project, user = m.group(2,3)
391                    if not self.accessdb.has_key(fid):
392                        self.accessdb[fid] = []
393                    self.accessdb[fid].append((project, user))
394                    continue
395
396                m = user_line.match(line)
397                if m:
398                    fid = fedid(hexstr=m.group(1))
399                    project = None
400                    user = m.group(2)
401                    if not self.accessdb.has_key(fid):
402                        self.accessdb[fid] = []
403                    self.accessdb[fid].append((project, user))
404                    continue
405                self.log.warn("[experiment_control] Error parsing access " +\
406                        "db %s at line %d" %  (accessdb_file, lineno))
407        except EnvironmentError:
408            raise service_error(service_error.internal,
409                    ("Error opening/reading %s as experiment " +\
410                            "control accessdb") %  accessdb_file)
411        f.close()
412
413        # Initialize the authorization attributes
414        # XXX: legacy
415        if self.auth_type == 'legacy':
416            for fid in self.accessdb.keys():
417                self.auth.set_attribute(fid, 'create')
418                self.auth.set_attribute(fid, 'new')
419
420    def read_mapdb(self, file):
421        """
422        Read a simple colon separated list of mappings for the
423        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
424        """
425
426        self.tbmap = { }
427        lineno =0
428        try:
429            f = open(file, "r")
430            for line in f:
431                lineno += 1
432                line = line.strip()
433                if line.startswith('#') or len(line) == 0:
434                    continue
435                try:
436                    label, url = line.split(':', 1)
437                    self.tbmap[label] = url
438                except ValueError, e:
439                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
440                            "map db: %s %s" % (lineno, line, e))
441        except EnvironmentError, e:
442            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
443                    "open %s: %s" % (file, e))
444        else:
445            f.close()
446
447    def read_store(self):
448        try:
449            self.synch_store = synch_store()
450            self.synch_store.load(self.store_filename)
451            self.log.debug("[read_store]: Read store from %s" % \
452                    self.store_filename)
453        except EnvironmentError, e:
454            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
455                    % (self.state_filename, e))
456            self.synch_store = synch_store()
457
458        # Set the initial permissions on data in the store.  XXX: This ad hoc
459        # authorization attribute initialization is getting out of hand.
460        # XXX: legacy
461        if self.auth_type == 'legacy':
462            for k in self.synch_store.all_keys():
463                try:
464                    if k.startswith('fedid:'):
465                        fid = fedid(hexstr=k[6:46])
466                        if self.state.has_key(fid):
467                            for a in self.get_alloc_ids(self.state[fid]):
468                                self.auth.set_attribute(a, k)
469                except ValueError, e:
470                    self.log.warn("Cannot deduce permissions for %s" % k)
471
472
473    def write_store(self):
474        """
475        Write a new copy of synch_store after writing current state
476        to a backup.  We use the internal synch_store pickle method to avoid
477        incinsistent data.
478
479        State format is a simple pickling of the store.
480        """
481        if os.access(self.store_filename, os.W_OK):
482            copy_file(self.store_filename, \
483                    "%s.bak" % self.store_filename)
484        try:
485            self.synch_store.save(self.store_filename)
486        except EnvironmentError, e:
487            self.log.error("Can't write file %s: %s" % \
488                    (self.store_filename, e))
489        except TypeError, e:
490            self.log.error("Pickling problem (TypeError): %s" % e)
491
492
493    def remove_dirs(self, dir):
494        """
495        Remove the directory tree and all files rooted at dir.  Log any errors,
496        but continue.
497        """
498        self.log.debug("[removedirs]: removing %s" % dir)
499        try:
500            for path, dirs, files in os.walk(dir, topdown=False):
501                for f in files:
502                    os.remove(os.path.join(path, f))
503                for d in dirs:
504                    os.rmdir(os.path.join(path, d))
505            os.rmdir(dir)
506        except EnvironmentError, e:
507            self.log.error("Error deleting directory tree in %s" % e);
508
509    @staticmethod
510    def make_temp_certfile(expcert, tmpdir):
511        """
512        make a protected copy of the access certificate so the experiment
513        controller can act as the experiment principal.  mkstemp is the most
514        secure way to do that. The directory should be created by
515        mkdtemp.  Return the filename.
516        """
517        if expcert and tmpdir:
518            try:
519                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
520                f = os.fdopen(certf, 'w')
521                print >> f, expcert
522                f.close()
523            except EnvironmentError, e:
524                raise service_error(service_error.internal, 
525                        "Cannot create temp cert file?")
526            return certfn
527        else:
528            return None
529
530       
531    def generate_ssh_keys(self, dest, type="rsa" ):
532        """
533        Generate a set of keys for the gateways to use to talk.
534
535        Keys are of type type and are stored in the required dest file.
536        """
537        valid_types = ("rsa", "dsa")
538        t = type.lower();
539        if t not in valid_types: raise ValueError
540        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
541
542        try:
543            trace = open("/dev/null", "w")
544        except EnvironmentError:
545            raise service_error(service_error.internal,
546                    "Cannot open /dev/null??");
547
548        # May raise CalledProcessError
549        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
550        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
551        if rv != 0:
552            raise service_error(service_error.internal, 
553                    "Cannot generate nonce ssh keys.  %s return code %d" \
554                            % (self.ssh_keygen, rv))
555
556    def generate_seer_certs(self, destdir):
557        '''
558        Create a SEER ca cert and a node cert in destdir/ca.pem and
559        destdir/node.pem respectively.  These will be distributed throughout
560        the federated experiment.  This routine reports errors via
561        service_errors.
562        '''
563        openssl = '/usr/bin/openssl'
564        # All the filenames and parameters we need for openssl calls below
565        ca_key =os.path.join(destdir, 'ca.key') 
566        ca_pem = os.path.join(destdir, 'ca.pem')
567        node_key =os.path.join(destdir, 'node.key') 
568        node_pem = os.path.join(destdir, 'node.pem')
569        node_req = os.path.join(destdir, 'node.req')
570        node_signed = os.path.join(destdir, 'node.signed')
571        days = '%s' % (365 * 10)
572        serial = '%s' % random.randint(0, 1<<16)
573
574        try:
575            # Sequence of calls to create a CA key, create a ca cert, create a
576            # node key, node signing request, and finally a signed node
577            # certificate.
578            sequence = (
579                    (openssl, 'genrsa', '-out', ca_key, '1024'),
580                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
581                        ca_pem, '-days', days, '-subj', 
582                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
583                    (openssl, 'genrsa', '-out', node_key, '1024'),
584                    (openssl, 'req', '-new', '-key', node_key, '-out', 
585                        node_req, '-days', days, '-subj', 
586                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
587                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
588                        '-set_serial', serial, '-req', '-in', node_req, 
589                        '-out', node_signed, '-days', days),
590                )
591            # Do all that stuff; bail if there's an error, and push all the
592            # output to dev/null.
593            for cmd in sequence:
594                trace = open("/dev/null", "w")
595                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
596                if rv != 0:
597                    raise service_error(service_error.internal, 
598                            "Cannot generate SEER certs.  %s return code %d" \
599                                    % (' '.join(cmd), rv))
600            # Concatinate the node key and signed certificate into node.pem
601            f = open(node_pem, 'w')
602            for comp in (node_signed, node_key):
603                g = open(comp, 'r')
604                f.write(g.read())
605                g.close()
606            f.close()
607
608            # Throw out intermediaries.
609            for fn in (ca_key, node_key, node_req, node_signed):
610                os.unlink(fn)
611
612        except EnvironmentError, e:
613            # Any difficulties with the file system wind up here
614            raise service_error(service_error.internal,
615                    "File error on  %s while creating SEER certs: %s" % \
616                            (e.filename, e.strerror))
617
618
619
620    def gentopo(self, str):
621        """
622        Generate the topology data structure from the splitter's XML
623        representation of it.
624
625        The topology XML looks like:
626            <experiment>
627                <nodes>
628                    <node><vname></vname><ips>ip1:ip2</ips></node>
629                </nodes>
630                <lans>
631                    <lan>
632                        <vname></vname><vnode></vnode><ip></ip>
633                        <bandwidth></bandwidth><member>node:port</member>
634                    </lan>
635                </lans>
636        """
637        class topo_parse:
638            """
639            Parse the topology XML and create the dats structure.
640            """
641            def __init__(self):
642                # Typing of the subelements for data conversion
643                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
644                self.int_subelements = ( 'bandwidth',)
645                self.float_subelements = ( 'delay',)
646                # The final data structure
647                self.nodes = [ ]
648                self.lans =  [ ]
649                self.topo = { \
650                        'node': self.nodes,\
651                        'lan' : self.lans,\
652                    }
653                self.element = { }  # Current element being created
654                self.chars = ""     # Last text seen
655
656            def end_element(self, name):
657                # After each sub element the contents is added to the current
658                # element or to the appropriate list.
659                if name == 'node':
660                    self.nodes.append(self.element)
661                    self.element = { }
662                elif name == 'lan':
663                    self.lans.append(self.element)
664                    self.element = { }
665                elif name in self.str_subelements:
666                    self.element[name] = self.chars
667                    self.chars = ""
668                elif name in self.int_subelements:
669                    self.element[name] = int(self.chars)
670                    self.chars = ""
671                elif name in self.float_subelements:
672                    self.element[name] = float(self.chars)
673                    self.chars = ""
674
675            def found_chars(self, data):
676                self.chars += data.rstrip()
677
678
679        tp = topo_parse();
680        parser = xml.parsers.expat.ParserCreate()
681        parser.EndElementHandler = tp.end_element
682        parser.CharacterDataHandler = tp.found_chars
683
684        parser.Parse(str)
685
686        return tp.topo
687       
688
689    def genviz(self, topo):
690        """
691        Generate the visualization the virtual topology
692        """
693
694        neato = "/usr/local/bin/neato"
695        # These are used to parse neato output and to create the visualization
696        # file.
697        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
698        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
699                "%s</type></node>"
700
701        try:
702            # Node names
703            nodes = [ n['vname'] for n in topo['node'] ]
704            topo_lans = topo['lan']
705        except KeyError, e:
706            raise service_error(service_error.internal, "Bad topology: %s" %e)
707
708        lans = { }
709        links = { }
710
711        # Walk through the virtual topology, organizing the connections into
712        # 2-node connections (links) and more-than-2-node connections (lans).
713        # When a lan is created, it's added to the list of nodes (there's a
714        # node in the visualization for the lan).
715        for l in topo_lans:
716            if links.has_key(l['vname']):
717                if len(links[l['vname']]) < 2:
718                    links[l['vname']].append(l['vnode'])
719                else:
720                    nodes.append(l['vname'])
721                    lans[l['vname']] = links[l['vname']]
722                    del links[l['vname']]
723                    lans[l['vname']].append(l['vnode'])
724            elif lans.has_key(l['vname']):
725                lans[l['vname']].append(l['vnode'])
726            else:
727                links[l['vname']] = [ l['vnode'] ]
728
729
730        # Open up a temporary file for dot to turn into a visualization
731        try:
732            df, dotname = tempfile.mkstemp()
733            dotfile = os.fdopen(df, 'w')
734        except EnvironmentError:
735            raise service_error(service_error.internal,
736                    "Failed to open file in genviz")
737
738        try:
739            dnull = open('/dev/null', 'w')
740        except EnvironmentError:
741            service_error(service_error.internal,
742                    "Failed to open /dev/null in genviz")
743
744        # Generate a dot/neato input file from the links, nodes and lans
745        try:
746            print >>dotfile, "graph G {"
747            for n in nodes:
748                print >>dotfile, '\t"%s"' % n
749            for l in links.keys():
750                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
751            for l in lans.keys():
752                for n in lans[l]:
753                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
754            print >>dotfile, "}"
755            dotfile.close()
756        except TypeError:
757            raise service_error(service_error.internal,
758                    "Single endpoint link in vtopo")
759        except EnvironmentError:
760            raise service_error(service_error.internal, "Cannot write dot file")
761
762        # Use dot to create a visualization
763        try:
764            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
765                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
766                stderr=dnull, close_fds=True)
767        except EnvironmentError:
768            raise service_error(service_error.internal, 
769                    "Cannot generate visualization: is graphviz available?")
770        dnull.close()
771
772        # Translate dot to vis format
773        vis_nodes = [ ]
774        vis = { 'node': vis_nodes }
775        for line in dot.stdout:
776            m = vis_re.match(line)
777            if m:
778                vn = m.group(1)
779                vis_node = {'name': vn, \
780                        'x': float(m.group(2)),\
781                        'y' : float(m.group(3)),\
782                    }
783                if vn in links.keys() or vn in lans.keys():
784                    vis_node['type'] = 'lan'
785                else:
786                    vis_node['type'] = 'node'
787                vis_nodes.append(vis_node)
788        rv = dot.wait()
789
790        os.remove(dotname)
791        # XXX: graphviz seems to use low return codes for warnings, like
792        # "couldn't find font"
793        if rv < 2 : return vis
794        else: return None
795
796
797    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
798            cert_pwd=None):
799        """
800        Release access to testbed through fedd
801        """
802
803        if not uri and tbmap:
804            uri = tbmap.get(tb, None)
805        if not uri:
806            raise service_error(service_error.server_config, 
807                    "Unknown testbed: %s" % tb)
808
809        if self.local_access.has_key(uri):
810            resp = self.local_access[uri].ReleaseAccess(\
811                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
812                    fedid(file=cert_file))
813            resp = { 'ReleaseAccessResponseBody': resp } 
814        else:
815            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
816                    cert_file, cert_pwd, self.trusted_certs)
817
818        # better error coding
819
820    def remote_ns2topdl(self, uri, desc):
821
822        req = {
823                'description' : { 'ns2description': desc },
824            }
825
826        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
827                self.trusted_certs)
828
829        if r.has_key('Ns2TopdlResponseBody'):
830            r = r['Ns2TopdlResponseBody']
831            ed = r.get('experimentdescription', None)
832            if ed.has_key('topdldescription'):
833                return topdl.Topology(**ed['topdldescription'])
834            else:
835                raise service_error(service_error.protocol, 
836                        "Bad splitter response (no output)")
837        else:
838            raise service_error(service_error.protocol, "Bad splitter response")
839
840    class start_segment:
841        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
842                cert_pwd=None, trusted_certs=None, caller=None,
843                log_collector=None):
844            self.log = log
845            self.debug = debug
846            self.cert_file = cert_file
847            self.cert_pwd = cert_pwd
848            self.trusted_certs = None
849            self.caller = caller
850            self.testbed = testbed
851            self.log_collector = log_collector
852            self.response = None
853            self.node = { }
854            self.proof = None
855
856        def make_map(self, resp):
857            for e in resp.get('embedding', []):
858                if 'toponame' in e and 'physname' in e:
859                    self.node[e['toponame']] = e['physname'][0]
860
861        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
862            req = {
863                    'allocID': { 'fedid' : aid }, 
864                    'segmentdescription': { 
865                        'topdldescription': topo.to_dict(),
866                    },
867                }
868
869            if connInfo:
870                req['connection'] = connInfo
871
872            import_svcs = [ s for m in masters.values() \
873                    for s in m if self.testbed in s.importers]
874
875            if import_svcs or self.testbed in masters:
876                req['service'] = []
877
878            for s in import_svcs:
879                for r in s.reqs:
880                    sr = copy.deepcopy(r)
881                    sr['visibility'] = 'import';
882                    req['service'].append(sr)
883
884            for s in masters.get(self.testbed, []):
885                for r in s.reqs:
886                    sr = copy.deepcopy(r)
887                    sr['visibility'] = 'export';
888                    req['service'].append(sr)
889
890            if attrs:
891                req['fedAttr'] = attrs
892
893            try:
894                self.log.debug("Calling StartSegment at %s " % uri)
895                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
896                        self.trusted_certs)
897                if r.has_key('StartSegmentResponseBody'):
898                    lval = r['StartSegmentResponseBody'].get('allocationLog',
899                            None)
900                    if lval and self.log_collector:
901                        for line in  lval.splitlines(True):
902                            self.log_collector.write(line)
903                    self.make_map(r['StartSegmentResponseBody'])
904                    if 'proof' in r: self.proof = r['proof']
905                    self.response = r
906                else:
907                    raise service_error(service_error.internal, 
908                            "Bad response!?: %s" %r)
909                return True
910            except service_error, e:
911                self.log.error("Start segment failed on %s: %s" % \
912                        (self.testbed, e))
913                return False
914
915
916
917    class terminate_segment:
918        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
919                cert_pwd=None, trusted_certs=None, caller=None):
920            self.log = log
921            self.debug = debug
922            self.cert_file = cert_file
923            self.cert_pwd = cert_pwd
924            self.trusted_certs = None
925            self.caller = caller
926            self.testbed = testbed
927
928        def __call__(self, uri, aid ):
929            req = {
930                    'allocID': aid , 
931                }
932            try:
933                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
934                        self.trusted_certs)
935                return True
936            except service_error, e:
937                self.log.error("Terminate segment failed on %s: %s" % \
938                        (self.testbed, e))
939                return False
940
941    def allocate_resources(self, allocated, masters, eid, expid, 
942            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
943            attrs=None, connInfo={}, tbmap=None, expcert=None):
944
945        started = { }           # Testbeds where a sub-experiment started
946                                # successfully
947
948        # XXX
949        fail_soft = False
950
951        if tbmap is None: tbmap = { }
952
953        log = alloc_log or self.log
954
955        tp = thread_pool(self.nthreads)
956        threads = [ ]
957        starters = [ ]
958
959        if expcert:
960            cert = expcert
961            pw = None
962        else:
963            cert = self.cert_file
964            pw = self.cert_pwd
965
966        for tb in allocated.keys():
967            # Create and start a thread to start the segment, and save it
968            # to get the return value later
969            tb_attrs = copy.copy(attrs)
970            tp.wait_for_slot()
971            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
972            base, suffix = split_testbed(tb)
973            if suffix:
974                tb_attrs.append({'attribute': 'experiment_name', 
975                    'value': "%s-%s" % (eid, suffix)})
976            else:
977                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
978            if not uri:
979                raise service_error(service_error.internal, 
980                        "Unknown testbed %s !?" % tb)
981
982            if tbparams[tb].has_key('allocID') and \
983                    tbparams[tb]['allocID'].has_key('fedid'):
984                aid = tbparams[tb]['allocID']['fedid']
985            else:
986                raise service_error(service_error.internal, 
987                        "No alloc id for testbed %s !?" % tb)
988
989            s = self.start_segment(log=log, debug=self.debug,
990                    testbed=tb, cert_file=cert,
991                    cert_pwd=pw, trusted_certs=self.trusted_certs,
992                    caller=self.call_StartSegment,
993                    log_collector=log_collector)
994            starters.append(s)
995            t  = pooled_thread(\
996                    target=s, name=tb,
997                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
998                    pdata=tp, trace_file=self.trace_file)
999            threads.append(t)
1000            t.start()
1001
1002        # Wait until all finish (keep pinging the log, though)
1003        mins = 0
1004        revoked = False
1005        while not tp.wait_for_all_done(60.0):
1006            mins += 1
1007            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1008                    % mins)
1009            if not revoked and \
1010                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1011                # a testbed has failed.  Revoke this experiment's
1012                # synchronizarion values so that sub experiments will not
1013                # deadlock waiting for synchronization that will never happen
1014                self.log.info("A subexperiment has failed to swap in, " + \
1015                        "revoking synch keys")
1016                var_key = "fedid:%s" % expid
1017                for k in self.synch_store.all_keys():
1018                    if len(k) > 45 and k[0:46] == var_key:
1019                        self.synch_store.revoke_key(k)
1020                revoked = True
1021
1022        failed = [ t.getName() for t in threads if not t.rv ]
1023        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1024
1025        # If one failed clean up, unless fail_soft is set
1026        if failed:
1027            if not fail_soft:
1028                tp.clear()
1029                for tb in succeeded:
1030                    # Create and start a thread to stop the segment
1031                    tp.wait_for_slot()
1032                    uri = tbparams[tb]['uri']
1033                    t  = pooled_thread(\
1034                            target=self.terminate_segment(log=log,
1035                                testbed=tb,
1036                                cert_file=cert, 
1037                                cert_pwd=pw,
1038                                trusted_certs=self.trusted_certs,
1039                                caller=self.call_TerminateSegment),
1040                            args=(uri, tbparams[tb]['federant']['allocID']),
1041                            name=tb,
1042                            pdata=tp, trace_file=self.trace_file)
1043                    t.start()
1044                # Wait until all finish (if any are being stopped)
1045                if succeeded:
1046                    tp.wait_for_all_done()
1047
1048                # release the allocations
1049                for tb in tbparams.keys():
1050                    try:
1051                        self.release_access(tb, tbparams[tb]['allocID'], 
1052                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1053                                cert_file=cert, cert_pwd=pw)
1054                    except service_error, e:
1055                        self.log.warn("Error releasing access: %s" % e.desc)
1056                # Remove the placeholder
1057                self.state_lock.acquire()
1058                self.state[eid]['experimentStatus'] = 'failed'
1059                if self.state_filename: self.write_state()
1060                self.state_lock.release()
1061                # Remove the repo dir
1062                self.remove_dirs("%s/%s" %(self.repodir, expid))
1063                # Walk up tmpdir, deleting as we go
1064                if self.cleanup:
1065                    self.remove_dirs(tmpdir)
1066                else:
1067                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1068
1069
1070                log.error("Swap in failed on %s" % ",".join(failed))
1071                return
1072        else:
1073            # Walk through the successes and gather the virtual to physical
1074            # mapping.
1075            embedding = [ ]
1076            proofs = { }
1077            for s in starters:
1078                for k, v in s.node.items():
1079                    embedding.append({
1080                        'toponame': k, 
1081                        'physname': [ v],
1082                        'testbed': s.testbed
1083                        })
1084                if s.proof:
1085                    proofs[s.testbed] = s.proof
1086            log.info("[start_segment]: Experiment %s active" % eid)
1087
1088
1089        # Walk up tmpdir, deleting as we go
1090        if self.cleanup:
1091            self.remove_dirs(tmpdir)
1092        else:
1093            log.debug("[start_experiment]: not removing %s" % tmpdir)
1094
1095        # Insert the experiment into our state and update the disk copy.
1096        self.state_lock.acquire()
1097        self.state[expid]['experimentStatus'] = 'active'
1098        self.state[eid] = self.state[expid]
1099        self.state[eid]['experimentdescription']['topdldescription'] = \
1100                top.to_dict()
1101        self.state[eid]['embedding'] = embedding
1102        # Append startup proofs
1103        for f in self.state[eid]['federant']:
1104            if 'name' in f and 'localname' in f['name']:
1105                if f['name']['localname'] in proofs:
1106                    f['proof'].append(proofs[f['name']['localname']])
1107
1108        if self.state_filename: self.write_state()
1109        self.state_lock.release()
1110        return
1111
1112
1113    def add_kit(self, e, kit):
1114        """
1115        Add a Software object created from the list of (install, location)
1116        tuples passed as kit  to the software attribute of an object e.  We
1117        do this enough to break out the code, but it's kind of a hack to
1118        avoid changing the old tuple rep.
1119        """
1120
1121        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1122
1123        if isinstance(e.software, list): e.software.extend(s)
1124        else: e.software = s
1125
1126    def append_experiment_authorization(self, expid, attrs, 
1127            need_state_lock=True):
1128        """
1129        Append the authorization information to system state
1130        """
1131
1132        for p, a in attrs:
1133            self.auth.set_attribute(p, a)
1134        self.auth.save()
1135
1136        if need_state_lock: self.state_lock.acquire()
1137        self.state[expid]['auth'].update(attrs)
1138        if self.state_filename: self.write_state()
1139        if need_state_lock: self.state_lock.release()
1140
1141    def clear_experiment_authorization(self, expid, need_state_lock=True):
1142        """
1143        Attrs is a set of attribute principal pairs that need to be removed
1144        from the authenticator.  Remove them and save the authenticator.
1145        """
1146
1147        if need_state_lock: self.state_lock.acquire()
1148        if expid in self.state and 'auth' in self.state[expid]:
1149            for p, a in self.state[expid]['auth']:
1150                self.auth.unset_attribute(p, a)
1151            self.state[expid]['auth'] = set()
1152        if self.state_filename: self.write_state()
1153        if need_state_lock: self.state_lock.release()
1154        self.auth.save()
1155
1156
1157    def create_experiment_state(self, fid, req, expid, expcert,
1158            state='starting'):
1159        """
1160        Create the initial entry in the experiment's state.  The expid and
1161        expcert are the experiment's fedid and certifacte that represents that
1162        ID, which are installed in the experiment state.  If the request
1163        includes a suggested local name that is used if possible.  If the local
1164        name is already taken by an experiment owned by this user that has
1165        failed, it is overwritten.  Otherwise new letters are added until a
1166        valid localname is found.  The generated local name is returned.
1167        """
1168
1169        if req.has_key('experimentID') and \
1170                req['experimentID'].has_key('localname'):
1171            overwrite = False
1172            eid = req['experimentID']['localname']
1173            # If there's an old failed experiment here with the same local name
1174            # and accessible by this user, we'll overwrite it, otherwise we'll
1175            # fall through and do the collision avoidance.
1176            old_expid = self.get_experiment_fedid(eid)
1177            if old_expid:
1178                users_experiment = True
1179                try:
1180                    self.check_experiment_access(fid, old_expid)
1181                except service_error, e:
1182                    if e.code == service_error.access: users_experiment = False
1183                    else: raise e
1184                if users_experiment:
1185                    self.state_lock.acquire()
1186                    status = self.state[eid].get('experimentStatus', None)
1187                    if status and status == 'failed':
1188                        # remove the old access attributes
1189                        self.clear_experiment_authorization(eid,
1190                                need_state_lock=False)
1191                        overwrite = True
1192                        del self.state[eid]
1193                        del self.state[old_expid]
1194                    self.state_lock.release()
1195                else:
1196                    self.log.info('Experiment %s exists, ' % eid + \
1197                            'but this user cannot access it')
1198            self.state_lock.acquire()
1199            while (self.state.has_key(eid) and not overwrite):
1200                eid += random.choice(string.ascii_letters)
1201            # Initial state
1202            self.state[eid] = {
1203                    'experimentID' : \
1204                            [ { 'localname' : eid }, {'fedid': expid } ],
1205                    'experimentStatus': state,
1206                    'experimentAccess': { 'X509' : expcert },
1207                    'owner': fid,
1208                    'log' : [],
1209                    'auth': set(),
1210                }
1211            self.state[expid] = self.state[eid]
1212            if self.state_filename: self.write_state()
1213            self.state_lock.release()
1214        else:
1215            eid = self.exp_stem
1216            for i in range(0,5):
1217                eid += random.choice(string.ascii_letters)
1218            self.state_lock.acquire()
1219            while (self.state.has_key(eid)):
1220                eid = self.exp_stem
1221                for i in range(0,5):
1222                    eid += random.choice(string.ascii_letters)
1223            # Initial state
1224            self.state[eid] = {
1225                    'experimentID' : \
1226                            [ { 'localname' : eid }, {'fedid': expid } ],
1227                    'experimentStatus': state,
1228                    'experimentAccess': { 'X509' : expcert },
1229                    'owner': fid,
1230                    'log' : [],
1231                    'auth': set(),
1232                }
1233            self.state[expid] = self.state[eid]
1234            if self.state_filename: self.write_state()
1235            self.state_lock.release()
1236
1237        # Let users touch the state.  Authorize this fid and the expid itself
1238        # to touch the experiment, as well as allowing th eoverrides.
1239        self.append_experiment_authorization(eid, 
1240                set([(fid, expid), (expid,expid)] + \
1241                        [ (o, expid) for o in self.overrides]))
1242
1243        return eid
1244
1245
1246    def allocate_ips_to_topo(self, top):
1247        """
1248        Add an ip4_address attribute to all the hosts in the topology, based on
1249        the shared substrates on which they sit.  An /etc/hosts file is also
1250        created and returned as a list of hostfiles entries.  We also return
1251        the allocator, because we may need to allocate IPs to portals
1252        (specifically DRAGON portals).
1253        """
1254        subs = sorted(top.substrates, 
1255                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1256                reverse=True)
1257        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1258        ifs = { }
1259        hosts = [ ]
1260
1261        for idx, s in enumerate(subs):
1262            net_size = len(s.interfaces)+2
1263
1264            a = ips.allocate(net_size)
1265            if a :
1266                base, num = a
1267                if num < net_size: 
1268                    raise service_error(service_error.internal,
1269                            "Allocator returned wrong number of IPs??")
1270            else:
1271                raise service_error(service_error.req, 
1272                        "Cannot allocate IP addresses")
1273            mask = ips.min_alloc
1274            while mask < net_size:
1275                mask *= 2
1276
1277            netmask = ((2**32-1) ^ (mask-1))
1278
1279            base += 1
1280            for i in s.interfaces:
1281                i.attribute.append(
1282                        topdl.Attribute('ip4_address', 
1283                            "%s" % ip_addr(base)))
1284                i.attribute.append(
1285                        topdl.Attribute('ip4_netmask', 
1286                            "%s" % ip_addr(int(netmask))))
1287
1288                hname = i.element.name
1289                if ifs.has_key(hname):
1290                    hosts.append("%s\t%s-%s %s-%d" % \
1291                            (ip_addr(base), hname, s.name, hname,
1292                                ifs[hname]))
1293                else:
1294                    ifs[hname] = 0
1295                    hosts.append("%s\t%s-%s %s-%d %s" % \
1296                            (ip_addr(base), hname, s.name, hname,
1297                                ifs[hname], hname))
1298
1299                ifs[hname] += 1
1300                base += 1
1301        return hosts, ips
1302
1303    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1304            tbparam, masters, tbmap, expid=None, expcert=None):
1305        for tb in testbeds:
1306            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1307                    expcert)
1308            allocated[tb] = 1
1309
1310    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1311            expcert=None):
1312        """
1313        Get access to testbed through fedd and set the parameters for that tb
1314        """
1315        def get_export_project(svcs):
1316            """
1317            Look through for the list of federated_service for this testbed
1318            objects for a project_export service, and extract the project
1319            parameter.
1320            """
1321
1322            pe = [s for s in svcs if s.name=='project_export']
1323            if len(pe) == 1:
1324                return pe[0].params.get('project', None)
1325            elif len(pe) == 0:
1326                return None
1327            else:
1328                raise service_error(service_error.req,
1329                        "More than one project export is not supported")
1330
1331        def add_services(svcs, type, slist, keys):
1332            """
1333            Add the given services to slist.  type is import or export.  Also
1334            add a mapping entry from the assigned id to the original service
1335            record.
1336            """
1337            for i, s in enumerate(svcs):
1338                idx = '%s%d' % (type, i)
1339                keys[idx] = s
1340                sr = {'id': idx, 'name': s.name, 'visibility': type }
1341                if s.params:
1342                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1343                            for k, v in s.params.items()]
1344                slist.append(sr)
1345
1346        uri = tbmap.get(testbed_base(tb), None)
1347        if not uri:
1348            raise service_error(service_error.server_config, 
1349                    "Unknown testbed: %s" % tb)
1350
1351        export_svcs = masters.get(tb,[])
1352        import_svcs = [ s for m in masters.values() \
1353                for s in m \
1354                    if tb in s.importers ]
1355
1356        export_project = get_export_project(export_svcs)
1357        # Compose the credential list so that IDs come before attributes
1358        creds = set()
1359        keys = set()
1360        certs = self.auth.get_creds_for_principal(fid)
1361        if expid:
1362            certs.update(self.auth.get_creds_for_principal(expid))
1363        for c in certs:
1364            keys.add(c.issuer_cert())
1365            creds.add(c.attribute_cert())
1366        creds = list(keys) + list(creds)
1367
1368        if expcert: cert, pw = expcert, None
1369        else: cert, pw = self.cert_file, self.cert_pw
1370
1371        # Request credentials
1372        req = {
1373                'abac_credential': creds,
1374            }
1375        # Make the service request from the services we're importing and
1376        # exporting.  Keep track of the export request ids so we can
1377        # collect the resulting info from the access response.
1378        e_keys = { }
1379        if import_svcs or export_svcs:
1380            slist = []
1381            add_services(import_svcs, 'import', slist, e_keys)
1382            add_services(export_svcs, 'export', slist, e_keys)
1383            req['service'] = slist
1384
1385        if self.local_access.has_key(uri):
1386            # Local access call
1387            req = { 'RequestAccessRequestBody' : req }
1388            r = self.local_access[uri].RequestAccess(req, 
1389                    fedid(file=self.cert_file))
1390            r = { 'RequestAccessResponseBody' : r }
1391        else:
1392            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1393
1394        if r.has_key('RequestAccessResponseBody'):
1395            # Through to here we have a valid response, not a fault.
1396            # Access denied is a fault, so something better or worse than
1397            # access denied has happened.
1398            r = r['RequestAccessResponseBody']
1399            self.log.debug("[get_access] Access granted")
1400        else:
1401            raise service_error(service_error.protocol,
1402                        "Bad proxy response")
1403        if 'proof' not in r:
1404            raise service_error(service_error.protocol,
1405                        "Bad access response (no access proof)")
1406       
1407        tbparam[tb] = { 
1408                "allocID" : r['allocID'],
1409                "uri": uri,
1410                "proof": [ r['proof'] ],
1411                }
1412
1413        # Collect the responses corresponding to the services this testbed
1414        # exports.  These will be the service requests that we will include in
1415        # the start segment requests (with appropriate visibility values) to
1416        # import and export the segments.
1417        for s in r.get('service', []):
1418            id = s.get('id', None)
1419            if id and id in e_keys:
1420                e_keys[id].reqs.append(s)
1421
1422        # Add attributes to parameter space.  We don't allow attributes to
1423        # overlay any parameters already installed.
1424        for a in r.get('fedAttr', []):
1425            try:
1426                if a['attribute'] and \
1427                        isinstance(a['attribute'], basestring)\
1428                        and not tbparam[tb].has_key(a['attribute'].lower()):
1429                    tbparam[tb][a['attribute'].lower()] = a['value']
1430            except KeyError:
1431                self.log.error("Bad attribute in response: %s" % a)
1432
1433
1434    def split_topology(self, top, topo, testbeds):
1435        """
1436        Create the sub-topologies that are needed for experiment instantiation.
1437        """
1438        for tb in testbeds:
1439            topo[tb] = top.clone()
1440            # copy in for loop allows deletions from the original
1441            for e in [ e for e in topo[tb].elements]:
1442                etb = e.get_attribute('testbed')
1443                # NB: elements without a testbed attribute won't appear in any
1444                # sub topologies. 
1445                if not etb or etb != tb:
1446                    for i in e.interface:
1447                        for s in i.subs:
1448                            try:
1449                                s.interfaces.remove(i)
1450                            except ValueError:
1451                                raise service_error(service_error.internal,
1452                                        "Can't remove interface??")
1453                    topo[tb].elements.remove(e)
1454            topo[tb].make_indices()
1455
1456    def confirm_software(self, top):
1457        """
1458        Make sure that the software to be loaded in the topo is all available
1459        before we begin making access requests, etc.  This is a subset of
1460        wrangle_software.
1461        """
1462        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1463        pkgs.update([x.location for e in top.elements for x in e.software])
1464
1465        for pkg in pkgs:
1466            loc = pkg
1467
1468            scheme, host, path = urlparse(loc)[0:3]
1469            dest = os.path.basename(path)
1470            if not scheme:
1471                if not loc.startswith('/'):
1472                    loc = "/%s" % loc
1473                loc = "file://%s" %loc
1474            # NB: if scheme was found, loc == pkg
1475            try:
1476                u = urlopen(loc)
1477                u.close()
1478            except Exception, e:
1479                raise service_error(service_error.req, 
1480                        "Cannot open %s: %s" % (loc, e))
1481        return True
1482
1483    def wrangle_software(self, expid, top, topo, tbparams):
1484        """
1485        Copy software out to the repository directory, allocate permissions and
1486        rewrite the segment topologies to look for the software in local
1487        places.
1488        """
1489
1490        # Copy the rpms and tarfiles to a distribution directory from
1491        # which the federants can retrieve them
1492        linkpath = "%s/software" %  expid
1493        softdir ="%s/%s" % ( self.repodir, linkpath)
1494        softmap = { }
1495
1496        # self.fedkit and self.gateway kit are lists of tuples of
1497        # (install_location, download_location) this extracts the download
1498        # locations.
1499        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1500        pkgs.update([x.location for e in top.elements for x in e.software])
1501        try:
1502            os.makedirs(softdir)
1503        except EnvironmentError, e:
1504            raise service_error(
1505                    "Cannot create software directory: %s" % e)
1506        # The actual copying.  Everything's converted into a url for copying.
1507        auth_attrs = set()
1508        for pkg in pkgs:
1509            loc = pkg
1510
1511            scheme, host, path = urlparse(loc)[0:3]
1512            dest = os.path.basename(path)
1513            if not scheme:
1514                if not loc.startswith('/'):
1515                    loc = "/%s" % loc
1516                loc = "file://%s" %loc
1517            # NB: if scheme was found, loc == pkg
1518            try:
1519                u = urlopen(loc)
1520            except Exception, e:
1521                raise service_error(service_error.req, 
1522                        "Cannot open %s: %s" % (loc, e))
1523            try:
1524                f = open("%s/%s" % (softdir, dest) , "w")
1525                self.log.debug("Writing %s/%s" % (softdir,dest) )
1526                data = u.read(4096)
1527                while data:
1528                    f.write(data)
1529                    data = u.read(4096)
1530                f.close()
1531                u.close()
1532            except Exception, e:
1533                raise service_error(service_error.internal,
1534                        "Could not copy %s: %s" % (loc, e))
1535            path = re.sub("/tmp", "", linkpath)
1536            # XXX
1537            softmap[pkg] = \
1538                    "%s/%s/%s" %\
1539                    ( self.repo_url, path, dest)
1540
1541            # Allow the individual segments to access the software by assigning
1542            # an attribute to each testbed allocation that encodes the data to
1543            # be released.  This expression collects the data for each run of
1544            # the loop.
1545            auth_attrs.update([
1546                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1547                        for tb in tbparams.keys()])
1548
1549        self.append_experiment_authorization(expid, auth_attrs)
1550
1551        # Convert the software locations in the segments into the local
1552        # copies on this host
1553        for soft in [ s for tb in topo.values() \
1554                for e in tb.elements \
1555                    if getattr(e, 'software', False) \
1556                        for s in e.software ]:
1557            if softmap.has_key(soft.location):
1558                soft.location = softmap[soft.location]
1559
1560
1561    def new_experiment(self, req, fid):
1562        """
1563        The external interface to empty initial experiment creation called from
1564        the dispatcher.
1565
1566        Creates a working directory, splits the incoming description using the
1567        splitter script and parses out the avrious subsections using the
1568        lcasses above.  Once each sub-experiment is created, use pooled threads
1569        to instantiate them and start it all up.
1570        """
1571        req = req.get('NewRequestBody', None)
1572        if not req:
1573            raise service_error(service_error.req,
1574                    "Bad request format (no NewRequestBody)")
1575
1576        if self.auth.import_credentials(data_list=req.get('credential', [])):
1577            self.auth.save()
1578
1579        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1580                with_proof=True)
1581
1582        if not access_ok:
1583            raise service_error(service_error.access, "New access denied",
1584                    proof=[proof])
1585
1586        try:
1587            tmpdir = tempfile.mkdtemp(prefix="split-")
1588        except EnvironmentError:
1589            raise service_error(service_error.internal, "Cannot create tmp dir")
1590
1591        try:
1592            access_user = self.accessdb[fid]
1593        except KeyError:
1594            raise service_error(service_error.internal,
1595                    "Access map and authorizer out of sync in " + \
1596                            "new_experiment for fedid %s"  % fid)
1597
1598        # Generate an ID for the experiment (slice) and a certificate that the
1599        # allocator can use to prove they own it.  We'll ship it back through
1600        # the encrypted connection.  If the requester supplied one, use it.
1601        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1602            expcert = req['experimentAccess']['X509']
1603            expid = fedid(certstr=expcert)
1604            self.state_lock.acquire()
1605            if expid in self.state:
1606                self.state_lock.release()
1607                raise service_error(service_error.req, 
1608                        'fedid %s identifies an existing experiment' % expid)
1609            self.state_lock.release()
1610        else:
1611            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1612
1613        #now we're done with the tmpdir, and it should be empty
1614        if self.cleanup:
1615            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1616            os.rmdir(tmpdir)
1617        else:
1618            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1619
1620        eid = self.create_experiment_state(fid, req, expid, expcert, 
1621                state='empty')
1622
1623        rv = {
1624                'experimentID': [
1625                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1626                ],
1627                'experimentStatus': 'empty',
1628                'experimentAccess': { 'X509' : expcert },
1629                'proof': proof.to_dict(),
1630            }
1631
1632        return rv
1633
1634    # create_experiment sub-functions
1635
1636    @staticmethod
1637    def get_experiment_key(req, field='experimentID'):
1638        """
1639        Parse the experiment identifiers out of the request (the request body
1640        tag has been removed).  Specifically this pulls either the fedid or the
1641        localname out of the experimentID field.  A fedid is preferred.  If
1642        neither is present or the request does not contain the fields,
1643        service_errors are raised.
1644        """
1645        # Get the experiment access
1646        exp = req.get(field, None)
1647        if exp:
1648            if exp.has_key('fedid'):
1649                key = exp['fedid']
1650            elif exp.has_key('localname'):
1651                key = exp['localname']
1652            else:
1653                raise service_error(service_error.req, "Unknown lookup type")
1654        else:
1655            raise service_error(service_error.req, "No request?")
1656
1657        return key
1658
1659    def get_experiment_ids_and_start(self, key, tmpdir):
1660        """
1661        Get the experiment name, id and access certificate from the state, and
1662        set the experiment state to 'starting'.  returns a triple (fedid,
1663        localname, access_cert_file). The access_cert_file is a copy of the
1664        contents of the access certificate, created in the tempdir with
1665        restricted permissions.  If things are confused, raise an exception.
1666        """
1667
1668        expid = eid = None
1669        self.state_lock.acquire()
1670        if self.state.has_key(key):
1671            self.state[key]['experimentStatus'] = "starting"
1672            for e in self.state[key].get('experimentID',[]):
1673                if not expid and e.has_key('fedid'):
1674                    expid = e['fedid']
1675                elif not eid and e.has_key('localname'):
1676                    eid = e['localname']
1677            if 'experimentAccess' in self.state[key] and \
1678                    'X509' in self.state[key]['experimentAccess']:
1679                expcert = self.state[key]['experimentAccess']['X509']
1680            else:
1681                expcert = None
1682        self.state_lock.release()
1683
1684        # make a protected copy of the access certificate so the experiment
1685        # controller can act as the experiment principal.
1686        if expcert:
1687            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1688            if not expcert_file:
1689                raise service_error(service_error.internal, 
1690                        "Cannot create temp cert file?")
1691        else:
1692            expcert_file = None
1693
1694        return (eid, expid, expcert_file)
1695
1696    def get_topology(self, req, tmpdir):
1697        """
1698        Get the ns2 content and put it into a file for parsing.  Call the local
1699        or remote parser and return the topdl.Topology.  Errors result in
1700        exceptions.  req is the request and tmpdir is a work directory.
1701        """
1702
1703        # The tcl parser needs to read a file so put the content into that file
1704        descr=req.get('experimentdescription', None)
1705        if descr:
1706            if 'ns2description' in descr:
1707                file_content=descr['ns2description']
1708            elif 'topdldescription' in descr:
1709                return topdl.Topology(**descr['topdldescription'])
1710            else:
1711                raise service_error(service_error.req, 
1712                        'Unknown experiment description type')
1713        else:
1714            raise service_error(service_error.req, "No experiment description")
1715
1716
1717        if self.splitter_url:
1718            self.log.debug("Calling remote topdl translator at %s" % \
1719                    self.splitter_url)
1720            top = self.remote_ns2topdl(self.splitter_url, file_content)
1721        else:
1722            tclfile = os.path.join(tmpdir, "experiment.tcl")
1723            if file_content:
1724                try:
1725                    f = open(tclfile, 'w')
1726                    f.write(file_content)
1727                    f.close()
1728                except EnvironmentError:
1729                    raise service_error(service_error.internal,
1730                            "Cannot write temp experiment description")
1731            else:
1732                raise service_error(service_error.req, 
1733                        "Only ns2descriptions supported")
1734            pid = "dummy"
1735            gid = "dummy"
1736            eid = "dummy"
1737
1738            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1739                str(self.muxmax), '-m', 'dummy']
1740
1741            tclcmd.extend([pid, gid, eid, tclfile])
1742
1743            self.log.debug("running local splitter %s", " ".join(tclcmd))
1744            # This is just fantastic.  As a side effect the parser copies
1745            # tb_compat.tcl into the current directory, so that directory
1746            # must be writable by the fedd user.  Doing this in the
1747            # temporary subdir ensures this is the case.
1748            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1749                    cwd=tmpdir)
1750            split_data = tclparser.stdout
1751
1752            top = topdl.topology_from_xml(file=split_data, top="experiment")
1753            os.remove(tclfile)
1754
1755        return top
1756
1757    def get_testbed_services(self, req, testbeds):
1758        """
1759        Parse the services section of the request into into two dicts mapping
1760        testbed to lists of federated_service objects.  The first lists all
1761        exporters of services, and the second all exporters of services that
1762        need control portals in the experiment.
1763        """
1764        masters = { }
1765        pmasters = { }
1766        for s in req.get('service', []):
1767            # If this is a service request with the importall field
1768            # set, fill it out.
1769
1770            if s.get('importall', False):
1771                s['import'] = [ tb for tb in testbeds \
1772                        if tb not in s.get('export',[])]
1773                del s['importall']
1774
1775            # Add the service to masters
1776            for tb in s.get('export', []):
1777                if s.get('name', None):
1778
1779                    params = { }
1780                    for a in s.get('fedAttr', []):
1781                        params[a.get('attribute', '')] = a.get('value','')
1782
1783                    fser = federated_service(name=s['name'],
1784                            exporter=tb, importers=s.get('import',[]),
1785                            params=params)
1786                    if fser.name == 'hide_hosts' \
1787                            and 'hosts' not in fser.params:
1788                        fser.params['hosts'] = \
1789                                ",".join(tb_hosts.get(fser.exporter, []))
1790                    if tb in masters: masters[tb].append(fser)
1791                    else: masters[tb] = [fser]
1792
1793                    if fser.portal:
1794                        if tb not in pmasters: pmasters[tb] = [ fser ]
1795                        else: pmasters[tb].append(fser)
1796                else:
1797                    self.log.error('Testbed service does not have name " + \
1798                            "and importers')
1799        return masters, pmasters
1800
1801    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1802        """
1803        Create the ssh keys necessary for interconnecting the portal nodes and
1804        the global hosts file for letting each segment know about the IP
1805        addresses in play.  Save these into the repo.  Add attributes to the
1806        autorizer allowing access controllers to download them and return a set
1807        of attributes that inform the segments where to find this stuff.  May
1808        raise service_errors in if there are problems.
1809        """
1810        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1811        gw_secretkey_base = "fed.%s" % self.ssh_type
1812        keydir = os.path.join(tmpdir, 'keys')
1813        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1814        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1815
1816        try:
1817            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1818        except ValueError:
1819            raise service_error(service_error.server_config, 
1820                    "Bad key type (%s)" % self.ssh_type)
1821
1822        self.generate_seer_certs(keydir)
1823
1824        # Copy configuration files into the remote file store
1825        # The config urlpath
1826        configpath = "/%s/config" % expid
1827        # The config file system location
1828        configdir ="%s%s" % ( self.repodir, configpath)
1829        try:
1830            os.makedirs(configdir)
1831        except EnvironmentError, e:
1832            raise service_error(service_error.internal,
1833                    "Cannot create config directory: %s" % e)
1834        try:
1835            f = open("%s/hosts" % configdir, "w")
1836            print >> f, string.join(hosts, '\n')
1837            f.close()
1838        except EnvironmentError, e:
1839            raise service_error(service_error.internal, 
1840                    "Cannot write hosts file: %s" % e)
1841        try:
1842            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1843            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1844            copy_file(os.path.join(keydir, 'ca.pem'), 
1845                    os.path.join(configdir, 'ca.pem'))
1846            copy_file(os.path.join(keydir, 'node.pem'), 
1847                    os.path.join(configdir, 'node.pem'))
1848        except EnvironmentError, e:
1849            raise service_error(service_error.internal, 
1850                    "Cannot copy keyfiles: %s" % e)
1851
1852        # Allow the individual testbeds to access the configuration files,
1853        # again by setting an attribute for the relevant pathnames on each
1854        # allocation principal.  Yeah, that's a long list comprehension.
1855        self.append_experiment_authorization(expid, set([
1856            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1857                    for tb in tbparams.keys() \
1858                        for f in ("hosts", 'ca.pem', 'node.pem', 
1859                            gw_secretkey_base, gw_pubkey_base)]))
1860
1861        attrs = [ 
1862                {
1863                    'attribute': 'ssh_pubkey', 
1864                    'value': '%s/%s/config/%s' % \
1865                            (self.repo_url, expid, gw_pubkey_base)
1866                },
1867                {
1868                    'attribute': 'ssh_secretkey', 
1869                    'value': '%s/%s/config/%s' % \
1870                            (self.repo_url, expid, gw_secretkey_base)
1871                },
1872                {
1873                    'attribute': 'hosts', 
1874                    'value': '%s/%s/config/hosts' % \
1875                            (self.repo_url, expid)
1876                },
1877                {
1878                    'attribute': 'seer_ca_pem', 
1879                    'value': '%s/%s/config/%s' % \
1880                            (self.repo_url, expid, 'ca.pem')
1881                },
1882                {
1883                    'attribute': 'seer_node_pem', 
1884                    'value': '%s/%s/config/%s' % \
1885                            (self.repo_url, expid, 'node.pem')
1886                },
1887            ]
1888        return attrs
1889
1890
1891    def get_vtopo(self, req, fid):
1892        """
1893        Return the stored virtual topology for this experiment
1894        """
1895        rv = None
1896        state = None
1897
1898        req = req.get('VtopoRequestBody', None)
1899        if not req:
1900            raise service_error(service_error.req,
1901                    "Bad request format (no VtopoRequestBody)")
1902        exp = req.get('experiment', None)
1903        if exp:
1904            if exp.has_key('fedid'):
1905                key = exp['fedid']
1906                keytype = "fedid"
1907            elif exp.has_key('localname'):
1908                key = exp['localname']
1909                keytype = "localname"
1910            else:
1911                raise service_error(service_error.req, "Unknown lookup type")
1912        else:
1913            raise service_error(service_error.req, "No request?")
1914
1915        proof = self.check_experiment_access(fid, key)
1916
1917        self.state_lock.acquire()
1918        if self.state.has_key(key):
1919            if self.state[key].has_key('vtopo'):
1920                rv = { 'experiment' : {keytype: key },
1921                        'vtopo': self.state[key]['vtopo'],
1922                        'proof': proof.to_dict(), 
1923                    }
1924            else:
1925                state = self.state[key]['experimentStatus']
1926        self.state_lock.release()
1927
1928        if rv: return rv
1929        else: 
1930            if state:
1931                raise service_error(service_error.partial, 
1932                        "Not ready: %s" % state)
1933            else:
1934                raise service_error(service_error.req, "No such experiment")
1935
1936    def get_vis(self, req, fid):
1937        """
1938        Return the stored visualization for this experiment
1939        """
1940        rv = None
1941        state = None
1942
1943        req = req.get('VisRequestBody', None)
1944        if not req:
1945            raise service_error(service_error.req,
1946                    "Bad request format (no VisRequestBody)")
1947        exp = req.get('experiment', None)
1948        if exp:
1949            if exp.has_key('fedid'):
1950                key = exp['fedid']
1951                keytype = "fedid"
1952            elif exp.has_key('localname'):
1953                key = exp['localname']
1954                keytype = "localname"
1955            else:
1956                raise service_error(service_error.req, "Unknown lookup type")
1957        else:
1958            raise service_error(service_error.req, "No request?")
1959
1960        proof = self.check_experiment_access(fid, key)
1961
1962        self.state_lock.acquire()
1963        if self.state.has_key(key):
1964            if self.state[key].has_key('vis'):
1965                rv =  { 'experiment' : {keytype: key },
1966                        'vis': self.state[key]['vis'],
1967                        'proof': proof.to_dict(), 
1968                        }
1969            else:
1970                state = self.state[key]['experimentStatus']
1971        self.state_lock.release()
1972
1973        if rv: return rv
1974        else:
1975            if state:
1976                raise service_error(service_error.partial, 
1977                        "Not ready: %s" % state)
1978            else:
1979                raise service_error(service_error.req, "No such experiment")
1980
1981   
1982    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1983            top):
1984        """
1985        Store the various data that have changed in the experiment state
1986        between when it was started and the beginning of resource allocation.
1987        This is basically the information about each local allocation.  This
1988        fills in the values of the placeholder allocation in the state.  It
1989        also collects the access proofs and returns them as dicts for a
1990        response message.
1991        """
1992        # save federant information
1993        for k in allocated.keys():
1994            tbparams[k]['federant'] = {
1995                    'name': [ { 'localname' : eid} ],
1996                    'allocID' : tbparams[k]['allocID'],
1997                    'uri': tbparams[k]['uri'],
1998                    'proof': tbparams[k]['proof'],
1999                }
2000
2001        self.state_lock.acquire()
2002        self.state[eid]['vtopo'] = vtopo
2003        self.state[eid]['vis'] = vis
2004        self.state[eid]['experimentdescription'] = \
2005                { 'topdldescription': top.to_dict() }
2006        self.state[eid]['federant'] = \
2007                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2008                    if tbparams[tb].has_key('federant') ]
2009        # Access proofs for the response message
2010        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2011                    for p in tbparams[k]['federant']['proof']]
2012        if self.state_filename: 
2013            self.write_state()
2014        self.state_lock.release()
2015        return proofs
2016
2017    def clear_placeholder(self, eid, expid, tmpdir):
2018        """
2019        Clear the placeholder and remove any allocated temporary dir.
2020        """
2021
2022        self.state_lock.acquire()
2023        del self.state[eid]
2024        del self.state[expid]
2025        if self.state_filename: self.write_state()
2026        self.state_lock.release()
2027        if tmpdir and self.cleanup:
2028            self.remove_dirs(tmpdir)
2029
2030    # end of create_experiment sub-functions
2031
2032    def create_experiment(self, req, fid):
2033        """
2034        The external interface to experiment creation called from the
2035        dispatcher.
2036
2037        Creates a working directory, splits the incoming description using the
2038        splitter script and parses out the various subsections using the
2039        classes above.  Once each sub-experiment is created, use pooled threads
2040        to instantiate them and start it all up.
2041        """
2042
2043        req = req.get('CreateRequestBody', None)
2044        if req:
2045            key = self.get_experiment_key(req)
2046        else:
2047            raise service_error(service_error.req,
2048                    "Bad request format (no CreateRequestBody)")
2049
2050        # Import information from the requester
2051        if self.auth.import_credentials(data_list=req.get('credential', [])):
2052            self.auth.save()
2053
2054        # Make sure that the caller can talk to us
2055        proof = self.check_experiment_access(fid, key)
2056
2057        # Install the testbed map entries supplied with the request into a copy
2058        # of the testbed map.
2059        tbmap = dict(self.tbmap)
2060        for m in req.get('testbedmap', []):
2061            if 'testbed' in m and 'uri' in m:
2062                tbmap[m['testbed']] = m['uri']
2063
2064        # a place to work
2065        try:
2066            tmpdir = tempfile.mkdtemp(prefix="split-")
2067            os.mkdir(tmpdir+"/keys")
2068        except EnvironmentError:
2069            raise service_error(service_error.internal, "Cannot create tmp dir")
2070
2071        tbparams = { }
2072
2073        eid, expid, expcert_file = \
2074                self.get_experiment_ids_and_start(key, tmpdir)
2075
2076        # This catches exceptions to clear the placeholder if necessary
2077        try: 
2078            if not (eid and expid):
2079                raise service_error(service_error.internal, 
2080                        "Cannot find local experiment info!?")
2081
2082            top = self.get_topology(req, tmpdir)
2083            self.confirm_software(top)
2084            # Assign the IPs
2085            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2086            # Find the testbeds to look up
2087            tb_hosts = { }
2088            testbeds = [ ]
2089            for e in top.elements:
2090                if isinstance(e, topdl.Computer):
2091                    tb = e.get_attribute('testbed') or 'default'
2092                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2093                    else: 
2094                        tb_hosts[tb] = [ e.name ]
2095                        testbeds.append(tb)
2096
2097            masters, pmasters = self.get_testbed_services(req, testbeds)
2098            allocated = { }         # Testbeds we can access
2099            topo ={ }               # Sub topologies
2100            connInfo = { }          # Connection information
2101
2102            self.split_topology(top, topo, testbeds)
2103
2104            self.get_access_to_testbeds(testbeds, fid, allocated, 
2105                    tbparams, masters, tbmap, expid, expcert_file)
2106
2107            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2108
2109            part = experiment_partition(self.auth, self.store_url, tbmap,
2110                    self.muxmax, self.direct_transit)
2111            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2112                    connInfo, expid)
2113
2114            auth_attrs = set()
2115            # Now get access to the dynamic testbeds (those added above)
2116            for tb in [ t for t in topo if t not in allocated]:
2117                self.get_access(tb, tbparams, fid, masters, tbmap, 
2118                        expid, expcert_file)
2119                allocated[tb] = 1
2120                store_keys = topo[tb].get_attribute('store_keys')
2121                # Give the testbed access to keys it exports or imports
2122                if store_keys:
2123                    auth_attrs.update(set([
2124                        (tbparams[tb]['allocID']['fedid'], sk) \
2125                                for sk in store_keys.split(" ")]))
2126
2127            if auth_attrs:
2128                self.append_experiment_authorization(expid, auth_attrs)
2129
2130            # transit and disconnected testbeds may not have a connInfo entry.
2131            # Fill in the blanks.
2132            for t in allocated.keys():
2133                if not connInfo.has_key(t):
2134                    connInfo[t] = { }
2135
2136            self.wrangle_software(expid, top, topo, tbparams)
2137
2138            vtopo = topdl.topology_to_vtopo(top)
2139            vis = self.genviz(vtopo)
2140            proofs = self.save_federant_information(allocated, tbparams, 
2141                    eid, vtopo, vis, top)
2142        except service_error, e:
2143            # If something goes wrong in the parse (usually an access error)
2144            # clear the placeholder state.  From here on out the code delays
2145            # exceptions.  Failing at this point returns a fault to the remote
2146            # caller.
2147            self.clear_placeholder(eid, expid, tmpdir)
2148            raise e
2149
2150        # Start the background swapper and return the starting state.  From
2151        # here on out, the state will stick around a while.
2152
2153        # Create a logger that logs to the experiment's state object as well as
2154        # to the main log file.
2155        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2156        alloc_collector = self.list_log(self.state[eid]['log'])
2157        h = logging.StreamHandler(alloc_collector)
2158        # XXX: there should be a global one of these rather than repeating the
2159        # code.
2160        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2161                    '%d %b %y %H:%M:%S'))
2162        alloc_log.addHandler(h)
2163
2164        # Start a thread to do the resource allocation
2165        t  = Thread(target=self.allocate_resources,
2166                args=(allocated, masters, eid, expid, tbparams, 
2167                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2168                    connInfo, tbmap, expcert_file),
2169                name=eid)
2170        t.start()
2171
2172        rv = {
2173                'experimentID': [
2174                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2175                ],
2176                'experimentStatus': 'starting',
2177                'proof': [ proof.to_dict() ] + proofs,
2178            }
2179
2180        return rv
2181   
2182    def get_experiment_fedid(self, key):
2183        """
2184        find the fedid associated with the localname key in the state database.
2185        """
2186
2187        rv = None
2188        self.state_lock.acquire()
2189        if self.state.has_key(key):
2190            if isinstance(self.state[key], dict):
2191                try:
2192                    kl = [ f['fedid'] for f in \
2193                            self.state[key]['experimentID']\
2194                                if f.has_key('fedid') ]
2195                except KeyError:
2196                    self.state_lock.release()
2197                    raise service_error(service_error.internal, 
2198                            "No fedid for experiment %s when getting "+\
2199                                    "fedid(!?)" % key)
2200                if len(kl) == 1:
2201                    rv = kl[0]
2202                else:
2203                    self.state_lock.release()
2204                    raise service_error(service_error.internal, 
2205                            "multiple fedids for experiment %s when " +\
2206                                    "getting fedid(!?)" % key)
2207            else:
2208                self.state_lock.release()
2209                raise service_error(service_error.internal, 
2210                        "Unexpected state for %s" % key)
2211        self.state_lock.release()
2212        return rv
2213
2214    def check_experiment_access(self, fid, key):
2215        """
2216        Confirm that the fid has access to the experiment.  Though a request
2217        may be made in terms of a local name, the access attribute is always
2218        the experiment's fedid.
2219        """
2220        if not isinstance(key, fedid):
2221            key = self.get_experiment_fedid(key)
2222
2223        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2224
2225        if access_ok:
2226            return proof
2227        else:
2228            raise service_error(service_error.access, "Access Denied",
2229                proof)
2230
2231
2232    def get_handler(self, path, fid):
2233        """
2234        Perhaps surprisingly named, this function handles HTTP GET requests to
2235        this server (SOAP requests are POSTs).
2236        """
2237        self.log.info("Get handler %s %s" % (path, fid))
2238        # XXX: log proofs?
2239        if self.auth.check_attribute(fid, path):
2240            return ("%s/%s" % (self.repodir, path), "application/binary")
2241        else:
2242            return (None, None)
2243
2244    def clean_info_response(self, rv, proof):
2245        """
2246        Remove the information in the experiment's state object that is not in
2247        the info response.
2248        """
2249        # Remove the owner info (should always be there, but...)
2250        if rv.has_key('owner'): del rv['owner']
2251        if 'auth' in rv: del rv['auth']
2252
2253        # Convert the log into the allocationLog parameter and remove the
2254        # log entry (with defensive programming)
2255        if rv.has_key('log'):
2256            rv['allocationLog'] = "".join(rv['log'])
2257            del rv['log']
2258        else:
2259            rv['allocationLog'] = ""
2260
2261        if rv['experimentStatus'] != 'active':
2262            if rv.has_key('federant'): del rv['federant']
2263        else:
2264            # remove the allocationID and uri info from each federant
2265            for f in rv.get('federant', []):
2266                if f.has_key('allocID'): del f['allocID']
2267                if f.has_key('uri'): del f['uri']
2268        rv['proof'] = proof.to_dict()
2269
2270        return rv
2271
2272    def get_info(self, req, fid):
2273        """
2274        Return all the stored info about this experiment
2275        """
2276        rv = None
2277
2278        req = req.get('InfoRequestBody', None)
2279        if not req:
2280            raise service_error(service_error.req,
2281                    "Bad request format (no InfoRequestBody)")
2282        exp = req.get('experiment', None)
2283        if exp:
2284            if exp.has_key('fedid'):
2285                key = exp['fedid']
2286                keytype = "fedid"
2287            elif exp.has_key('localname'):
2288                key = exp['localname']
2289                keytype = "localname"
2290            else:
2291                raise service_error(service_error.req, "Unknown lookup type")
2292        else:
2293            raise service_error(service_error.req, "No request?")
2294
2295        proof = self.check_experiment_access(fid, key)
2296
2297        # The state may be massaged by the service function that called
2298        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2299        # state.
2300        self.state_lock.acquire()
2301        if self.state.has_key(key):
2302            rv = copy.deepcopy(self.state[key])
2303        self.state_lock.release()
2304
2305        if rv:
2306            return self.clean_info_response(rv, proof)
2307        else:
2308            raise service_error(service_error.req, "No such experiment")
2309
2310    def get_multi_info(self, req, fid):
2311        """
2312        Return all the stored info that this fedid can access
2313        """
2314        rv = { 'info': [ ], 'proof': [ ] }
2315
2316        self.state_lock.acquire()
2317        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2318            try:
2319                proof = self.check_experiment_access(fid, key)
2320            except service_error, e:
2321                if e.code == service_error.access:
2322                    continue
2323                else:
2324                    self.state_lock.release()
2325                    raise e
2326
2327            if self.state.has_key(key):
2328                e = copy.deepcopy(self.state[key])
2329                e = self.clean_info_response(e, proof)
2330                rv['info'].append(e)
2331                rv['proof'].append(proof.to_dict())
2332        self.state_lock.release()
2333        return rv
2334
2335    def check_termination_status(self, fed_exp, force):
2336        """
2337        Confirm that the experiment is sin a valid state to stop (or force it)
2338        return the state - invalid states for deletion and force settings cause
2339        exceptions.
2340        """
2341        self.state_lock.acquire()
2342        status = fed_exp.get('experimentStatus', None)
2343
2344        if status:
2345            if status in ('starting', 'terminating'):
2346                if not force:
2347                    self.state_lock.release()
2348                    raise service_error(service_error.partial, 
2349                            'Experiment still being created or destroyed')
2350                else:
2351                    self.log.warning('Experiment in %s state ' % status + \
2352                            'being terminated by force.')
2353            self.state_lock.release()
2354            return status
2355        else:
2356            # No status??? trouble
2357            self.state_lock.release()
2358            raise service_error(service_error.internal,
2359                    "Experiment has no status!?")
2360
2361
2362    def get_termination_info(self, fed_exp):
2363        ids = []
2364        term_params = { }
2365        self.state_lock.acquire()
2366        #  experimentID is a list of dicts that are self-describing
2367        #  identifiers.  This finds all the fedids and localnames - the
2368        #  keys of self.state - and puts them into ids, which is used to delete
2369        #  the state after everything is swapped out.
2370        for id in fed_exp.get('experimentID', []):
2371            if 'fedid' in id: 
2372                ids.append(id['fedid'])
2373                repo = "%s" % id['fedid']
2374            if 'localname' in id: ids.append(id['localname'])
2375
2376        # Get the experimentAccess - the principal for this experiment.  It
2377        # is this principal to which credentials have been delegated, and
2378        # as which the experiment controller must act.
2379        if 'experimentAccess' in fed_exp and \
2380                'X509' in fed_exp['experimentAccess']:
2381            expcert = fed_exp['experimentAccess']['X509']
2382        else:
2383            expcert = None
2384
2385        # Collect the allocation/segment ids into a dict keyed by the fedid
2386        # of the allocation (or a monotonically increasing integer) that
2387        # contains a tuple of uri, aid (which is a dict...)
2388        for i, fed in enumerate(fed_exp.get('federant', [])):
2389            try:
2390                uri = fed['uri']
2391                aid = fed['allocID']
2392                k = fed['allocID'].get('fedid', i)
2393            except KeyError, e:
2394                continue
2395            term_params[k] = (uri, aid)
2396        # Change the experiment state
2397        fed_exp['experimentStatus'] = 'terminating'
2398        if self.state_filename: self.write_state()
2399        self.state_lock.release()
2400
2401        return ids, term_params, expcert, repo
2402
2403
2404    def deallocate_resources(self, term_params, expcert, status, force, 
2405            dealloc_log):
2406        tmpdir = None
2407        # This try block makes sure the tempdir is cleared
2408        try:
2409            # If no expcert, try the deallocation as the experiment
2410            # controller instance.
2411            if expcert and self.auth_type != 'legacy': 
2412                try:
2413                    tmpdir = tempfile.mkdtemp(prefix="term-")
2414                except EnvironmentError:
2415                    raise service_error(service_error.internal, 
2416                            "Cannot create tmp dir")
2417                cert_file = self.make_temp_certfile(expcert, tmpdir)
2418                pw = None
2419            else: 
2420                cert_file = self.cert_file
2421                pw = self.cert_pwd
2422
2423            # Stop everyone.  NB, wait_for_all waits until a thread starts
2424            # and then completes, so we can't wait if nothing starts.  So,
2425            # no tbparams, no start.
2426            if len(term_params) > 0:
2427                tp = thread_pool(self.nthreads)
2428                for k, (uri, aid) in term_params.items():
2429                    # Create and start a thread to stop the segment
2430                    tp.wait_for_slot()
2431                    t  = pooled_thread(\
2432                            target=self.terminate_segment(log=dealloc_log,
2433                                testbed=uri,
2434                                cert_file=cert_file, 
2435                                cert_pwd=pw,
2436                                trusted_certs=self.trusted_certs,
2437                                caller=self.call_TerminateSegment),
2438                            args=(uri, aid), name=k,
2439                            pdata=tp, trace_file=self.trace_file)
2440                    t.start()
2441                # Wait for completions
2442                tp.wait_for_all_done()
2443
2444            # release the allocations (failed experiments have done this
2445            # already, and starting experiments may be in odd states, so we
2446            # ignore errors releasing those allocations
2447            try: 
2448                for k, (uri, aid)  in term_params.items():
2449                    self.release_access(None, aid, uri=uri,
2450                            cert_file=cert_file, cert_pwd=pw)
2451            except service_error, e:
2452                if status != 'failed' and not force:
2453                    raise e
2454
2455        # Clean up the tmpdir no matter what
2456        finally:
2457            if tmpdir: self.remove_dirs(tmpdir)
2458
2459    def terminate_experiment(self, req, fid):
2460        """
2461        Swap this experiment out on the federants and delete the shared
2462        information
2463        """
2464        tbparams = { }
2465        req = req.get('TerminateRequestBody', None)
2466        if not req:
2467            raise service_error(service_error.req,
2468                    "Bad request format (no TerminateRequestBody)")
2469
2470        key = self.get_experiment_key(req, 'experiment')
2471        proof = self.check_experiment_access(fid, key)
2472        exp = req.get('experiment', False)
2473        force = req.get('force', False)
2474
2475        dealloc_list = [ ]
2476
2477
2478        # Create a logger that logs to the dealloc_list as well as to the main
2479        # log file.
2480        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2481        h = logging.StreamHandler(self.list_log(dealloc_list))
2482        # XXX: there should be a global one of these rather than repeating the
2483        # code.
2484        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2485                    '%d %b %y %H:%M:%S'))
2486        dealloc_log.addHandler(h)
2487
2488        self.state_lock.acquire()
2489        fed_exp = self.state.get(key, None)
2490        self.state_lock.release()
2491        repo = None
2492
2493        if fed_exp:
2494            status = self.check_termination_status(fed_exp, force)
2495            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2496            self.deallocate_resources(term_params, expcert, status, force, 
2497                    dealloc_log)
2498
2499            # Remove the terminated experiment
2500            self.state_lock.acquire()
2501            for id in ids:
2502                self.clear_experiment_authorization(id, need_state_lock=False)
2503                if id in self.state: del self.state[id]
2504
2505            if self.state_filename: self.write_state()
2506            self.state_lock.release()
2507
2508            # Delete any synch points associated with this experiment.  All
2509            # synch points begin with the fedid of the experiment.
2510            fedid_keys = set(["fedid:%s" % f for f in ids \
2511                    if isinstance(f, fedid)])
2512            for k in self.synch_store.all_keys():
2513                try:
2514                    if len(k) > 45 and k[0:46] in fedid_keys:
2515                        self.synch_store.del_value(k)
2516                except synch_store.BadDeletionError:
2517                    pass
2518            self.write_store()
2519
2520            # Remove software and other cached stuff from the filesystem.
2521            if repo:
2522                self.remove_dirs("%s/%s" % (self.repodir, repo))
2523       
2524            return { 
2525                    'experiment': exp , 
2526                    'deallocationLog': string.join(dealloc_list, ''),
2527                    'proof': [proof.to_dict()],
2528                    }
2529        else:
2530            raise service_error(service_error.req, "No saved state")
2531
2532
2533    def GetValue(self, req, fid):
2534        """
2535        Get a value from the synchronized store
2536        """
2537        req = req.get('GetValueRequestBody', None)
2538        if not req:
2539            raise service_error(service_error.req,
2540                    "Bad request format (no GetValueRequestBody)")
2541       
2542        name = req.get('name', None)
2543        wait = req.get('wait', False)
2544        rv = { 'name': name }
2545
2546        if not name:
2547            raise service_error(service_error.req, "No name?")
2548
2549        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2550
2551        if access_ok:
2552            self.log.debug("[GetValue] asking for %s " % name)
2553            try:
2554                v = self.synch_store.get_value(name, wait)
2555            except synch_store.RevokedKeyError:
2556                # No more synch on this key
2557                raise service_error(service_error.federant, 
2558                        "Synch key %s revoked" % name)
2559            if v is not None:
2560                rv['value'] = v
2561            rv['proof'] = proof.to_dict()
2562            self.log.debug("[GetValue] got %s from %s" % (v, name))
2563            return rv
2564        else:
2565            raise service_error(service_error.access, "Access Denied",
2566                    proof=proof)
2567       
2568
2569    def SetValue(self, req, fid):
2570        """
2571        Set a value in the synchronized store
2572        """
2573        req = req.get('SetValueRequestBody', None)
2574        if not req:
2575            raise service_error(service_error.req,
2576                    "Bad request format (no SetValueRequestBody)")
2577       
2578        name = req.get('name', None)
2579        v = req.get('value', '')
2580
2581        if not name:
2582            raise service_error(service_error.req, "No name?")
2583
2584        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2585
2586        if access_ok:
2587            try:
2588                self.synch_store.set_value(name, v)
2589                self.write_store()
2590                self.log.debug("[SetValue] set %s to %s" % (name, v))
2591            except synch_store.CollisionError:
2592                # Translate into a service_error
2593                raise service_error(service_error.req,
2594                        "Value already set: %s" %name)
2595            except synch_store.RevokedKeyError:
2596                # No more synch on this key
2597                raise service_error(service_error.federant, 
2598                        "Synch key %s revoked" % name)
2599                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2600        else:
2601            raise service_error(service_error.access, "Access Denied",
2602                    proof=proof)
Note: See TracBrowser for help on using the repository browser.