source: fedd/federation/experiment_control.py @ 9294673

compt_changesinfo-ops
Last change on this file since 9294673 was 9294673, checked in by Ted Faber <faber@…>, 12 years ago

Turn (most) of another free floating dict into a class.

  • Property mode set to 100644
File size: 83.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52
53# Right now, no support for composition.
54class federated_service:
55    def __init__(self, name, exporter=None, importers=None, params=None, 
56            reqs=None, portal=None):
57        self.name=name
58        self.exporter=exporter
59        if importers is None: self.importers = []
60        else: self.importers=importers
61        if params is None: self.params = { }
62        else: self.params = params
63        if reqs is None: self.reqs = []
64        else: self.reqs = reqs
65
66        if portal is not None:
67            self.portal = portal
68        else:
69            self.portal = (name in federated_service.needs_portal)
70
71    def __str__(self):
72        return "name %s export %s import %s params %s reqs %s" % \
73                (self.name, self.exporter, self.importers, self.params,
74                        [ (r['name'], r['visibility']) for r in self.reqs] )
75
76    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
77
78class experiment_control_local(experiment_control_legacy):
79    """
80    Control of experiments that this system can directly access.
81
82    Includes experiment creation, termination and information dissemination.
83    Thred safe.
84    """
85
86    class ssh_cmd_timeout(RuntimeError): pass
87   
88    call_RequestAccess = service_caller('RequestAccess')
89    call_ReleaseAccess = service_caller('ReleaseAccess')
90    call_StartSegment = service_caller('StartSegment')
91    call_TerminateSegment = service_caller('TerminateSegment')
92    call_Ns2Topdl = service_caller('Ns2Topdl')
93
94    def __init__(self, config=None, auth=None):
95        """
96        Intialize the various attributes, most from the config object
97        """
98
99        def parse_tarfile_list(tf):
100            """
101            Parse a tarfile list from the configuration.  This is a set of
102            paths and tarfiles separated by spaces.
103            """
104            rv = [ ]
105            if tf is not None:
106                tl = tf.split()
107                while len(tl) > 1:
108                    p, t = tl[0:2]
109                    del tl[0:2]
110                    rv.append((p, t))
111            return rv
112
113        self.list_log = list_log.list_log
114
115        self.cert_file = config.get("experiment_control", "cert_file")
116        if self.cert_file:
117            self.cert_pwd = config.get("experiment_control", "cert_pwd")
118        else:
119            self.cert_file = config.get("globals", "cert_file")
120            self.cert_pwd = config.get("globals", "cert_pwd")
121
122        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
123                or config.get("globals", "trusted_certs")
124
125        self.repodir = config.get("experiment_control", "repodir")
126        self.repo_url = config.get("experiment_control", "repo_url", 
127                "https://users.isi.deterlab.net:23235");
128
129        self.exp_stem = "fed-stem"
130        self.log = logging.getLogger("fedd.experiment_control")
131        set_log_level(config, "experiment_control", self.log)
132        self.muxmax = 2
133        self.nthreads = 10
134        self.randomize_experiments = False
135
136        self.splitter = None
137        self.ssh_keygen = "/usr/bin/ssh-keygen"
138        self.ssh_identity_file = None
139
140
141        self.debug = config.getboolean("experiment_control", "create_debug")
142        self.cleanup = not config.getboolean("experiment_control", 
143                "leave_tmpfiles")
144        self.state_filename = config.get("experiment_control", 
145                "experiment_state")
146        self.store_filename = config.get("experiment_control", 
147                "synch_store")
148        self.store_url = config.get("experiment_control", "store_url")
149        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
150        self.fedkit = parse_tarfile_list(\
151                config.get("experiment_control", "fedkit"))
152        self.gatewaykit = parse_tarfile_list(\
153                config.get("experiment_control", "gatewaykit"))
154        accessdb_file = config.get("experiment_control", "accessdb")
155
156        dt = config.get("experiment_control", "direct_transit")
157        self.auth_type = config.get('experiment_control', 'auth_type') \
158                or 'legacy'
159        self.auth_dir = config.get('experiment_control', 'auth_dir')
160        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
161        else: self.direct_transit = [ ]
162        # NB for internal master/slave ops, not experiment setup
163        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
164
165        self.overrides = set([])
166        ovr = config.get('experiment_control', 'overrides')
167        if ovr:
168            for o in ovr.split(","):
169                o = o.strip()
170                if o.startswith('fedid:'): o = o[len('fedid:'):]
171                self.overrides.add(fedid(hexstr=o))
172
173        self.state = { }
174        self.state_lock = Lock()
175        self.tclsh = "/usr/local/bin/otclsh"
176        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
177                config.get("experiment_control", "tcl_splitter",
178                        "/usr/testbed/lib/ns2ir/parse.tcl")
179        mapdb_file = config.get("experiment_control", "mapdb")
180        self.trace_file = sys.stderr
181
182        self.def_expstart = \
183                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
184                "/tmp/federate";
185        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
186                "FEDDIR/hosts";
187        self.def_gwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
189                "/tmp/bridge.log";
190        self.def_mgwstart = \
191                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
192                "/tmp/bridge.log";
193        self.def_gwimage = "FBSD61-TUNNEL2";
194        self.def_gwtype = "pc";
195        self.local_access = { }
196
197        if self.auth_type == 'legacy':
198            if auth:
199                self.auth = auth
200            else:
201                self.log.error( "[access]: No authorizer initialized, " +\
202                        "creating local one.")
203                auth = authorizer()
204            self.get_access = self.legacy_get_access
205        elif self.auth_type == 'abac':
206            self.auth = abac_authorizer(load=self.auth_dir)
207        else:
208            raise service_error(service_error.internal, 
209                    "Unknown auth_type: %s" % self.auth_type)
210
211        if mapdb_file:
212            self.read_mapdb(mapdb_file)
213        else:
214            self.log.warn("[experiment_control] No testbed map, using defaults")
215            self.tbmap = { 
216                    'deter':'https://users.isi.deterlab.net:23235',
217                    'emulab':'https://users.isi.deterlab.net:23236',
218                    'ucb':'https://users.isi.deterlab.net:23237',
219                    }
220
221        if accessdb_file:
222                self.read_accessdb(accessdb_file)
223        else:
224            raise service_error(service_error.internal,
225                    "No accessdb specified in config")
226
227        # Grab saved state.  OK to do this w/o locking because it's read only
228        # and only one thread should be in existence that can see self.state at
229        # this point.
230        if self.state_filename:
231            self.read_state()
232
233        if self.store_filename:
234            self.read_store()
235        else:
236            self.log.warning("No saved synch store")
237            self.synch_store = synch_store
238
239        # Dispatch tables
240        self.soap_services = {\
241                'New': soap_handler('New', self.new_experiment),
242                'Create': soap_handler('Create', self.create_experiment),
243                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
244                'Vis': soap_handler('Vis', self.get_vis),
245                'Info': soap_handler('Info', self.get_info),
246                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
247                'Terminate': soap_handler('Terminate', 
248                    self.terminate_experiment),
249                'GetValue': soap_handler('GetValue', self.GetValue),
250                'SetValue': soap_handler('SetValue', self.SetValue),
251        }
252
253        self.xmlrpc_services = {\
254                'New': xmlrpc_handler('New', self.new_experiment),
255                'Create': xmlrpc_handler('Create', self.create_experiment),
256                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
257                'Vis': xmlrpc_handler('Vis', self.get_vis),
258                'Info': xmlrpc_handler('Info', self.get_info),
259                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
260                'Terminate': xmlrpc_handler('Terminate',
261                    self.terminate_experiment),
262                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
263                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
264        }
265
266    # Call while holding self.state_lock
267    def write_state(self):
268        """
269        Write a new copy of experiment state after copying the existing state
270        to a backup.
271
272        State format is a simple pickling of the state dictionary.
273        """
274        if os.access(self.state_filename, os.W_OK):
275            copy_file(self.state_filename, \
276                    "%s.bak" % self.state_filename)
277        try:
278            f = open(self.state_filename, 'w')
279            pickle.dump(self.state, f)
280        except EnvironmentError, e:
281            self.log.error("Can't write file %s: %s" % \
282                    (self.state_filename, e))
283        except pickle.PicklingError, e:
284            self.log.error("Pickling problem: %s" % e)
285        except TypeError, e:
286            self.log.error("Pickling problem (TypeError): %s" % e)
287
288    @staticmethod
289    def get_alloc_ids(exp):
290        """
291        Used by read_store and read state.  This used to be worse.
292        """
293
294        return [ a.allocID for a in exp.get_all_allocations() ]
295
296
297    # Call while holding self.state_lock
298    def read_state(self):
299        """
300        Read a new copy of experiment state.  Old state is overwritten.
301
302        State format is a simple pickling of the state dictionary.
303        """
304       
305        try:
306            f = open(self.state_filename, "r")
307            self.state = pickle.load(f)
308            self.log.debug("[read_state]: Read state from %s" % \
309                    self.state_filename)
310        except EnvironmentError, e:
311            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
312                    % (self.state_filename, e))
313        except pickle.UnpicklingError, e:
314            self.log.warning(("[read_state]: No saved state: " + \
315                    "Unpickling failed: %s") % e)
316       
317        for s in self.state.values():
318            try:
319
320                eid = s.fedid
321                if eid : 
322                    if self.auth_type == 'legacy':
323                        # XXX: legacy
324                        # Give the owner rights to the experiment
325                        #self.auth.set_attribute(s['owner'], eid)
326                        # And holders of the eid as well
327                        self.auth.set_attribute(eid, eid)
328                        # allow overrides to control experiments as well
329                        for o in self.overrides:
330                            self.auth.set_attribute(o, eid)
331                        # Set permissions to allow reading of the software
332                        # repo, if any, as well.
333                        for a in self.get_alloc_ids(s):
334                            self.auth.set_attribute(a, 'repo/%s' % eid)
335                else:
336                    raise KeyError("No experiment id")
337            except KeyError, e:
338                self.log.warning("[read_state]: State ownership or identity " +\
339                        "misformatted in %s: %s" % (self.state_filename, e))
340
341
342    def read_accessdb(self, accessdb_file):
343        """
344        Read the mapping from fedids that can create experiments to their name
345        in the 3-level access namespace.  All will be asserted from this
346        testbed and can include the local username and porject that will be
347        asserted on their behalf by this fedd.  Each fedid is also added to the
348        authorization system with the "create" attribute.
349        """
350        self.accessdb = {}
351        # These are the regexps for parsing the db
352        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
353        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
354                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
355        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
356                "\s*->\s*(" + name_expr + ")\s*$")
357        lineno = 0
358
359        # Parse the mappings and store in self.authdb, a dict of
360        # fedid -> (proj, user)
361        try:
362            f = open(accessdb_file, "r")
363            for line in f:
364                lineno += 1
365                line = line.strip()
366                if len(line) == 0 or line.startswith('#'):
367                    continue
368                m = project_line.match(line)
369                if m:
370                    fid = fedid(hexstr=m.group(1))
371                    project, user = m.group(2,3)
372                    if not self.accessdb.has_key(fid):
373                        self.accessdb[fid] = []
374                    self.accessdb[fid].append((project, user))
375                    continue
376
377                m = user_line.match(line)
378                if m:
379                    fid = fedid(hexstr=m.group(1))
380                    project = None
381                    user = m.group(2)
382                    if not self.accessdb.has_key(fid):
383                        self.accessdb[fid] = []
384                    self.accessdb[fid].append((project, user))
385                    continue
386                self.log.warn("[experiment_control] Error parsing access " +\
387                        "db %s at line %d" %  (accessdb_file, lineno))
388        except EnvironmentError:
389            raise service_error(service_error.internal,
390                    ("Error opening/reading %s as experiment " +\
391                            "control accessdb") %  accessdb_file)
392        f.close()
393
394        # Initialize the authorization attributes
395        # XXX: legacy
396        if self.auth_type == 'legacy':
397            for fid in self.accessdb.keys():
398                self.auth.set_attribute(fid, 'create')
399                self.auth.set_attribute(fid, 'new')
400
401    def read_mapdb(self, file):
402        """
403        Read a simple colon separated list of mappings for the
404        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
405        """
406
407        self.tbmap = { }
408        lineno =0
409        try:
410            f = open(file, "r")
411            for line in f:
412                lineno += 1
413                line = line.strip()
414                if line.startswith('#') or len(line) == 0:
415                    continue
416                try:
417                    label, url = line.split(':', 1)
418                    self.tbmap[label] = url
419                except ValueError, e:
420                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
421                            "map db: %s %s" % (lineno, line, e))
422        except EnvironmentError, e:
423            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
424                    "open %s: %s" % (file, e))
425        else:
426            f.close()
427
428    def read_store(self):
429        try:
430            self.synch_store = synch_store()
431            self.synch_store.load(self.store_filename)
432            self.log.debug("[read_store]: Read store from %s" % \
433                    self.store_filename)
434        except EnvironmentError, e:
435            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
436                    % (self.state_filename, e))
437            self.synch_store = synch_store()
438
439        # Set the initial permissions on data in the store.  XXX: This ad hoc
440        # authorization attribute initialization is getting out of hand.
441        # XXX: legacy
442        if self.auth_type == 'legacy':
443            for k in self.synch_store.all_keys():
444                try:
445                    if k.startswith('fedid:'):
446                        fid = fedid(hexstr=k[6:46])
447                        if self.state.has_key(fid):
448                            for a in self.get_alloc_ids(self.state[fid]):
449                                self.auth.set_attribute(a, k)
450                except ValueError, e:
451                    self.log.warn("Cannot deduce permissions for %s" % k)
452
453
454    def write_store(self):
455        """
456        Write a new copy of synch_store after writing current state
457        to a backup.  We use the internal synch_store pickle method to avoid
458        incinsistent data.
459
460        State format is a simple pickling of the store.
461        """
462        if os.access(self.store_filename, os.W_OK):
463            copy_file(self.store_filename, \
464                    "%s.bak" % self.store_filename)
465        try:
466            self.synch_store.save(self.store_filename)
467        except EnvironmentError, e:
468            self.log.error("Can't write file %s: %s" % \
469                    (self.store_filename, e))
470        except TypeError, e:
471            self.log.error("Pickling problem (TypeError): %s" % e)
472
473
474    def remove_dirs(self, dir):
475        """
476        Remove the directory tree and all files rooted at dir.  Log any errors,
477        but continue.
478        """
479        self.log.debug("[removedirs]: removing %s" % dir)
480        try:
481            for path, dirs, files in os.walk(dir, topdown=False):
482                for f in files:
483                    os.remove(os.path.join(path, f))
484                for d in dirs:
485                    os.rmdir(os.path.join(path, d))
486            os.rmdir(dir)
487        except EnvironmentError, e:
488            self.log.error("Error deleting directory tree in %s" % e);
489
490    @staticmethod
491    def make_temp_certfile(expcert, tmpdir):
492        """
493        make a protected copy of the access certificate so the experiment
494        controller can act as the experiment principal.  mkstemp is the most
495        secure way to do that. The directory should be created by
496        mkdtemp.  Return the filename.
497        """
498        if expcert and tmpdir:
499            try:
500                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
501                f = os.fdopen(certf, 'w')
502                print >> f, expcert
503                f.close()
504            except EnvironmentError, e:
505                raise service_error(service_error.internal, 
506                        "Cannot create temp cert file?")
507            return certfn
508        else:
509            return None
510
511       
512    def generate_ssh_keys(self, dest, type="rsa" ):
513        """
514        Generate a set of keys for the gateways to use to talk.
515
516        Keys are of type type and are stored in the required dest file.
517        """
518        valid_types = ("rsa", "dsa")
519        t = type.lower();
520        if t not in valid_types: raise ValueError
521        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
522
523        try:
524            trace = open("/dev/null", "w")
525        except EnvironmentError:
526            raise service_error(service_error.internal,
527                    "Cannot open /dev/null??");
528
529        # May raise CalledProcessError
530        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
531        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
532        if rv != 0:
533            raise service_error(service_error.internal, 
534                    "Cannot generate nonce ssh keys.  %s return code %d" \
535                            % (self.ssh_keygen, rv))
536
537    def generate_seer_certs(self, destdir):
538        '''
539        Create a SEER ca cert and a node cert in destdir/ca.pem and
540        destdir/node.pem respectively.  These will be distributed throughout
541        the federated experiment.  This routine reports errors via
542        service_errors.
543        '''
544        openssl = '/usr/bin/openssl'
545        # All the filenames and parameters we need for openssl calls below
546        ca_key =os.path.join(destdir, 'ca.key') 
547        ca_pem = os.path.join(destdir, 'ca.pem')
548        node_key =os.path.join(destdir, 'node.key') 
549        node_pem = os.path.join(destdir, 'node.pem')
550        node_req = os.path.join(destdir, 'node.req')
551        node_signed = os.path.join(destdir, 'node.signed')
552        days = '%s' % (365 * 10)
553        serial = '%s' % random.randint(0, 1<<16)
554
555        try:
556            # Sequence of calls to create a CA key, create a ca cert, create a
557            # node key, node signing request, and finally a signed node
558            # certificate.
559            sequence = (
560                    (openssl, 'genrsa', '-out', ca_key, '1024'),
561                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
562                        ca_pem, '-days', days, '-subj', 
563                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
564                    (openssl, 'genrsa', '-out', node_key, '1024'),
565                    (openssl, 'req', '-new', '-key', node_key, '-out', 
566                        node_req, '-days', days, '-subj', 
567                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
568                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
569                        '-set_serial', serial, '-req', '-in', node_req, 
570                        '-out', node_signed, '-days', days),
571                )
572            # Do all that stuff; bail if there's an error, and push all the
573            # output to dev/null.
574            for cmd in sequence:
575                trace = open("/dev/null", "w")
576                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
577                if rv != 0:
578                    raise service_error(service_error.internal, 
579                            "Cannot generate SEER certs.  %s return code %d" \
580                                    % (' '.join(cmd), rv))
581            # Concatinate the node key and signed certificate into node.pem
582            f = open(node_pem, 'w')
583            for comp in (node_signed, node_key):
584                g = open(comp, 'r')
585                f.write(g.read())
586                g.close()
587            f.close()
588
589            # Throw out intermediaries.
590            for fn in (ca_key, node_key, node_req, node_signed):
591                os.unlink(fn)
592
593        except EnvironmentError, e:
594            # Any difficulties with the file system wind up here
595            raise service_error(service_error.internal,
596                    "File error on  %s while creating SEER certs: %s" % \
597                            (e.filename, e.strerror))
598
599
600
601    def gentopo(self, str):
602        """
603        Generate the topology data structure from the splitter's XML
604        representation of it.
605
606        The topology XML looks like:
607            <experiment>
608                <nodes>
609                    <node><vname></vname><ips>ip1:ip2</ips></node>
610                </nodes>
611                <lans>
612                    <lan>
613                        <vname></vname><vnode></vnode><ip></ip>
614                        <bandwidth></bandwidth><member>node:port</member>
615                    </lan>
616                </lans>
617        """
618        class topo_parse:
619            """
620            Parse the topology XML and create the dats structure.
621            """
622            def __init__(self):
623                # Typing of the subelements for data conversion
624                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
625                self.int_subelements = ( 'bandwidth',)
626                self.float_subelements = ( 'delay',)
627                # The final data structure
628                self.nodes = [ ]
629                self.lans =  [ ]
630                self.topo = { \
631                        'node': self.nodes,\
632                        'lan' : self.lans,\
633                    }
634                self.element = { }  # Current element being created
635                self.chars = ""     # Last text seen
636
637            def end_element(self, name):
638                # After each sub element the contents is added to the current
639                # element or to the appropriate list.
640                if name == 'node':
641                    self.nodes.append(self.element)
642                    self.element = { }
643                elif name == 'lan':
644                    self.lans.append(self.element)
645                    self.element = { }
646                elif name in self.str_subelements:
647                    self.element[name] = self.chars
648                    self.chars = ""
649                elif name in self.int_subelements:
650                    self.element[name] = int(self.chars)
651                    self.chars = ""
652                elif name in self.float_subelements:
653                    self.element[name] = float(self.chars)
654                    self.chars = ""
655
656            def found_chars(self, data):
657                self.chars += data.rstrip()
658
659
660        tp = topo_parse();
661        parser = xml.parsers.expat.ParserCreate()
662        parser.EndElementHandler = tp.end_element
663        parser.CharacterDataHandler = tp.found_chars
664
665        parser.Parse(str)
666
667        return tp.topo
668       
669
670    def genviz(self, topo):
671        """
672        Generate the visualization the virtual topology
673        """
674
675        neato = "/usr/local/bin/neato"
676        # These are used to parse neato output and to create the visualization
677        # file.
678        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
679        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
680                "%s</type></node>"
681
682        try:
683            # Node names
684            nodes = [ n['vname'] for n in topo['node'] ]
685            topo_lans = topo['lan']
686        except KeyError, e:
687            raise service_error(service_error.internal, "Bad topology: %s" %e)
688
689        lans = { }
690        links = { }
691
692        # Walk through the virtual topology, organizing the connections into
693        # 2-node connections (links) and more-than-2-node connections (lans).
694        # When a lan is created, it's added to the list of nodes (there's a
695        # node in the visualization for the lan).
696        for l in topo_lans:
697            if links.has_key(l['vname']):
698                if len(links[l['vname']]) < 2:
699                    links[l['vname']].append(l['vnode'])
700                else:
701                    nodes.append(l['vname'])
702                    lans[l['vname']] = links[l['vname']]
703                    del links[l['vname']]
704                    lans[l['vname']].append(l['vnode'])
705            elif lans.has_key(l['vname']):
706                lans[l['vname']].append(l['vnode'])
707            else:
708                links[l['vname']] = [ l['vnode'] ]
709
710
711        # Open up a temporary file for dot to turn into a visualization
712        try:
713            df, dotname = tempfile.mkstemp()
714            dotfile = os.fdopen(df, 'w')
715        except EnvironmentError:
716            raise service_error(service_error.internal,
717                    "Failed to open file in genviz")
718
719        try:
720            dnull = open('/dev/null', 'w')
721        except EnvironmentError:
722            service_error(service_error.internal,
723                    "Failed to open /dev/null in genviz")
724
725        # Generate a dot/neato input file from the links, nodes and lans
726        try:
727            print >>dotfile, "graph G {"
728            for n in nodes:
729                print >>dotfile, '\t"%s"' % n
730            for l in links.keys():
731                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
732            for l in lans.keys():
733                for n in lans[l]:
734                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
735            print >>dotfile, "}"
736            dotfile.close()
737        except TypeError:
738            raise service_error(service_error.internal,
739                    "Single endpoint link in vtopo")
740        except EnvironmentError:
741            raise service_error(service_error.internal, "Cannot write dot file")
742
743        # Use dot to create a visualization
744        try:
745            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
746                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
747                stderr=dnull, close_fds=True)
748        except EnvironmentError:
749            raise service_error(service_error.internal, 
750                    "Cannot generate visualization: is graphviz available?")
751        dnull.close()
752
753        # Translate dot to vis format
754        vis_nodes = [ ]
755        vis = { 'node': vis_nodes }
756        for line in dot.stdout:
757            m = vis_re.match(line)
758            if m:
759                vn = m.group(1)
760                vis_node = {'name': vn, \
761                        'x': float(m.group(2)),\
762                        'y' : float(m.group(3)),\
763                    }
764                if vn in links.keys() or vn in lans.keys():
765                    vis_node['type'] = 'lan'
766                else:
767                    vis_node['type'] = 'node'
768                vis_nodes.append(vis_node)
769        rv = dot.wait()
770
771        os.remove(dotname)
772        # XXX: graphviz seems to use low return codes for warnings, like
773        # "couldn't find font"
774        if rv < 2 : return vis
775        else: return None
776
777
778    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
779            cert_pwd=None):
780        """
781        Release access to testbed through fedd
782        """
783
784        if not uri and tbmap:
785            uri = tbmap.get(tb, None)
786        if not uri:
787            raise service_error(service_error.server_config, 
788                    "Unknown testbed: %s" % tb)
789
790        if self.local_access.has_key(uri):
791            resp = self.local_access[uri].ReleaseAccess(\
792                    { 'ReleaseAccessRequestBody' : 
793                        {'allocID': {'fedid': aid}},}, 
794                    fedid(file=cert_file))
795            resp = { 'ReleaseAccessResponseBody': resp } 
796        else:
797            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
798                    cert_file, cert_pwd, self.trusted_certs)
799
800        # better error coding
801
802    def remote_ns2topdl(self, uri, desc):
803
804        req = {
805                'description' : { 'ns2description': desc },
806            }
807
808        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
809                self.trusted_certs)
810
811        if r.has_key('Ns2TopdlResponseBody'):
812            r = r['Ns2TopdlResponseBody']
813            ed = r.get('experimentdescription', None)
814            if ed.has_key('topdldescription'):
815                return topdl.Topology(**ed['topdldescription'])
816            else:
817                raise service_error(service_error.protocol, 
818                        "Bad splitter response (no output)")
819        else:
820            raise service_error(service_error.protocol, "Bad splitter response")
821
822    class start_segment:
823        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
824                cert_pwd=None, trusted_certs=None, caller=None,
825                log_collector=None):
826            self.log = log
827            self.debug = debug
828            self.cert_file = cert_file
829            self.cert_pwd = cert_pwd
830            self.trusted_certs = None
831            self.caller = caller
832            self.testbed = testbed
833            self.log_collector = log_collector
834            self.response = None
835            self.node = { }
836            self.proof = None
837
838        def make_map(self, resp):
839            if 'segmentdescription' not in resp  or \
840                    'topdldescription' not in resp['segmentdescription']:
841                self.log.warn('No topology returned from startsegment')
842                return 
843
844            top = topdl.Topology(
845                    **resp['segmentdescription']['topdldescription'])
846
847            for e in top.elements:
848                if isinstance(e, topdl.Computer):
849                    self.node[e.name] = \
850                            (e.localname, e.status, e.service, e.operation)
851
852        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
853            req = {
854                    'allocID': { 'fedid' : aid }, 
855                    'segmentdescription': { 
856                        'topdldescription': topo.to_dict(),
857                    },
858                }
859
860            if connInfo:
861                req['connection'] = connInfo
862
863            import_svcs = [ s for m in masters.values() \
864                    for s in m if self.testbed in s.importers]
865
866            if import_svcs or self.testbed in masters:
867                req['service'] = []
868
869            for s in import_svcs:
870                for r in s.reqs:
871                    sr = copy.deepcopy(r)
872                    sr['visibility'] = 'import';
873                    req['service'].append(sr)
874
875            for s in masters.get(self.testbed, []):
876                for r in s.reqs:
877                    sr = copy.deepcopy(r)
878                    sr['visibility'] = 'export';
879                    req['service'].append(sr)
880
881            if attrs:
882                req['fedAttr'] = attrs
883
884            try:
885                self.log.debug("Calling StartSegment at %s " % uri)
886                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
887                        self.trusted_certs)
888                if r.has_key('StartSegmentResponseBody'):
889                    lval = r['StartSegmentResponseBody'].get('allocationLog',
890                            None)
891                    if lval and self.log_collector:
892                        for line in  lval.splitlines(True):
893                            self.log_collector.write(line)
894                    self.make_map(r['StartSegmentResponseBody'])
895                    if 'proof' in r: self.proof = r['proof']
896                    self.response = r
897                else:
898                    raise service_error(service_error.internal, 
899                            "Bad response!?: %s" %r)
900                return True
901            except service_error, e:
902                self.log.error("Start segment failed on %s: %s" % \
903                        (self.testbed, e))
904                return False
905
906
907
908    class terminate_segment:
909        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
910                cert_pwd=None, trusted_certs=None, caller=None):
911            self.log = log
912            self.debug = debug
913            self.cert_file = cert_file
914            self.cert_pwd = cert_pwd
915            self.trusted_certs = None
916            self.caller = caller
917            self.testbed = testbed
918
919        def __call__(self, uri, aid ):
920            req = {
921                    'allocID': {'fedid': aid }, 
922                }
923            try:
924                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
925                        self.trusted_certs)
926                return True
927            except service_error, e:
928                self.log.error("Terminate segment failed on %s: %s" % \
929                        (self.testbed, e))
930                return False
931
932    def allocate_resources(self, allocated, masters, eid, expid, 
933            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
934            attrs=None, connInfo={}, tbmap=None, expcert=None):
935
936        started = { }           # Testbeds where a sub-experiment started
937                                # successfully
938
939        # XXX
940        fail_soft = False
941
942        if tbmap is None: tbmap = { }
943
944        log = alloc_log or self.log
945
946        tp = thread_pool(self.nthreads)
947        threads = [ ]
948        starters = [ ]
949
950        if expcert:
951            cert = expcert
952            pw = None
953        else:
954            cert = self.cert_file
955            pw = self.cert_pwd
956
957        for tb in allocated.keys():
958            # Create and start a thread to start the segment, and save it
959            # to get the return value later
960            tb_attrs = copy.copy(attrs)
961            tp.wait_for_slot()
962            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
963            base, suffix = split_testbed(tb)
964            if suffix:
965                tb_attrs.append({'attribute': 'experiment_name', 
966                    'value': "%s-%s" % (eid, suffix)})
967            else:
968                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
969            if not uri:
970                raise service_error(service_error.internal, 
971                        "Unknown testbed %s !?" % tb)
972
973            aid = tbparams[tb].allocID
974            if not aid:
975                raise service_error(service_error.internal, 
976                        "No alloc id for testbed %s !?" % tb)
977
978            s = self.start_segment(log=log, debug=self.debug,
979                    testbed=tb, cert_file=cert,
980                    cert_pwd=pw, trusted_certs=self.trusted_certs,
981                    caller=self.call_StartSegment,
982                    log_collector=log_collector)
983            starters.append(s)
984            t  = pooled_thread(\
985                    target=s, name=tb,
986                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
987                    pdata=tp, trace_file=self.trace_file)
988            threads.append(t)
989            t.start()
990
991        # Wait until all finish (keep pinging the log, though)
992        mins = 0
993        revoked = False
994        while not tp.wait_for_all_done(60.0):
995            mins += 1
996            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
997                    % mins)
998            if not revoked and \
999                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1000                # a testbed has failed.  Revoke this experiment's
1001                # synchronizarion values so that sub experiments will not
1002                # deadlock waiting for synchronization that will never happen
1003                self.log.info("A subexperiment has failed to swap in, " + \
1004                        "revoking synch keys")
1005                var_key = "fedid:%s" % expid
1006                for k in self.synch_store.all_keys():
1007                    if len(k) > 45 and k[0:46] == var_key:
1008                        self.synch_store.revoke_key(k)
1009                revoked = True
1010
1011        failed = [ t.getName() for t in threads if not t.rv ]
1012        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1013
1014        # If one failed clean up, unless fail_soft is set
1015        if failed:
1016            if not fail_soft:
1017                tp.clear()
1018                for tb in succeeded:
1019                    # Create and start a thread to stop the segment
1020                    tp.wait_for_slot()
1021                    uri = tbparams[tb].uri
1022                    t  = pooled_thread(\
1023                            target=self.terminate_segment(log=log,
1024                                testbed=tb,
1025                                cert_file=cert, 
1026                                cert_pwd=pw,
1027                                trusted_certs=self.trusted_certs,
1028                                caller=self.call_TerminateSegment),
1029                            args=(uri, tbparams[tb].allocID),
1030                            name=tb,
1031                            pdata=tp, trace_file=self.trace_file)
1032                    t.start()
1033                # Wait until all finish (if any are being stopped)
1034                if succeeded:
1035                    tp.wait_for_all_done()
1036
1037                # release the allocations
1038                for tb in tbparams.keys():
1039                    try:
1040                        self.release_access(tb, tbparams[tb].allocID, 
1041                                tbmap=tbmap, uri=tbparams[tb].uri,
1042                                cert_file=cert, cert_pwd=pw)
1043                    except service_error, e:
1044                        self.log.warn("Error releasing access: %s" % e.desc)
1045                # Remove the placeholder
1046                self.state_lock.acquire()
1047                self.state[eid].status = 'failed'
1048                if self.state_filename: self.write_state()
1049                self.state_lock.release()
1050                # Remove the repo dir
1051                self.remove_dirs("%s/%s" %(self.repodir, expid))
1052                # Walk up tmpdir, deleting as we go
1053                if self.cleanup:
1054                    self.remove_dirs(tmpdir)
1055                else:
1056                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1057
1058
1059                log.error("Swap in failed on %s" % ",".join(failed))
1060                return
1061        else:
1062            # Walk through the successes and gather the proofs
1063            proofs = { }
1064            for s in starters:
1065                if s.proof:
1066                    proofs[s.testbed] = s.proof
1067
1068            # Annotate the topology with embedding info
1069            for e in top.elements:
1070                if isinstance(e, topdl.Computer):
1071                    for s in starters:
1072                        ann = s.node.get(e.name, None)
1073                        if ann is not None:
1074                            e.localname.extend(ann[0])
1075                            e.status = ann[1]
1076                            e.service.extend(ann[2])
1077                            e.operation.extend(ann[3])
1078                            break
1079
1080            log.info("[start_segment]: Experiment %s active" % eid)
1081
1082
1083        # Walk up tmpdir, deleting as we go
1084        if self.cleanup:
1085            self.remove_dirs(tmpdir)
1086        else:
1087            log.debug("[start_experiment]: not removing %s" % tmpdir)
1088
1089        # Insert the experiment into our state and update the disk copy.
1090        self.state_lock.acquire()
1091        self.state[expid].status = 'active'
1092        self.state[eid] = self.state[expid]
1093        self.state[eid].top = top
1094        # Append startup proofs
1095        for f in self.state[eid].get_all_allocations():
1096            if f.tb in proofs:
1097                f.proof.append(proofs[f.tb])
1098
1099        if self.state_filename: self.write_state()
1100        self.state_lock.release()
1101        return
1102
1103
1104    def add_kit(self, e, kit):
1105        """
1106        Add a Software object created from the list of (install, location)
1107        tuples passed as kit  to the software attribute of an object e.  We
1108        do this enough to break out the code, but it's kind of a hack to
1109        avoid changing the old tuple rep.
1110        """
1111
1112        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1113
1114        if isinstance(e.software, list): e.software.extend(s)
1115        else: e.software = s
1116
1117    def append_experiment_authorization(self, expid, attrs, 
1118            need_state_lock=True):
1119        """
1120        Append the authorization information to system state
1121        """
1122
1123        for p, a in attrs:
1124            self.auth.set_attribute(p, a)
1125        self.auth.save()
1126
1127        if need_state_lock: self.state_lock.acquire()
1128        # XXX: really a no op?
1129        #self.state[expid]['auth'].update(attrs)
1130        if self.state_filename: self.write_state()
1131        if need_state_lock: self.state_lock.release()
1132
1133    def clear_experiment_authorization(self, expid, need_state_lock=True):
1134        """
1135        Attrs is a set of attribute principal pairs that need to be removed
1136        from the authenticator.  Remove them and save the authenticator.
1137        """
1138
1139        if need_state_lock: self.state_lock.acquire()
1140        # XXX: should be a no-op
1141        #if expid in self.state and 'auth' in self.state[expid]:
1142            #for p, a in self.state[expid]['auth']:
1143                #self.auth.unset_attribute(p, a)
1144            #self.state[expid]['auth'] = set()
1145        if self.state_filename: self.write_state()
1146        if need_state_lock: self.state_lock.release()
1147        self.auth.save()
1148
1149
1150    def create_experiment_state(self, fid, req, expid, expcert,
1151            state='starting'):
1152        """
1153        Create the initial entry in the experiment's state.  The expid and
1154        expcert are the experiment's fedid and certifacte that represents that
1155        ID, which are installed in the experiment state.  If the request
1156        includes a suggested local name that is used if possible.  If the local
1157        name is already taken by an experiment owned by this user that has
1158        failed, it is overwritten.  Otherwise new letters are added until a
1159        valid localname is found.  The generated local name is returned.
1160        """
1161
1162        if req.has_key('experimentID') and \
1163                req['experimentID'].has_key('localname'):
1164            overwrite = False
1165            eid = req['experimentID']['localname']
1166            # If there's an old failed experiment here with the same local name
1167            # and accessible by this user, we'll overwrite it, otherwise we'll
1168            # fall through and do the collision avoidance.
1169            old_expid = self.get_experiment_fedid(eid)
1170            if old_expid:
1171                users_experiment = True
1172                try:
1173                    self.check_experiment_access(fid, old_expid)
1174                except service_error, e:
1175                    if e.code == service_error.access: users_experiment = False
1176                    else: raise e
1177                if users_experiment:
1178                    self.state_lock.acquire()
1179                    status = self.state[eid].status
1180                    if status and status == 'failed':
1181                        # remove the old access attributes
1182                        self.clear_experiment_authorization(eid,
1183                                need_state_lock=False)
1184                        overwrite = True
1185                        del self.state[eid]
1186                        del self.state[old_expid]
1187                    self.state_lock.release()
1188                else:
1189                    self.log.info('Experiment %s exists, ' % eid + \
1190                            'but this user cannot access it')
1191            self.state_lock.acquire()
1192            while (self.state.has_key(eid) and not overwrite):
1193                eid += random.choice(string.ascii_letters)
1194            # Initial state
1195            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1196                    identity=expcert)
1197            self.state[expid] = self.state[eid]
1198            if self.state_filename: self.write_state()
1199            self.state_lock.release()
1200        else:
1201            eid = self.exp_stem
1202            for i in range(0,5):
1203                eid += random.choice(string.ascii_letters)
1204            self.state_lock.acquire()
1205            while (self.state.has_key(eid)):
1206                eid = self.exp_stem
1207                for i in range(0,5):
1208                    eid += random.choice(string.ascii_letters)
1209            # Initial state
1210            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1211                    identity=expcert)
1212            self.state[expid] = self.state[eid]
1213            if self.state_filename: self.write_state()
1214            self.state_lock.release()
1215
1216        # Let users touch the state.  Authorize this fid and the expid itself
1217        # to touch the experiment, as well as allowing th eoverrides.
1218        self.append_experiment_authorization(eid, 
1219                set([(fid, expid), (expid,expid)] + \
1220                        [ (o, expid) for o in self.overrides]))
1221
1222        return eid
1223
1224
1225    def allocate_ips_to_topo(self, top):
1226        """
1227        Add an ip4_address attribute to all the hosts in the topology, based on
1228        the shared substrates on which they sit.  An /etc/hosts file is also
1229        created and returned as a list of hostfiles entries.  We also return
1230        the allocator, because we may need to allocate IPs to portals
1231        (specifically DRAGON portals).
1232        """
1233        subs = sorted(top.substrates, 
1234                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1235                reverse=True)
1236        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1237        ifs = { }
1238        hosts = [ ]
1239
1240        for idx, s in enumerate(subs):
1241            net_size = len(s.interfaces)+2
1242
1243            a = ips.allocate(net_size)
1244            if a :
1245                base, num = a
1246                if num < net_size: 
1247                    raise service_error(service_error.internal,
1248                            "Allocator returned wrong number of IPs??")
1249            else:
1250                raise service_error(service_error.req, 
1251                        "Cannot allocate IP addresses")
1252            mask = ips.min_alloc
1253            while mask < net_size:
1254                mask *= 2
1255
1256            netmask = ((2**32-1) ^ (mask-1))
1257
1258            base += 1
1259            for i in s.interfaces:
1260                i.attribute.append(
1261                        topdl.Attribute('ip4_address', 
1262                            "%s" % ip_addr(base)))
1263                i.attribute.append(
1264                        topdl.Attribute('ip4_netmask', 
1265                            "%s" % ip_addr(int(netmask))))
1266
1267                hname = i.element.name
1268                if ifs.has_key(hname):
1269                    hosts.append("%s\t%s-%s %s-%d" % \
1270                            (ip_addr(base), hname, s.name, hname,
1271                                ifs[hname]))
1272                else:
1273                    ifs[hname] = 0
1274                    hosts.append("%s\t%s-%s %s-%d %s" % \
1275                            (ip_addr(base), hname, s.name, hname,
1276                                ifs[hname], hname))
1277
1278                ifs[hname] += 1
1279                base += 1
1280        return hosts, ips
1281
1282    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1283            tbparam, masters, tbmap, expid=None, expcert=None):
1284        for tb in testbeds:
1285            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1286                    expcert)
1287            allocated[tb] = 1
1288
1289    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1290            expcert=None):
1291        """
1292        Get access to testbed through fedd and set the parameters for that tb
1293        """
1294        def get_export_project(svcs):
1295            """
1296            Look through for the list of federated_service for this testbed
1297            objects for a project_export service, and extract the project
1298            parameter.
1299            """
1300
1301            pe = [s for s in svcs if s.name=='project_export']
1302            if len(pe) == 1:
1303                return pe[0].params.get('project', None)
1304            elif len(pe) == 0:
1305                return None
1306            else:
1307                raise service_error(service_error.req,
1308                        "More than one project export is not supported")
1309
1310        def add_services(svcs, type, slist, keys):
1311            """
1312            Add the given services to slist.  type is import or export.  Also
1313            add a mapping entry from the assigned id to the original service
1314            record.
1315            """
1316            for i, s in enumerate(svcs):
1317                idx = '%s%d' % (type, i)
1318                keys[idx] = s
1319                sr = {'id': idx, 'name': s.name, 'visibility': type }
1320                if s.params:
1321                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1322                            for k, v in s.params.items()]
1323                slist.append(sr)
1324
1325        uri = tbmap.get(testbed_base(tb), None)
1326        if not uri:
1327            raise service_error(service_error.server_config, 
1328                    "Unknown testbed: %s" % tb)
1329
1330        export_svcs = masters.get(tb,[])
1331        import_svcs = [ s for m in masters.values() \
1332                for s in m \
1333                    if tb in s.importers ]
1334
1335        export_project = get_export_project(export_svcs)
1336        # Compose the credential list so that IDs come before attributes
1337        creds = set()
1338        keys = set()
1339        certs = self.auth.get_creds_for_principal(fid)
1340        if expid:
1341            certs.update(self.auth.get_creds_for_principal(expid))
1342        for c in certs:
1343            keys.add(c.issuer_cert())
1344            creds.add(c.attribute_cert())
1345        creds = list(keys) + list(creds)
1346
1347        if expcert: cert, pw = expcert, None
1348        else: cert, pw = self.cert_file, self.cert_pw
1349
1350        # Request credentials
1351        req = {
1352                'abac_credential': creds,
1353            }
1354        # Make the service request from the services we're importing and
1355        # exporting.  Keep track of the export request ids so we can
1356        # collect the resulting info from the access response.
1357        e_keys = { }
1358        if import_svcs or export_svcs:
1359            slist = []
1360            add_services(import_svcs, 'import', slist, e_keys)
1361            add_services(export_svcs, 'export', slist, e_keys)
1362            req['service'] = slist
1363
1364        if self.local_access.has_key(uri):
1365            # Local access call
1366            req = { 'RequestAccessRequestBody' : req }
1367            r = self.local_access[uri].RequestAccess(req, 
1368                    fedid(file=self.cert_file))
1369            r = { 'RequestAccessResponseBody' : r }
1370        else:
1371            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1372
1373        if r.has_key('RequestAccessResponseBody'):
1374            # Through to here we have a valid response, not a fault.
1375            # Access denied is a fault, so something better or worse than
1376            # access denied has happened.
1377            r = r['RequestAccessResponseBody']
1378            self.log.debug("[get_access] Access granted")
1379        else:
1380            raise service_error(service_error.protocol,
1381                        "Bad proxy response")
1382        if 'proof' not in r:
1383            raise service_error(service_error.protocol,
1384                        "Bad access response (no access proof)")
1385       
1386        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1387                tb=tb, uri=uri, proof=[r['proof']])
1388
1389        # Collect the responses corresponding to the services this testbed
1390        # exports.  These will be the service requests that we will include in
1391        # the start segment requests (with appropriate visibility values) to
1392        # import and export the segments.
1393        for s in r.get('service', []):
1394            id = s.get('id', None)
1395            if id and id in e_keys:
1396                e_keys[id].reqs.append(s)
1397
1398        # Add attributes to parameter space.  We don't allow attributes to
1399        # overlay any parameters already installed.
1400        for a in r.get('fedAttr', []):
1401            try:
1402                if a['attribute']:
1403                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1404            except KeyError:
1405                self.log.error("Bad attribute in response: %s" % a)
1406
1407
1408    def split_topology(self, top, topo, testbeds):
1409        """
1410        Create the sub-topologies that are needed for experiment instantiation.
1411        """
1412        for tb in testbeds:
1413            topo[tb] = top.clone()
1414            # copy in for loop allows deletions from the original
1415            for e in [ e for e in topo[tb].elements]:
1416                etb = e.get_attribute('testbed')
1417                # NB: elements without a testbed attribute won't appear in any
1418                # sub topologies. 
1419                if not etb or etb != tb:
1420                    for i in e.interface:
1421                        for s in i.subs:
1422                            try:
1423                                s.interfaces.remove(i)
1424                            except ValueError:
1425                                raise service_error(service_error.internal,
1426                                        "Can't remove interface??")
1427                    topo[tb].elements.remove(e)
1428            topo[tb].make_indices()
1429
1430    def confirm_software(self, top):
1431        """
1432        Make sure that the software to be loaded in the topo is all available
1433        before we begin making access requests, etc.  This is a subset of
1434        wrangle_software.
1435        """
1436        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1437        pkgs.update([x.location for e in top.elements for x in e.software])
1438
1439        for pkg in pkgs:
1440            loc = pkg
1441
1442            scheme, host, path = urlparse(loc)[0:3]
1443            dest = os.path.basename(path)
1444            if not scheme:
1445                if not loc.startswith('/'):
1446                    loc = "/%s" % loc
1447                loc = "file://%s" %loc
1448            # NB: if scheme was found, loc == pkg
1449            try:
1450                u = urlopen(loc)
1451                u.close()
1452            except Exception, e:
1453                raise service_error(service_error.req, 
1454                        "Cannot open %s: %s" % (loc, e))
1455        return True
1456
1457    def wrangle_software(self, expid, top, topo, tbparams):
1458        """
1459        Copy software out to the repository directory, allocate permissions and
1460        rewrite the segment topologies to look for the software in local
1461        places.
1462        """
1463
1464        # Copy the rpms and tarfiles to a distribution directory from
1465        # which the federants can retrieve them
1466        linkpath = "%s/software" %  expid
1467        softdir ="%s/%s" % ( self.repodir, linkpath)
1468        softmap = { }
1469
1470        # self.fedkit and self.gateway kit are lists of tuples of
1471        # (install_location, download_location) this extracts the download
1472        # locations.
1473        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1474        pkgs.update([x.location for e in top.elements for x in e.software])
1475        try:
1476            os.makedirs(softdir)
1477        except EnvironmentError, e:
1478            raise service_error(
1479                    "Cannot create software directory: %s" % e)
1480        # The actual copying.  Everything's converted into a url for copying.
1481        auth_attrs = set()
1482        for pkg in pkgs:
1483            loc = pkg
1484
1485            scheme, host, path = urlparse(loc)[0:3]
1486            dest = os.path.basename(path)
1487            if not scheme:
1488                if not loc.startswith('/'):
1489                    loc = "/%s" % loc
1490                loc = "file://%s" %loc
1491            # NB: if scheme was found, loc == pkg
1492            try:
1493                u = urlopen(loc)
1494            except Exception, e:
1495                raise service_error(service_error.req, 
1496                        "Cannot open %s: %s" % (loc, e))
1497            try:
1498                f = open("%s/%s" % (softdir, dest) , "w")
1499                self.log.debug("Writing %s/%s" % (softdir,dest) )
1500                data = u.read(4096)
1501                while data:
1502                    f.write(data)
1503                    data = u.read(4096)
1504                f.close()
1505                u.close()
1506            except Exception, e:
1507                raise service_error(service_error.internal,
1508                        "Could not copy %s: %s" % (loc, e))
1509            path = re.sub("/tmp", "", linkpath)
1510            # XXX
1511            softmap[pkg] = \
1512                    "%s/%s/%s" %\
1513                    ( self.repo_url, path, dest)
1514
1515            # Allow the individual segments to access the software by assigning
1516            # an attribute to each testbed allocation that encodes the data to
1517            # be released.  This expression collects the data for each run of
1518            # the loop.
1519            auth_attrs.update([
1520                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1521                        for tb in tbparams.keys()])
1522
1523        self.append_experiment_authorization(expid, auth_attrs)
1524
1525        # Convert the software locations in the segments into the local
1526        # copies on this host
1527        for soft in [ s for tb in topo.values() \
1528                for e in tb.elements \
1529                    if getattr(e, 'software', False) \
1530                        for s in e.software ]:
1531            if softmap.has_key(soft.location):
1532                soft.location = softmap[soft.location]
1533
1534
1535    def new_experiment(self, req, fid):
1536        """
1537        The external interface to empty initial experiment creation called from
1538        the dispatcher.
1539
1540        Creates a working directory, splits the incoming description using the
1541        splitter script and parses out the avrious subsections using the
1542        lcasses above.  Once each sub-experiment is created, use pooled threads
1543        to instantiate them and start it all up.
1544        """
1545        req = req.get('NewRequestBody', None)
1546        if not req:
1547            raise service_error(service_error.req,
1548                    "Bad request format (no NewRequestBody)")
1549
1550        if self.auth.import_credentials(data_list=req.get('credential', [])):
1551            self.auth.save()
1552
1553        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1554                with_proof=True)
1555
1556        if not access_ok:
1557            raise service_error(service_error.access, "New access denied",
1558                    proof=[proof])
1559
1560        try:
1561            tmpdir = tempfile.mkdtemp(prefix="split-")
1562        except EnvironmentError:
1563            raise service_error(service_error.internal, "Cannot create tmp dir")
1564
1565        try:
1566            access_user = self.accessdb[fid]
1567        except KeyError:
1568            raise service_error(service_error.internal,
1569                    "Access map and authorizer out of sync in " + \
1570                            "new_experiment for fedid %s"  % fid)
1571
1572        # Generate an ID for the experiment (slice) and a certificate that the
1573        # allocator can use to prove they own it.  We'll ship it back through
1574        # the encrypted connection.  If the requester supplied one, use it.
1575        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1576            expcert = req['experimentAccess']['X509']
1577            expid = fedid(certstr=expcert)
1578            self.state_lock.acquire()
1579            if expid in self.state:
1580                self.state_lock.release()
1581                raise service_error(service_error.req, 
1582                        'fedid %s identifies an existing experiment' % expid)
1583            self.state_lock.release()
1584        else:
1585            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1586
1587        #now we're done with the tmpdir, and it should be empty
1588        if self.cleanup:
1589            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1590            os.rmdir(tmpdir)
1591        else:
1592            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1593
1594        eid = self.create_experiment_state(fid, req, expid, expcert, 
1595                state='empty')
1596
1597        rv = {
1598                'experimentID': [
1599                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1600                ],
1601                'experimentStatus': 'empty',
1602                'experimentAccess': { 'X509' : expcert },
1603                'proof': proof.to_dict(),
1604            }
1605
1606        return rv
1607
1608    # create_experiment sub-functions
1609
1610    @staticmethod
1611    def get_experiment_key(req, field='experimentID'):
1612        """
1613        Parse the experiment identifiers out of the request (the request body
1614        tag has been removed).  Specifically this pulls either the fedid or the
1615        localname out of the experimentID field.  A fedid is preferred.  If
1616        neither is present or the request does not contain the fields,
1617        service_errors are raised.
1618        """
1619        # Get the experiment access
1620        exp = req.get(field, None)
1621        if exp:
1622            if exp.has_key('fedid'):
1623                key = exp['fedid']
1624            elif exp.has_key('localname'):
1625                key = exp['localname']
1626            else:
1627                raise service_error(service_error.req, "Unknown lookup type")
1628        else:
1629            raise service_error(service_error.req, "No request?")
1630
1631        return key
1632
1633    def get_experiment_ids_and_start(self, key, tmpdir):
1634        """
1635        Get the experiment name, id and access certificate from the state, and
1636        set the experiment state to 'starting'.  returns a triple (fedid,
1637        localname, access_cert_file). The access_cert_file is a copy of the
1638        contents of the access certificate, created in the tempdir with
1639        restricted permissions.  If things are confused, raise an exception.
1640        """
1641
1642        expid = eid = None
1643        self.state_lock.acquire()
1644        if key in self.state:
1645            exp = self.state[key]
1646            exp.status = "starting"
1647            expid = exp.fedid
1648            eid = exp.localname
1649            expcert = exp.identity
1650        self.state_lock.release()
1651
1652        # make a protected copy of the access certificate so the experiment
1653        # controller can act as the experiment principal.
1654        if expcert:
1655            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1656            if not expcert_file:
1657                raise service_error(service_error.internal, 
1658                        "Cannot create temp cert file?")
1659        else:
1660            expcert_file = None
1661
1662        return (eid, expid, expcert_file)
1663
1664    def get_topology(self, req, tmpdir):
1665        """
1666        Get the ns2 content and put it into a file for parsing.  Call the local
1667        or remote parser and return the topdl.Topology.  Errors result in
1668        exceptions.  req is the request and tmpdir is a work directory.
1669        """
1670
1671        # The tcl parser needs to read a file so put the content into that file
1672        descr=req.get('experimentdescription', None)
1673        if descr:
1674            if 'ns2description' in descr:
1675                file_content=descr['ns2description']
1676            elif 'topdldescription' in descr:
1677                return topdl.Topology(**descr['topdldescription'])
1678            else:
1679                raise service_error(service_error.req, 
1680                        'Unknown experiment description type')
1681        else:
1682            raise service_error(service_error.req, "No experiment description")
1683
1684
1685        if self.splitter_url:
1686            self.log.debug("Calling remote topdl translator at %s" % \
1687                    self.splitter_url)
1688            top = self.remote_ns2topdl(self.splitter_url, file_content)
1689        else:
1690            tclfile = os.path.join(tmpdir, "experiment.tcl")
1691            if file_content:
1692                try:
1693                    f = open(tclfile, 'w')
1694                    f.write(file_content)
1695                    f.close()
1696                except EnvironmentError:
1697                    raise service_error(service_error.internal,
1698                            "Cannot write temp experiment description")
1699            else:
1700                raise service_error(service_error.req, 
1701                        "Only ns2descriptions supported")
1702            pid = "dummy"
1703            gid = "dummy"
1704            eid = "dummy"
1705
1706            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1707                str(self.muxmax), '-m', 'dummy']
1708
1709            tclcmd.extend([pid, gid, eid, tclfile])
1710
1711            self.log.debug("running local splitter %s", " ".join(tclcmd))
1712            # This is just fantastic.  As a side effect the parser copies
1713            # tb_compat.tcl into the current directory, so that directory
1714            # must be writable by the fedd user.  Doing this in the
1715            # temporary subdir ensures this is the case.
1716            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1717                    cwd=tmpdir)
1718            split_data = tclparser.stdout
1719
1720            top = topdl.topology_from_xml(file=split_data, top="experiment")
1721            os.remove(tclfile)
1722
1723        return top
1724
1725    def get_testbed_services(self, req, testbeds):
1726        """
1727        Parse the services section of the request into into two dicts mapping
1728        testbed to lists of federated_service objects.  The first lists all
1729        exporters of services, and the second all exporters of services that
1730        need control portals in the experiment.
1731        """
1732        masters = { }
1733        pmasters = { }
1734        for s in req.get('service', []):
1735            # If this is a service request with the importall field
1736            # set, fill it out.
1737
1738            if s.get('importall', False):
1739                s['import'] = [ tb for tb in testbeds \
1740                        if tb not in s.get('export',[])]
1741                del s['importall']
1742
1743            # Add the service to masters
1744            for tb in s.get('export', []):
1745                if s.get('name', None):
1746
1747                    params = { }
1748                    for a in s.get('fedAttr', []):
1749                        params[a.get('attribute', '')] = a.get('value','')
1750
1751                    fser = federated_service(name=s['name'],
1752                            exporter=tb, importers=s.get('import',[]),
1753                            params=params)
1754                    if fser.name == 'hide_hosts' \
1755                            and 'hosts' not in fser.params:
1756                        fser.params['hosts'] = \
1757                                ",".join(tb_hosts.get(fser.exporter, []))
1758                    if tb in masters: masters[tb].append(fser)
1759                    else: masters[tb] = [fser]
1760
1761                    if fser.portal:
1762                        if tb not in pmasters: pmasters[tb] = [ fser ]
1763                        else: pmasters[tb].append(fser)
1764                else:
1765                    self.log.error('Testbed service does not have name " + \
1766                            "and importers')
1767        return masters, pmasters
1768
1769    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1770        """
1771        Create the ssh keys necessary for interconnecting the portal nodes and
1772        the global hosts file for letting each segment know about the IP
1773        addresses in play.  Save these into the repo.  Add attributes to the
1774        autorizer allowing access controllers to download them and return a set
1775        of attributes that inform the segments where to find this stuff.  May
1776        raise service_errors in if there are problems.
1777        """
1778        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1779        gw_secretkey_base = "fed.%s" % self.ssh_type
1780        keydir = os.path.join(tmpdir, 'keys')
1781        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1782        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1783
1784        try:
1785            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1786        except ValueError:
1787            raise service_error(service_error.server_config, 
1788                    "Bad key type (%s)" % self.ssh_type)
1789
1790        self.generate_seer_certs(keydir)
1791
1792        # Copy configuration files into the remote file store
1793        # The config urlpath
1794        configpath = "/%s/config" % expid
1795        # The config file system location
1796        configdir ="%s%s" % ( self.repodir, configpath)
1797        try:
1798            os.makedirs(configdir)
1799        except EnvironmentError, e:
1800            raise service_error(service_error.internal,
1801                    "Cannot create config directory: %s" % e)
1802        try:
1803            f = open("%s/hosts" % configdir, "w")
1804            print >> f, string.join(hosts, '\n')
1805            f.close()
1806        except EnvironmentError, e:
1807            raise service_error(service_error.internal, 
1808                    "Cannot write hosts file: %s" % e)
1809        try:
1810            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1811            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1812            copy_file(os.path.join(keydir, 'ca.pem'), 
1813                    os.path.join(configdir, 'ca.pem'))
1814            copy_file(os.path.join(keydir, 'node.pem'), 
1815                    os.path.join(configdir, 'node.pem'))
1816        except EnvironmentError, e:
1817            raise service_error(service_error.internal, 
1818                    "Cannot copy keyfiles: %s" % e)
1819
1820        # Allow the individual testbeds to access the configuration files,
1821        # again by setting an attribute for the relevant pathnames on each
1822        # allocation principal.  Yeah, that's a long list comprehension.
1823        self.append_experiment_authorization(expid, set([
1824            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1825                    for tb in tbparams.keys() \
1826                        for f in ("hosts", 'ca.pem', 'node.pem', 
1827                            gw_secretkey_base, gw_pubkey_base)]))
1828
1829        attrs = [ 
1830                {
1831                    'attribute': 'ssh_pubkey', 
1832                    'value': '%s/%s/config/%s' % \
1833                            (self.repo_url, expid, gw_pubkey_base)
1834                },
1835                {
1836                    'attribute': 'ssh_secretkey', 
1837                    'value': '%s/%s/config/%s' % \
1838                            (self.repo_url, expid, gw_secretkey_base)
1839                },
1840                {
1841                    'attribute': 'hosts', 
1842                    'value': '%s/%s/config/hosts' % \
1843                            (self.repo_url, expid)
1844                },
1845                {
1846                    'attribute': 'seer_ca_pem', 
1847                    'value': '%s/%s/config/%s' % \
1848                            (self.repo_url, expid, 'ca.pem')
1849                },
1850                {
1851                    'attribute': 'seer_node_pem', 
1852                    'value': '%s/%s/config/%s' % \
1853                            (self.repo_url, expid, 'node.pem')
1854                },
1855            ]
1856        return attrs
1857
1858
1859    def get_vtopo(self, req, fid):
1860        """
1861        Return the stored virtual topology for this experiment
1862        """
1863        rv = None
1864        state = None
1865
1866        req = req.get('VtopoRequestBody', None)
1867        if not req:
1868            raise service_error(service_error.req,
1869                    "Bad request format (no VtopoRequestBody)")
1870        exp = req.get('experiment', None)
1871        if exp:
1872            if exp.has_key('fedid'):
1873                key = exp['fedid']
1874                keytype = "fedid"
1875            elif exp.has_key('localname'):
1876                key = exp['localname']
1877                keytype = "localname"
1878            else:
1879                raise service_error(service_error.req, "Unknown lookup type")
1880        else:
1881            raise service_error(service_error.req, "No request?")
1882
1883        proof = self.check_experiment_access(fid, key)
1884
1885        self.state_lock.acquire()
1886        # XXX: this needs to be recalculated
1887        if key in self.state:
1888            if self.state[key].top is not None:
1889                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1890                rv = { 'experiment' : {keytype: key },
1891                        'vtopo': vtopo,
1892                        'proof': proof.to_dict(), 
1893                    }
1894            else:
1895                state = self.state[key].status
1896        self.state_lock.release()
1897
1898        if rv: return rv
1899        else: 
1900            if state:
1901                raise service_error(service_error.partial, 
1902                        "Not ready: %s" % state)
1903            else:
1904                raise service_error(service_error.req, "No such experiment")
1905
1906    def get_vis(self, req, fid):
1907        """
1908        Return the stored visualization for this experiment
1909        """
1910        rv = None
1911        state = None
1912
1913        req = req.get('VisRequestBody', None)
1914        if not req:
1915            raise service_error(service_error.req,
1916                    "Bad request format (no VisRequestBody)")
1917        exp = req.get('experiment', None)
1918        if exp:
1919            if exp.has_key('fedid'):
1920                key = exp['fedid']
1921                keytype = "fedid"
1922            elif exp.has_key('localname'):
1923                key = exp['localname']
1924                keytype = "localname"
1925            else:
1926                raise service_error(service_error.req, "Unknown lookup type")
1927        else:
1928            raise service_error(service_error.req, "No request?")
1929
1930        proof = self.check_experiment_access(fid, key)
1931
1932        self.state_lock.acquire()
1933        # Generate the visualization
1934        if key in self.state:
1935            if self.state[key].top is not None:
1936                try:
1937                    vis = self.genviz(
1938                            topdl.topology_to_vtopo(self.state[key].topo))
1939                except service_error, e:
1940                    self.state_lock.release()
1941                    raise e
1942                rv =  { 'experiment' : {keytype: key },
1943                        'vis': vis,
1944                        'proof': proof.to_dict(), 
1945                        }
1946            else:
1947                state = self.state[key].status
1948        self.state_lock.release()
1949
1950        if rv: return rv
1951        else:
1952            if state:
1953                raise service_error(service_error.partial, 
1954                        "Not ready: %s" % state)
1955            else:
1956                raise service_error(service_error.req, "No such experiment")
1957
1958   
1959    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1960            top):
1961        """
1962        Store the various data that have changed in the experiment state
1963        between when it was started and the beginning of resource allocation.
1964        This is basically the information about each local allocation.  This
1965        fills in the values of the placeholder allocation in the state.  It
1966        also collects the access proofs and returns them as dicts for a
1967        response message.
1968        """
1969        self.state_lock.acquire()
1970        # XXX:
1971        #self.state[eid]['vtopo'] = vtopo
1972        #self.state[eid]['vis'] = vis
1973        exp = self.state[eid]
1974        exp.topology = top
1975        # save federant information
1976        for k in allocated.keys():
1977            exp.add_allocation(tbparams[k])
1978
1979        # Access proofs for the response message
1980        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1981                    for p in tbparams[k].proof]
1982        if self.state_filename: 
1983            self.write_state()
1984        self.state_lock.release()
1985        return proofs
1986
1987    def clear_placeholder(self, eid, expid, tmpdir):
1988        """
1989        Clear the placeholder and remove any allocated temporary dir.
1990        """
1991
1992        self.state_lock.acquire()
1993        del self.state[eid]
1994        del self.state[expid]
1995        if self.state_filename: self.write_state()
1996        self.state_lock.release()
1997        if tmpdir and self.cleanup:
1998            self.remove_dirs(tmpdir)
1999
2000    # end of create_experiment sub-functions
2001
2002    def create_experiment(self, req, fid):
2003        """
2004        The external interface to experiment creation called from the
2005        dispatcher.
2006
2007        Creates a working directory, splits the incoming description using the
2008        splitter script and parses out the various subsections using the
2009        classes above.  Once each sub-experiment is created, use pooled threads
2010        to instantiate them and start it all up.
2011        """
2012
2013        req = req.get('CreateRequestBody', None)
2014        if req:
2015            key = self.get_experiment_key(req)
2016        else:
2017            raise service_error(service_error.req,
2018                    "Bad request format (no CreateRequestBody)")
2019
2020        # Import information from the requester
2021        if self.auth.import_credentials(data_list=req.get('credential', [])):
2022            self.auth.save()
2023
2024        # Make sure that the caller can talk to us
2025        proof = self.check_experiment_access(fid, key)
2026
2027        # Install the testbed map entries supplied with the request into a copy
2028        # of the testbed map.
2029        tbmap = dict(self.tbmap)
2030        for m in req.get('testbedmap', []):
2031            if 'testbed' in m and 'uri' in m:
2032                tbmap[m['testbed']] = m['uri']
2033
2034        # a place to work
2035        try:
2036            tmpdir = tempfile.mkdtemp(prefix="split-")
2037            os.mkdir(tmpdir+"/keys")
2038        except EnvironmentError:
2039            raise service_error(service_error.internal, "Cannot create tmp dir")
2040
2041        tbparams = { }
2042
2043        eid, expid, expcert_file = \
2044                self.get_experiment_ids_and_start(key, tmpdir)
2045
2046        # This catches exceptions to clear the placeholder if necessary
2047        try: 
2048            if not (eid and expid):
2049                raise service_error(service_error.internal, 
2050                        "Cannot find local experiment info!?")
2051
2052            top = self.get_topology(req, tmpdir)
2053            self.confirm_software(top)
2054            # Assign the IPs
2055            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2056            # Find the testbeds to look up
2057            tb_hosts = { }
2058            testbeds = [ ]
2059            for e in top.elements:
2060                if isinstance(e, topdl.Computer):
2061                    tb = e.get_attribute('testbed') or 'default'
2062                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2063                    else: 
2064                        tb_hosts[tb] = [ e.name ]
2065                        testbeds.append(tb)
2066
2067            masters, pmasters = self.get_testbed_services(req, testbeds)
2068            allocated = { }         # Testbeds we can access
2069            topo ={ }               # Sub topologies
2070            connInfo = { }          # Connection information
2071
2072            self.split_topology(top, topo, testbeds)
2073
2074            self.get_access_to_testbeds(testbeds, fid, allocated, 
2075                    tbparams, masters, tbmap, expid, expcert_file)
2076
2077            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2078
2079            part = experiment_partition(self.auth, self.store_url, tbmap,
2080                    self.muxmax, self.direct_transit)
2081            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2082                    connInfo, expid)
2083
2084            auth_attrs = set()
2085            # Now get access to the dynamic testbeds (those added above)
2086            for tb in [ t for t in topo if t not in allocated]:
2087                self.get_access(tb, tbparams, fid, masters, tbmap, 
2088                        expid, expcert_file)
2089                allocated[tb] = 1
2090                store_keys = topo[tb].get_attribute('store_keys')
2091                # Give the testbed access to keys it exports or imports
2092                if store_keys:
2093                    auth_attrs.update(set([
2094                        (tbparams[tb].allocID, sk) \
2095                                for sk in store_keys.split(" ")]))
2096
2097            if auth_attrs:
2098                self.append_experiment_authorization(expid, auth_attrs)
2099
2100            # transit and disconnected testbeds may not have a connInfo entry.
2101            # Fill in the blanks.
2102            for t in allocated.keys():
2103                if not connInfo.has_key(t):
2104                    connInfo[t] = { }
2105
2106            self.wrangle_software(expid, top, topo, tbparams)
2107
2108            vtopo = topdl.topology_to_vtopo(top)
2109            vis = self.genviz(vtopo)
2110            proofs = self.save_federant_information(allocated, tbparams, 
2111                    eid, vtopo, vis, top)
2112        except service_error, e:
2113            # If something goes wrong in the parse (usually an access error)
2114            # clear the placeholder state.  From here on out the code delays
2115            # exceptions.  Failing at this point returns a fault to the remote
2116            # caller.
2117            self.clear_placeholder(eid, expid, tmpdir)
2118            raise e
2119
2120        # Start the background swapper and return the starting state.  From
2121        # here on out, the state will stick around a while.
2122
2123        # Create a logger that logs to the experiment's state object as well as
2124        # to the main log file.
2125        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2126        alloc_collector = self.list_log(self.state[eid].log)
2127        h = logging.StreamHandler(alloc_collector)
2128        # XXX: there should be a global one of these rather than repeating the
2129        # code.
2130        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2131                    '%d %b %y %H:%M:%S'))
2132        alloc_log.addHandler(h)
2133
2134        # Start a thread to do the resource allocation
2135        t  = Thread(target=self.allocate_resources,
2136                args=(allocated, masters, eid, expid, tbparams, 
2137                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2138                    connInfo, tbmap, expcert_file),
2139                name=eid)
2140        t.start()
2141
2142        rv = {
2143                'experimentID': [
2144                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2145                ],
2146                'experimentStatus': 'starting',
2147                'proof': [ proof.to_dict() ] + proofs,
2148            }
2149
2150        return rv
2151   
2152    def get_experiment_fedid(self, key):
2153        """
2154        find the fedid associated with the localname key in the state database.
2155        """
2156
2157        rv = None
2158        self.state_lock.acquire()
2159        if key in self.state:
2160            rv = self.state[key].fedid
2161        self.state_lock.release()
2162        return rv
2163
2164    def check_experiment_access(self, fid, key):
2165        """
2166        Confirm that the fid has access to the experiment.  Though a request
2167        may be made in terms of a local name, the access attribute is always
2168        the experiment's fedid.
2169        """
2170        if not isinstance(key, fedid):
2171            key = self.get_experiment_fedid(key)
2172
2173        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2174
2175        if access_ok:
2176            return proof
2177        else:
2178            raise service_error(service_error.access, "Access Denied",
2179                proof)
2180
2181
2182    def get_handler(self, path, fid):
2183        """
2184        Perhaps surprisingly named, this function handles HTTP GET requests to
2185        this server (SOAP requests are POSTs).
2186        """
2187        self.log.info("Get handler %s %s" % (path, fid))
2188        # XXX: log proofs?
2189        if self.auth.check_attribute(fid, path):
2190            return ("%s/%s" % (self.repodir, path), "application/binary")
2191        else:
2192            return (None, None)
2193
2194   
2195    def get_info(self, req, fid):
2196        """
2197        Return all the stored info about this experiment
2198        """
2199        rv = None
2200
2201        req = req.get('InfoRequestBody', None)
2202        if not req:
2203            raise service_error(service_error.req,
2204                    "Bad request format (no InfoRequestBody)")
2205        exp = req.get('experiment', None)
2206        legacy = req.get('legacy', False)
2207        if exp:
2208            if exp.has_key('fedid'):
2209                key = exp['fedid']
2210                keytype = "fedid"
2211            elif exp.has_key('localname'):
2212                key = exp['localname']
2213                keytype = "localname"
2214            else:
2215                raise service_error(service_error.req, "Unknown lookup type")
2216        else:
2217            raise service_error(service_error.req, "No request?")
2218
2219        proof = self.check_experiment_access(fid, key)
2220
2221        self.state_lock.acquire()
2222        if self.state.has_key(key):
2223            rv = self.state[key].get_info()
2224            # Copy the topo if we need to generate legacy representations
2225            if legacy:
2226                top = self.state[key].top
2227                if top is not None: top = top.clone()
2228        self.state_lock.release()
2229
2230        # If the legacy visualization and topology representations are
2231        # requested, calculate them and add them to teh return.
2232        if legacy and rv is not None:
2233            if top is not None:
2234                vtopo = topdl.topology_to_vtopo(top)
2235                if vtopo is not None:
2236                    rv['vtopo'] = vtopo
2237                    try:
2238                        vis = self.genviz(vtopo)
2239                    except service_error, e:
2240                        self.log.debug('Problem generating visualization: %s' \
2241                                % e)
2242                        vis = None
2243                    if vis is not None:
2244                        rv['vis'] = vis
2245        if rv:
2246            rv['proof'] = proof.to_dict()
2247            return rv
2248        else:
2249            raise service_error(service_error.req, "No such experiment")
2250
2251    def get_multi_info(self, req, fid):
2252        """
2253        Return all the stored info that this fedid can access
2254        """
2255        rv = { 'info': [ ], 'proof': [ ] }
2256
2257        self.state_lock.acquire()
2258        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2259            try:
2260                proof = self.check_experiment_access(fid, key)
2261            except service_error, e:
2262                if e.code == service_error.access:
2263                    continue
2264                else:
2265                    self.state_lock.release()
2266                    raise e
2267
2268            if self.state.has_key(key):
2269                e = self.state[key].get_info()
2270                e['proof'] = proof.to_dict()
2271                rv['info'].append(e)
2272                rv['proof'].append(proof.to_dict())
2273        self.state_lock.release()
2274        return rv
2275
2276    def check_termination_status(self, fed_exp, force):
2277        """
2278        Confirm that the experiment is sin a valid state to stop (or force it)
2279        return the state - invalid states for deletion and force settings cause
2280        exceptions.
2281        """
2282        self.state_lock.acquire()
2283        status = fed_exp.status
2284
2285        if status:
2286            if status in ('starting', 'terminating'):
2287                if not force:
2288                    self.state_lock.release()
2289                    raise service_error(service_error.partial, 
2290                            'Experiment still being created or destroyed')
2291                else:
2292                    self.log.warning('Experiment in %s state ' % status + \
2293                            'being terminated by force.')
2294            self.state_lock.release()
2295            return status
2296        else:
2297            # No status??? trouble
2298            self.state_lock.release()
2299            raise service_error(service_error.internal,
2300                    "Experiment has no status!?")
2301
2302
2303    def get_termination_info(self, fed_exp):
2304        ids = []
2305        term_params = { }
2306        self.state_lock.acquire()
2307        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2308        expcert = fed_exp.identity
2309        repo = "%s" % fed_exp.fedid
2310
2311        # Collect the allocation/segment ids into a dict keyed by the fedid
2312        # of the allocation that contains a tuple of uri, aid
2313        for i, fed in enumerate(fed_exp.get_all_allocations()):
2314            uri = fed.uri
2315            aid = fed.allocID
2316            term_params[aid] = (uri, aid)
2317        # Change the experiment state
2318        fed_exp.status = 'terminating'
2319        if self.state_filename: self.write_state()
2320        self.state_lock.release()
2321
2322        return ids, term_params, expcert, repo
2323
2324
2325    def deallocate_resources(self, term_params, expcert, status, force, 
2326            dealloc_log):
2327        tmpdir = None
2328        # This try block makes sure the tempdir is cleared
2329        try:
2330            # If no expcert, try the deallocation as the experiment
2331            # controller instance.
2332            if expcert and self.auth_type != 'legacy': 
2333                try:
2334                    tmpdir = tempfile.mkdtemp(prefix="term-")
2335                except EnvironmentError:
2336                    raise service_error(service_error.internal, 
2337                            "Cannot create tmp dir")
2338                cert_file = self.make_temp_certfile(expcert, tmpdir)
2339                pw = None
2340            else: 
2341                cert_file = self.cert_file
2342                pw = self.cert_pwd
2343
2344            # Stop everyone.  NB, wait_for_all waits until a thread starts
2345            # and then completes, so we can't wait if nothing starts.  So,
2346            # no tbparams, no start.
2347            if len(term_params) > 0:
2348                tp = thread_pool(self.nthreads)
2349                for k, (uri, aid) in term_params.items():
2350                    # Create and start a thread to stop the segment
2351                    tp.wait_for_slot()
2352                    t  = pooled_thread(\
2353                            target=self.terminate_segment(log=dealloc_log,
2354                                testbed=uri,
2355                                cert_file=cert_file, 
2356                                cert_pwd=pw,
2357                                trusted_certs=self.trusted_certs,
2358                                caller=self.call_TerminateSegment),
2359                            args=(uri, aid), name=k,
2360                            pdata=tp, trace_file=self.trace_file)
2361                    t.start()
2362                # Wait for completions
2363                tp.wait_for_all_done()
2364
2365            # release the allocations (failed experiments have done this
2366            # already, and starting experiments may be in odd states, so we
2367            # ignore errors releasing those allocations
2368            try: 
2369                for k, (uri, aid)  in term_params.items():
2370                    self.release_access(None, aid, uri=uri,
2371                            cert_file=cert_file, cert_pwd=pw)
2372            except service_error, e:
2373                if status != 'failed' and not force:
2374                    raise e
2375
2376        # Clean up the tmpdir no matter what
2377        finally:
2378            if tmpdir: self.remove_dirs(tmpdir)
2379
2380    def terminate_experiment(self, req, fid):
2381        """
2382        Swap this experiment out on the federants and delete the shared
2383        information
2384        """
2385        tbparams = { }
2386        req = req.get('TerminateRequestBody', None)
2387        if not req:
2388            raise service_error(service_error.req,
2389                    "Bad request format (no TerminateRequestBody)")
2390
2391        key = self.get_experiment_key(req, 'experiment')
2392        proof = self.check_experiment_access(fid, key)
2393        exp = req.get('experiment', False)
2394        force = req.get('force', False)
2395
2396        dealloc_list = [ ]
2397
2398
2399        # Create a logger that logs to the dealloc_list as well as to the main
2400        # log file.
2401        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2402        h = logging.StreamHandler(self.list_log(dealloc_list))
2403        # XXX: there should be a global one of these rather than repeating the
2404        # code.
2405        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2406                    '%d %b %y %H:%M:%S'))
2407        dealloc_log.addHandler(h)
2408
2409        self.state_lock.acquire()
2410        fed_exp = self.state.get(key, None)
2411        self.state_lock.release()
2412        repo = None
2413
2414        if fed_exp:
2415            status = self.check_termination_status(fed_exp, force)
2416            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2417            self.deallocate_resources(term_params, expcert, status, force, 
2418                    dealloc_log)
2419
2420            # Remove the terminated experiment
2421            self.state_lock.acquire()
2422            for id in ids:
2423                self.clear_experiment_authorization(id, need_state_lock=False)
2424                if id in self.state: del self.state[id]
2425
2426            if self.state_filename: self.write_state()
2427            self.state_lock.release()
2428
2429            # Delete any synch points associated with this experiment.  All
2430            # synch points begin with the fedid of the experiment.
2431            fedid_keys = set(["fedid:%s" % f for f in ids \
2432                    if isinstance(f, fedid)])
2433            for k in self.synch_store.all_keys():
2434                try:
2435                    if len(k) > 45 and k[0:46] in fedid_keys:
2436                        self.synch_store.del_value(k)
2437                except synch_store.BadDeletionError:
2438                    pass
2439            self.write_store()
2440
2441            # Remove software and other cached stuff from the filesystem.
2442            if repo:
2443                self.remove_dirs("%s/%s" % (self.repodir, repo))
2444       
2445            return { 
2446                    'experiment': exp , 
2447                    'deallocationLog': string.join(dealloc_list, ''),
2448                    'proof': [proof.to_dict()],
2449                    }
2450        else:
2451            raise service_error(service_error.req, "No saved state")
2452
2453
2454    def GetValue(self, req, fid):
2455        """
2456        Get a value from the synchronized store
2457        """
2458        req = req.get('GetValueRequestBody', None)
2459        if not req:
2460            raise service_error(service_error.req,
2461                    "Bad request format (no GetValueRequestBody)")
2462       
2463        name = req.get('name', None)
2464        wait = req.get('wait', False)
2465        rv = { 'name': name }
2466
2467        if not name:
2468            raise service_error(service_error.req, "No name?")
2469
2470        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2471
2472        if access_ok:
2473            self.log.debug("[GetValue] asking for %s " % name)
2474            try:
2475                v = self.synch_store.get_value(name, wait)
2476            except synch_store.RevokedKeyError:
2477                # No more synch on this key
2478                raise service_error(service_error.federant, 
2479                        "Synch key %s revoked" % name)
2480            if v is not None:
2481                rv['value'] = v
2482            rv['proof'] = proof.to_dict()
2483            self.log.debug("[GetValue] got %s from %s" % (v, name))
2484            return rv
2485        else:
2486            raise service_error(service_error.access, "Access Denied",
2487                    proof=proof)
2488       
2489
2490    def SetValue(self, req, fid):
2491        """
2492        Set a value in the synchronized store
2493        """
2494        req = req.get('SetValueRequestBody', None)
2495        if not req:
2496            raise service_error(service_error.req,
2497                    "Bad request format (no SetValueRequestBody)")
2498       
2499        name = req.get('name', None)
2500        v = req.get('value', '')
2501
2502        if not name:
2503            raise service_error(service_error.req, "No name?")
2504
2505        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2506
2507        if access_ok:
2508            try:
2509                self.synch_store.set_value(name, v)
2510                self.write_store()
2511                self.log.debug("[SetValue] set %s to %s" % (name, v))
2512            except synch_store.CollisionError:
2513                # Translate into a service_error
2514                raise service_error(service_error.req,
2515                        "Value already set: %s" %name)
2516            except synch_store.RevokedKeyError:
2517                # No more synch on this key
2518                raise service_error(service_error.federant, 
2519                        "Synch key %s revoked" % name)
2520                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2521        else:
2522            raise service_error(service_error.access, "Access Denied",
2523                    proof=proof)
Note: See TracBrowser for help on using the repository browser.