source: fedd/federation/experiment_control.py @ ab3d6c5

compt_changesinfo-ops
Last change on this file since ab3d6c5 was ab3d6c5, checked in by Ted Faber <faber@…>, 13 years ago

Services into testbed nodes in topology

  • Property mode set to 100644
File size: 85.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52class experiment_control_local(experiment_control_legacy):
53    """
54    Control of experiments that this system can directly access.
55
56    Includes experiment creation, termination and information dissemination.
57    Thred safe.
58    """
59
60    class ssh_cmd_timeout(RuntimeError): pass
61   
62    call_RequestAccess = service_caller('RequestAccess')
63    call_ReleaseAccess = service_caller('ReleaseAccess')
64    call_StartSegment = service_caller('StartSegment')
65    call_TerminateSegment = service_caller('TerminateSegment')
66    call_InfoSegment = service_caller('InfoSegment')
67    call_Ns2Topdl = service_caller('Ns2Topdl')
68
69    def __init__(self, config=None, auth=None):
70        """
71        Intialize the various attributes, most from the config object
72        """
73
74        def parse_tarfile_list(tf):
75            """
76            Parse a tarfile list from the configuration.  This is a set of
77            paths and tarfiles separated by spaces.
78            """
79            rv = [ ]
80            if tf is not None:
81                tl = tf.split()
82                while len(tl) > 1:
83                    p, t = tl[0:2]
84                    del tl[0:2]
85                    rv.append((p, t))
86            return rv
87
88        self.list_log = list_log.list_log
89
90        self.cert_file = config.get("experiment_control", "cert_file")
91        if self.cert_file:
92            self.cert_pwd = config.get("experiment_control", "cert_pwd")
93        else:
94            self.cert_file = config.get("globals", "cert_file")
95            self.cert_pwd = config.get("globals", "cert_pwd")
96
97        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
98                or config.get("globals", "trusted_certs")
99
100        self.repodir = config.get("experiment_control", "repodir")
101        self.repo_url = config.get("experiment_control", "repo_url", 
102                "https://users.isi.deterlab.net:23235");
103
104        self.exp_stem = "fed-stem"
105        self.log = logging.getLogger("fedd.experiment_control")
106        set_log_level(config, "experiment_control", self.log)
107        self.muxmax = 2
108        self.nthreads = 10
109        self.randomize_experiments = False
110
111        self.splitter = None
112        self.ssh_keygen = "/usr/bin/ssh-keygen"
113        self.ssh_identity_file = None
114
115
116        self.debug = config.getboolean("experiment_control", "create_debug")
117        self.cleanup = not config.getboolean("experiment_control", 
118                "leave_tmpfiles")
119        self.state_filename = config.get("experiment_control", 
120                "experiment_state")
121        self.store_filename = config.get("experiment_control", 
122                "synch_store")
123        self.store_url = config.get("experiment_control", "store_url")
124        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
125        self.fedkit = parse_tarfile_list(\
126                config.get("experiment_control", "fedkit"))
127        self.gatewaykit = parse_tarfile_list(\
128                config.get("experiment_control", "gatewaykit"))
129        accessdb_file = config.get("experiment_control", "accessdb")
130
131        dt = config.get("experiment_control", "direct_transit")
132        self.auth_type = config.get('experiment_control', 'auth_type') \
133                or 'legacy'
134        self.auth_dir = config.get('experiment_control', 'auth_dir')
135        # XXX: document this!
136        self.info_cache_limit = \
137                config.getint('experiment_control', 'info_cache', 600)
138        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
139        else: self.direct_transit = [ ]
140        # NB for internal master/slave ops, not experiment setup
141        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
142
143        self.overrides = set([])
144        ovr = config.get('experiment_control', 'overrides')
145        if ovr:
146            for o in ovr.split(","):
147                o = o.strip()
148                if o.startswith('fedid:'): o = o[len('fedid:'):]
149                self.overrides.add(fedid(hexstr=o))
150
151        self.state = { }
152        self.state_lock = Lock()
153        self.tclsh = "/usr/local/bin/otclsh"
154        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
155                config.get("experiment_control", "tcl_splitter",
156                        "/usr/testbed/lib/ns2ir/parse.tcl")
157        mapdb_file = config.get("experiment_control", "mapdb")
158        self.trace_file = sys.stderr
159
160        self.def_expstart = \
161                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
162                "/tmp/federate";
163        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
164                "FEDDIR/hosts";
165        self.def_gwstart = \
166                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
167                "/tmp/bridge.log";
168        self.def_mgwstart = \
169                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
170                "/tmp/bridge.log";
171        self.def_gwimage = "FBSD61-TUNNEL2";
172        self.def_gwtype = "pc";
173        self.local_access = { }
174
175        if self.auth_type == 'legacy':
176            if auth:
177                self.auth = auth
178            else:
179                self.log.error( "[access]: No authorizer initialized, " +\
180                        "creating local one.")
181                auth = authorizer()
182            self.get_access = self.legacy_get_access
183        elif self.auth_type == 'abac':
184            self.auth = abac_authorizer(load=self.auth_dir)
185        else:
186            raise service_error(service_error.internal, 
187                    "Unknown auth_type: %s" % self.auth_type)
188
189        if mapdb_file:
190            self.read_mapdb(mapdb_file)
191        else:
192            self.log.warn("[experiment_control] No testbed map, using defaults")
193            self.tbmap = { 
194                    'deter':'https://users.isi.deterlab.net:23235',
195                    'emulab':'https://users.isi.deterlab.net:23236',
196                    'ucb':'https://users.isi.deterlab.net:23237',
197                    }
198
199        if accessdb_file:
200                self.read_accessdb(accessdb_file)
201        else:
202            raise service_error(service_error.internal,
203                    "No accessdb specified in config")
204
205        # Grab saved state.  OK to do this w/o locking because it's read only
206        # and only one thread should be in existence that can see self.state at
207        # this point.
208        if self.state_filename:
209            self.read_state()
210
211        if self.store_filename:
212            self.read_store()
213        else:
214            self.log.warning("No saved synch store")
215            self.synch_store = synch_store
216
217        # Dispatch tables
218        self.soap_services = {\
219                'New': soap_handler('New', self.new_experiment),
220                'Create': soap_handler('Create', self.create_experiment),
221                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
222                'Vis': soap_handler('Vis', self.get_vis),
223                'Info': soap_handler('Info', self.get_info),
224                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
241                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
242        }
243
244    # Call while holding self.state_lock
245    def write_state(self):
246        """
247        Write a new copy of experiment state after copying the existing state
248        to a backup.
249
250        State format is a simple pickling of the state dictionary.
251        """
252        if os.access(self.state_filename, os.W_OK):
253            copy_file(self.state_filename, \
254                    "%s.bak" % self.state_filename)
255        try:
256            f = open(self.state_filename, 'w')
257            pickle.dump(self.state, f)
258        except EnvironmentError, e:
259            self.log.error("Can't write file %s: %s" % \
260                    (self.state_filename, e))
261        except pickle.PicklingError, e:
262            self.log.error("Pickling problem: %s" % e)
263        except TypeError, e:
264            self.log.error("Pickling problem (TypeError): %s" % e)
265
266    @staticmethod
267    def get_alloc_ids(exp):
268        """
269        Used by read_store and read state.  This used to be worse.
270        """
271
272        return [ a.allocID for a in exp.get_all_allocations() ]
273
274
275    # Call while holding self.state_lock
276    def read_state(self):
277        """
278        Read a new copy of experiment state.  Old state is overwritten.
279
280        State format is a simple pickling of the state dictionary.
281        """
282       
283        try:
284            f = open(self.state_filename, "r")
285            self.state = pickle.load(f)
286            self.log.debug("[read_state]: Read state from %s" % \
287                    self.state_filename)
288        except EnvironmentError, e:
289            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
290                    % (self.state_filename, e))
291        except pickle.UnpicklingError, e:
292            self.log.warning(("[read_state]: No saved state: " + \
293                    "Unpickling failed: %s") % e)
294       
295        for s in self.state.values():
296            try:
297
298                eid = s.fedid
299                if eid : 
300                    if self.auth_type == 'legacy':
301                        # XXX: legacy
302                        # Give the owner rights to the experiment
303                        #self.auth.set_attribute(s['owner'], eid)
304                        # And holders of the eid as well
305                        self.auth.set_attribute(eid, eid)
306                        # allow overrides to control experiments as well
307                        for o in self.overrides:
308                            self.auth.set_attribute(o, eid)
309                        # Set permissions to allow reading of the software
310                        # repo, if any, as well.
311                        for a in self.get_alloc_ids(s):
312                            self.auth.set_attribute(a, 'repo/%s' % eid)
313                else:
314                    raise KeyError("No experiment id")
315            except KeyError, e:
316                self.log.warning("[read_state]: State ownership or identity " +\
317                        "misformatted in %s: %s" % (self.state_filename, e))
318
319
320    def read_accessdb(self, accessdb_file):
321        """
322        Read the mapping from fedids that can create experiments to their name
323        in the 3-level access namespace.  All will be asserted from this
324        testbed and can include the local username and porject that will be
325        asserted on their behalf by this fedd.  Each fedid is also added to the
326        authorization system with the "create" attribute.
327        """
328        self.accessdb = {}
329        # These are the regexps for parsing the db
330        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
331        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
332                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
333        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
334                "\s*->\s*(" + name_expr + ")\s*$")
335        lineno = 0
336
337        # Parse the mappings and store in self.authdb, a dict of
338        # fedid -> (proj, user)
339        try:
340            f = open(accessdb_file, "r")
341            for line in f:
342                lineno += 1
343                line = line.strip()
344                if len(line) == 0 or line.startswith('#'):
345                    continue
346                m = project_line.match(line)
347                if m:
348                    fid = fedid(hexstr=m.group(1))
349                    project, user = m.group(2,3)
350                    if not self.accessdb.has_key(fid):
351                        self.accessdb[fid] = []
352                    self.accessdb[fid].append((project, user))
353                    continue
354
355                m = user_line.match(line)
356                if m:
357                    fid = fedid(hexstr=m.group(1))
358                    project = None
359                    user = m.group(2)
360                    if not self.accessdb.has_key(fid):
361                        self.accessdb[fid] = []
362                    self.accessdb[fid].append((project, user))
363                    continue
364                self.log.warn("[experiment_control] Error parsing access " +\
365                        "db %s at line %d" %  (accessdb_file, lineno))
366        except EnvironmentError:
367            raise service_error(service_error.internal,
368                    ("Error opening/reading %s as experiment " +\
369                            "control accessdb") %  accessdb_file)
370        f.close()
371
372        # Initialize the authorization attributes
373        # XXX: legacy
374        if self.auth_type == 'legacy':
375            for fid in self.accessdb.keys():
376                self.auth.set_attribute(fid, 'create')
377                self.auth.set_attribute(fid, 'new')
378
379    def read_mapdb(self, file):
380        """
381        Read a simple colon separated list of mappings for the
382        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
383        """
384
385        self.tbmap = { }
386        lineno =0
387        try:
388            f = open(file, "r")
389            for line in f:
390                lineno += 1
391                line = line.strip()
392                if line.startswith('#') or len(line) == 0:
393                    continue
394                try:
395                    label, url = line.split(':', 1)
396                    self.tbmap[label] = url
397                except ValueError, e:
398                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
399                            "map db: %s %s" % (lineno, line, e))
400        except EnvironmentError, e:
401            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
402                    "open %s: %s" % (file, e))
403        else:
404            f.close()
405
406    def read_store(self):
407        try:
408            self.synch_store = synch_store()
409            self.synch_store.load(self.store_filename)
410            self.log.debug("[read_store]: Read store from %s" % \
411                    self.store_filename)
412        except EnvironmentError, e:
413            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
414                    % (self.state_filename, e))
415            self.synch_store = synch_store()
416
417        # Set the initial permissions on data in the store.  XXX: This ad hoc
418        # authorization attribute initialization is getting out of hand.
419        # XXX: legacy
420        if self.auth_type == 'legacy':
421            for k in self.synch_store.all_keys():
422                try:
423                    if k.startswith('fedid:'):
424                        fid = fedid(hexstr=k[6:46])
425                        if self.state.has_key(fid):
426                            for a in self.get_alloc_ids(self.state[fid]):
427                                self.auth.set_attribute(a, k)
428                except ValueError, e:
429                    self.log.warn("Cannot deduce permissions for %s" % k)
430
431
432    def write_store(self):
433        """
434        Write a new copy of synch_store after writing current state
435        to a backup.  We use the internal synch_store pickle method to avoid
436        incinsistent data.
437
438        State format is a simple pickling of the store.
439        """
440        if os.access(self.store_filename, os.W_OK):
441            copy_file(self.store_filename, \
442                    "%s.bak" % self.store_filename)
443        try:
444            self.synch_store.save(self.store_filename)
445        except EnvironmentError, e:
446            self.log.error("Can't write file %s: %s" % \
447                    (self.store_filename, e))
448        except TypeError, e:
449            self.log.error("Pickling problem (TypeError): %s" % e)
450
451
452    def remove_dirs(self, dir):
453        """
454        Remove the directory tree and all files rooted at dir.  Log any errors,
455        but continue.
456        """
457        self.log.debug("[removedirs]: removing %s" % dir)
458        try:
459            for path, dirs, files in os.walk(dir, topdown=False):
460                for f in files:
461                    os.remove(os.path.join(path, f))
462                for d in dirs:
463                    os.rmdir(os.path.join(path, d))
464            os.rmdir(dir)
465        except EnvironmentError, e:
466            self.log.error("Error deleting directory tree in %s" % e);
467
468    @staticmethod
469    def make_temp_certfile(expcert, tmpdir):
470        """
471        make a protected copy of the access certificate so the experiment
472        controller can act as the experiment principal.  mkstemp is the most
473        secure way to do that. The directory should be created by
474        mkdtemp.  Return the filename.
475        """
476        if expcert and tmpdir:
477            try:
478                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
479                f = os.fdopen(certf, 'w')
480                print >> f, expcert
481                f.close()
482            except EnvironmentError, e:
483                raise service_error(service_error.internal, 
484                        "Cannot create temp cert file?")
485            return certfn
486        else:
487            return None
488
489       
490    def generate_ssh_keys(self, dest, type="rsa" ):
491        """
492        Generate a set of keys for the gateways to use to talk.
493
494        Keys are of type type and are stored in the required dest file.
495        """
496        valid_types = ("rsa", "dsa")
497        t = type.lower();
498        if t not in valid_types: raise ValueError
499        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
500
501        try:
502            trace = open("/dev/null", "w")
503        except EnvironmentError:
504            raise service_error(service_error.internal,
505                    "Cannot open /dev/null??");
506
507        # May raise CalledProcessError
508        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
509        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
510        if rv != 0:
511            raise service_error(service_error.internal, 
512                    "Cannot generate nonce ssh keys.  %s return code %d" \
513                            % (self.ssh_keygen, rv))
514
515    def generate_seer_certs(self, destdir):
516        '''
517        Create a SEER ca cert and a node cert in destdir/ca.pem and
518        destdir/node.pem respectively.  These will be distributed throughout
519        the federated experiment.  This routine reports errors via
520        service_errors.
521        '''
522        openssl = '/usr/bin/openssl'
523        # All the filenames and parameters we need for openssl calls below
524        ca_key =os.path.join(destdir, 'ca.key') 
525        ca_pem = os.path.join(destdir, 'ca.pem')
526        node_key =os.path.join(destdir, 'node.key') 
527        node_pem = os.path.join(destdir, 'node.pem')
528        node_req = os.path.join(destdir, 'node.req')
529        node_signed = os.path.join(destdir, 'node.signed')
530        days = '%s' % (365 * 10)
531        serial = '%s' % random.randint(0, 1<<16)
532
533        try:
534            # Sequence of calls to create a CA key, create a ca cert, create a
535            # node key, node signing request, and finally a signed node
536            # certificate.
537            sequence = (
538                    (openssl, 'genrsa', '-out', ca_key, '1024'),
539                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
540                        ca_pem, '-days', days, '-subj', 
541                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
542                    (openssl, 'genrsa', '-out', node_key, '1024'),
543                    (openssl, 'req', '-new', '-key', node_key, '-out', 
544                        node_req, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
546                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
547                        '-set_serial', serial, '-req', '-in', node_req, 
548                        '-out', node_signed, '-days', days),
549                )
550            # Do all that stuff; bail if there's an error, and push all the
551            # output to dev/null.
552            for cmd in sequence:
553                trace = open("/dev/null", "w")
554                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
555                if rv != 0:
556                    raise service_error(service_error.internal, 
557                            "Cannot generate SEER certs.  %s return code %d" \
558                                    % (' '.join(cmd), rv))
559            # Concatinate the node key and signed certificate into node.pem
560            f = open(node_pem, 'w')
561            for comp in (node_signed, node_key):
562                g = open(comp, 'r')
563                f.write(g.read())
564                g.close()
565            f.close()
566
567            # Throw out intermediaries.
568            for fn in (ca_key, node_key, node_req, node_signed):
569                os.unlink(fn)
570
571        except EnvironmentError, e:
572            # Any difficulties with the file system wind up here
573            raise service_error(service_error.internal,
574                    "File error on  %s while creating SEER certs: %s" % \
575                            (e.filename, e.strerror))
576
577
578
579    def gentopo(self, str):
580        """
581        Generate the topology data structure from the splitter's XML
582        representation of it.
583
584        The topology XML looks like:
585            <experiment>
586                <nodes>
587                    <node><vname></vname><ips>ip1:ip2</ips></node>
588                </nodes>
589                <lans>
590                    <lan>
591                        <vname></vname><vnode></vnode><ip></ip>
592                        <bandwidth></bandwidth><member>node:port</member>
593                    </lan>
594                </lans>
595        """
596        class topo_parse:
597            """
598            Parse the topology XML and create the dats structure.
599            """
600            def __init__(self):
601                # Typing of the subelements for data conversion
602                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
603                self.int_subelements = ( 'bandwidth',)
604                self.float_subelements = ( 'delay',)
605                # The final data structure
606                self.nodes = [ ]
607                self.lans =  [ ]
608                self.topo = { \
609                        'node': self.nodes,\
610                        'lan' : self.lans,\
611                    }
612                self.element = { }  # Current element being created
613                self.chars = ""     # Last text seen
614
615            def end_element(self, name):
616                # After each sub element the contents is added to the current
617                # element or to the appropriate list.
618                if name == 'node':
619                    self.nodes.append(self.element)
620                    self.element = { }
621                elif name == 'lan':
622                    self.lans.append(self.element)
623                    self.element = { }
624                elif name in self.str_subelements:
625                    self.element[name] = self.chars
626                    self.chars = ""
627                elif name in self.int_subelements:
628                    self.element[name] = int(self.chars)
629                    self.chars = ""
630                elif name in self.float_subelements:
631                    self.element[name] = float(self.chars)
632                    self.chars = ""
633
634            def found_chars(self, data):
635                self.chars += data.rstrip()
636
637
638        tp = topo_parse();
639        parser = xml.parsers.expat.ParserCreate()
640        parser.EndElementHandler = tp.end_element
641        parser.CharacterDataHandler = tp.found_chars
642
643        parser.Parse(str)
644
645        return tp.topo
646       
647
648    def genviz(self, topo):
649        """
650        Generate the visualization the virtual topology
651        """
652
653        neato = "/usr/local/bin/neato"
654        # These are used to parse neato output and to create the visualization
655        # file.
656        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
657        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
658                "%s</type></node>"
659
660        try:
661            # Node names
662            nodes = [ n['vname'] for n in topo['node'] ]
663            topo_lans = topo['lan']
664        except KeyError, e:
665            raise service_error(service_error.internal, "Bad topology: %s" %e)
666
667        lans = { }
668        links = { }
669
670        # Walk through the virtual topology, organizing the connections into
671        # 2-node connections (links) and more-than-2-node connections (lans).
672        # When a lan is created, it's added to the list of nodes (there's a
673        # node in the visualization for the lan).
674        for l in topo_lans:
675            if links.has_key(l['vname']):
676                if len(links[l['vname']]) < 2:
677                    links[l['vname']].append(l['vnode'])
678                else:
679                    nodes.append(l['vname'])
680                    lans[l['vname']] = links[l['vname']]
681                    del links[l['vname']]
682                    lans[l['vname']].append(l['vnode'])
683            elif lans.has_key(l['vname']):
684                lans[l['vname']].append(l['vnode'])
685            else:
686                links[l['vname']] = [ l['vnode'] ]
687
688
689        # Open up a temporary file for dot to turn into a visualization
690        try:
691            df, dotname = tempfile.mkstemp()
692            dotfile = os.fdopen(df, 'w')
693        except EnvironmentError:
694            raise service_error(service_error.internal,
695                    "Failed to open file in genviz")
696
697        try:
698            dnull = open('/dev/null', 'w')
699        except EnvironmentError:
700            service_error(service_error.internal,
701                    "Failed to open /dev/null in genviz")
702
703        # Generate a dot/neato input file from the links, nodes and lans
704        try:
705            print >>dotfile, "graph G {"
706            for n in nodes:
707                print >>dotfile, '\t"%s"' % n
708            for l in links.keys():
709                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
710            for l in lans.keys():
711                for n in lans[l]:
712                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
713            print >>dotfile, "}"
714            dotfile.close()
715        except TypeError:
716            raise service_error(service_error.internal,
717                    "Single endpoint link in vtopo")
718        except EnvironmentError:
719            raise service_error(service_error.internal, "Cannot write dot file")
720
721        # Use dot to create a visualization
722        try:
723            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
724                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
725                stderr=dnull, close_fds=True)
726        except EnvironmentError:
727            raise service_error(service_error.internal, 
728                    "Cannot generate visualization: is graphviz available?")
729        dnull.close()
730
731        # Translate dot to vis format
732        vis_nodes = [ ]
733        vis = { 'node': vis_nodes }
734        for line in dot.stdout:
735            m = vis_re.match(line)
736            if m:
737                vn = m.group(1)
738                vis_node = {'name': vn, \
739                        'x': float(m.group(2)),\
740                        'y' : float(m.group(3)),\
741                    }
742                if vn in links.keys() or vn in lans.keys():
743                    vis_node['type'] = 'lan'
744                else:
745                    vis_node['type'] = 'node'
746                vis_nodes.append(vis_node)
747        rv = dot.wait()
748
749        os.remove(dotname)
750        # XXX: graphviz seems to use low return codes for warnings, like
751        # "couldn't find font"
752        if rv < 2 : return vis
753        else: return None
754
755
756    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
757            cert_pwd=None):
758        """
759        Release access to testbed through fedd
760        """
761
762        if not uri and tbmap:
763            uri = tbmap.get(tb, None)
764        if not uri:
765            raise service_error(service_error.server_config, 
766                    "Unknown testbed: %s" % tb)
767
768        if self.local_access.has_key(uri):
769            resp = self.local_access[uri].ReleaseAccess(\
770                    { 'ReleaseAccessRequestBody' : 
771                        {'allocID': {'fedid': aid}},}, 
772                    fedid(file=cert_file))
773            resp = { 'ReleaseAccessResponseBody': resp } 
774        else:
775            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
776                    cert_file, cert_pwd, self.trusted_certs)
777
778        # better error coding
779
780    def remote_ns2topdl(self, uri, desc):
781
782        req = {
783                'description' : { 'ns2description': desc },
784            }
785
786        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
787                self.trusted_certs)
788
789        if r.has_key('Ns2TopdlResponseBody'):
790            r = r['Ns2TopdlResponseBody']
791            ed = r.get('experimentdescription', None)
792            if ed.has_key('topdldescription'):
793                return topdl.Topology(**ed['topdldescription'])
794            else:
795                raise service_error(service_error.protocol, 
796                        "Bad splitter response (no output)")
797        else:
798            raise service_error(service_error.protocol, "Bad splitter response")
799
800    class start_segment:
801        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
802                cert_pwd=None, trusted_certs=None, caller=None,
803                log_collector=None):
804            self.log = log
805            self.debug = debug
806            self.cert_file = cert_file
807            self.cert_pwd = cert_pwd
808            self.trusted_certs = None
809            self.caller = caller
810            self.testbed = testbed
811            self.log_collector = log_collector
812            self.response = None
813            self.node = { }
814            self.proof = None
815
816        def make_map(self, resp):
817            if 'segmentdescription' not in resp  or \
818                    'topdldescription' not in resp['segmentdescription']:
819                self.log.warn('No topology returned from startsegment')
820                return 
821
822            top = topdl.Topology(
823                    **resp['segmentdescription']['topdldescription'])
824
825            for e in top.elements:
826                if isinstance(e, topdl.Computer):
827                    self.node[e.name] = \
828                            (e.localname, e.status, e.service, e.operation)
829
830        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
831            req = {
832                    'allocID': { 'fedid' : aid }, 
833                    'segmentdescription': { 
834                        'topdldescription': topo.to_dict(),
835                    },
836                }
837
838            if connInfo:
839                req['connection'] = connInfo
840
841            import_svcs = [ s for m in masters.values() \
842                    for s in m if self.testbed in s.importers]
843
844            if import_svcs or self.testbed in masters:
845                req['service'] = []
846
847            for s in import_svcs:
848                for r in s.reqs:
849                    sr = copy.deepcopy(r)
850                    sr['visibility'] = 'import';
851                    req['service'].append(sr)
852
853            for s in masters.get(self.testbed, []):
854                for r in s.reqs:
855                    sr = copy.deepcopy(r)
856                    sr['visibility'] = 'export';
857                    req['service'].append(sr)
858
859            if attrs:
860                req['fedAttr'] = attrs
861
862            try:
863                self.log.debug("Calling StartSegment at %s " % uri)
864                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
865                        self.trusted_certs)
866                if r.has_key('StartSegmentResponseBody'):
867                    lval = r['StartSegmentResponseBody'].get('allocationLog',
868                            None)
869                    if lval and self.log_collector:
870                        for line in  lval.splitlines(True):
871                            self.log_collector.write(line)
872                    self.make_map(r['StartSegmentResponseBody'])
873                    if 'proof' in r: self.proof = r['proof']
874                    self.response = r
875                else:
876                    raise service_error(service_error.internal, 
877                            "Bad response!?: %s" %r)
878                return True
879            except service_error, e:
880                self.log.error("Start segment failed on %s: %s" % \
881                        (self.testbed, e))
882                return False
883
884
885
886    class terminate_segment:
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None):
889            self.log = log
890            self.debug = debug
891            self.cert_file = cert_file
892            self.cert_pwd = cert_pwd
893            self.trusted_certs = None
894            self.caller = caller
895            self.testbed = testbed
896
897        def __call__(self, uri, aid ):
898            req = {
899                    'allocID': {'fedid': aid }, 
900                }
901            try:
902                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
903                        self.trusted_certs)
904                return True
905            except service_error, e:
906                self.log.error("Terminate segment failed on %s: %s" % \
907                        (self.testbed, e))
908                return False
909
910    class info_segment(start_segment):
911        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
912                cert_pwd=None, trusted_certs=None, caller=None,
913                log_collector=None):
914            experiment_control_local.start_segment.__init__(self, debug, 
915                    log, testbed, cert_file, cert_pwd, trusted_certs, 
916                    caller, log_collector)
917
918        def __call__(self, uri, aid):
919            req = { 'allocID': { 'fedid' : aid } }
920
921            try:
922                self.log.debug("Calling InfoSegment at %s " % uri)
923                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
924                        self.trusted_certs)
925                if r.has_key('InfoSegmentResponseBody'):
926                    self.make_map(r['InfoSegmentResponseBody'])
927                    if 'proof' in r: self.proof = r['proof']
928                    self.response = r
929                else:
930                    raise service_error(service_error.internal, 
931                            "Bad response!?: %s" %r)
932                return True
933            except service_error, e:
934                self.log.error("Info segment failed on %s: %s" % \
935                        (self.testbed, e))
936                return False
937
938    def annotate_topology(self, top, data):
939        def add_new(ann, attr):
940            for a in ann:
941                if a not in attr: attr.append(a)
942
943        # Annotate the topology with embedding info
944        for e in top.elements:
945            if isinstance(e, topdl.Computer):
946                for s in data:
947                    ann = s.node.get(e.name, None)
948                    if ann is not None:
949                        add_new(ann[0], e.localname)
950                        e.status = ann[1]
951                        add_new(ann[2], e.service)
952                        add_new(ann[3], e.operation)
953                        break
954
955
956
957    def allocate_resources(self, allocated, masters, eid, expid, 
958            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
959            attrs=None, connInfo={}, tbmap=None, expcert=None):
960
961        started = { }           # Testbeds where a sub-experiment started
962                                # successfully
963
964        # XXX
965        fail_soft = False
966
967        if tbmap is None: tbmap = { }
968
969        log = alloc_log or self.log
970
971        tp = thread_pool(self.nthreads)
972        threads = [ ]
973        starters = [ ]
974
975        if expcert:
976            cert = expcert
977            pw = None
978        else:
979            cert = self.cert_file
980            pw = self.cert_pwd
981
982        for tb in allocated.keys():
983            # Create and start a thread to start the segment, and save it
984            # to get the return value later
985            tb_attrs = copy.copy(attrs)
986            tp.wait_for_slot()
987            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
988            base, suffix = split_testbed(tb)
989            if suffix:
990                tb_attrs.append({'attribute': 'experiment_name', 
991                    'value': "%s-%s" % (eid, suffix)})
992            else:
993                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
994            if not uri:
995                raise service_error(service_error.internal, 
996                        "Unknown testbed %s !?" % tb)
997
998            aid = tbparams[tb].allocID
999            if not aid:
1000                raise service_error(service_error.internal, 
1001                        "No alloc id for testbed %s !?" % tb)
1002
1003            s = self.start_segment(log=log, debug=self.debug,
1004                    testbed=tb, cert_file=cert,
1005                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1006                    caller=self.call_StartSegment,
1007                    log_collector=log_collector)
1008            starters.append(s)
1009            t  = pooled_thread(\
1010                    target=s, name=tb,
1011                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1012                    pdata=tp, trace_file=self.trace_file)
1013            threads.append(t)
1014            t.start()
1015
1016        # Wait until all finish (keep pinging the log, though)
1017        mins = 0
1018        revoked = False
1019        while not tp.wait_for_all_done(60.0):
1020            mins += 1
1021            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1022                    % mins)
1023            if not revoked and \
1024                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1025                # a testbed has failed.  Revoke this experiment's
1026                # synchronizarion values so that sub experiments will not
1027                # deadlock waiting for synchronization that will never happen
1028                self.log.info("A subexperiment has failed to swap in, " + \
1029                        "revoking synch keys")
1030                var_key = "fedid:%s" % expid
1031                for k in self.synch_store.all_keys():
1032                    if len(k) > 45 and k[0:46] == var_key:
1033                        self.synch_store.revoke_key(k)
1034                revoked = True
1035
1036        failed = [ t.getName() for t in threads if not t.rv ]
1037        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1038
1039        # If one failed clean up, unless fail_soft is set
1040        if failed:
1041            if not fail_soft:
1042                tp.clear()
1043                for tb in succeeded:
1044                    # Create and start a thread to stop the segment
1045                    tp.wait_for_slot()
1046                    uri = tbparams[tb].uri
1047                    t  = pooled_thread(\
1048                            target=self.terminate_segment(log=log,
1049                                testbed=tb,
1050                                cert_file=cert, 
1051                                cert_pwd=pw,
1052                                trusted_certs=self.trusted_certs,
1053                                caller=self.call_TerminateSegment),
1054                            args=(uri, tbparams[tb].allocID),
1055                            name=tb,
1056                            pdata=tp, trace_file=self.trace_file)
1057                    t.start()
1058                # Wait until all finish (if any are being stopped)
1059                if succeeded:
1060                    tp.wait_for_all_done()
1061
1062                # release the allocations
1063                for tb in tbparams.keys():
1064                    try:
1065                        self.release_access(tb, tbparams[tb].allocID, 
1066                                tbmap=tbmap, uri=tbparams[tb].uri,
1067                                cert_file=cert, cert_pwd=pw)
1068                    except service_error, e:
1069                        self.log.warn("Error releasing access: %s" % e.desc)
1070                # Remove the placeholder
1071                self.state_lock.acquire()
1072                self.state[eid].status = 'failed'
1073                self.state[eid].updated()
1074                if self.state_filename: self.write_state()
1075                self.state_lock.release()
1076                # Remove the repo dir
1077                self.remove_dirs("%s/%s" %(self.repodir, expid))
1078                # Walk up tmpdir, deleting as we go
1079                if self.cleanup:
1080                    self.remove_dirs(tmpdir)
1081                else:
1082                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1083
1084
1085                log.error("Swap in failed on %s" % ",".join(failed))
1086                return
1087        else:
1088            # Walk through the successes and gather the proofs
1089            proofs = { }
1090            for s in starters:
1091                if s.proof:
1092                    proofs[s.testbed] = s.proof
1093            self.annotate_topology(top, starters)
1094            log.info("[start_segment]: Experiment %s active" % eid)
1095
1096
1097        # Walk up tmpdir, deleting as we go
1098        if self.cleanup:
1099            self.remove_dirs(tmpdir)
1100        else:
1101            log.debug("[start_experiment]: not removing %s" % tmpdir)
1102
1103        # Insert the experiment into our state and update the disk copy.
1104        self.state_lock.acquire()
1105        self.state[expid].status = 'active'
1106        self.state[eid] = self.state[expid]
1107        self.state[eid].top = top
1108        self.state[eid].updated()
1109        # Append startup proofs
1110        for f in self.state[eid].get_all_allocations():
1111            if f.tb in proofs:
1112                f.proof.append(proofs[f.tb])
1113
1114        if self.state_filename: self.write_state()
1115        self.state_lock.release()
1116        return
1117
1118
1119    def add_kit(self, e, kit):
1120        """
1121        Add a Software object created from the list of (install, location)
1122        tuples passed as kit  to the software attribute of an object e.  We
1123        do this enough to break out the code, but it's kind of a hack to
1124        avoid changing the old tuple rep.
1125        """
1126
1127        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1128
1129        if isinstance(e.software, list): e.software.extend(s)
1130        else: e.software = s
1131
1132    def append_experiment_authorization(self, expid, attrs, 
1133            need_state_lock=True):
1134        """
1135        Append the authorization information to system state
1136        """
1137
1138        for p, a in attrs:
1139            self.auth.set_attribute(p, a)
1140        self.auth.save()
1141
1142        if need_state_lock: self.state_lock.acquire()
1143        # XXX: really a no op?
1144        #self.state[expid]['auth'].update(attrs)
1145        if self.state_filename: self.write_state()
1146        if need_state_lock: self.state_lock.release()
1147
1148    def clear_experiment_authorization(self, expid, need_state_lock=True):
1149        """
1150        Attrs is a set of attribute principal pairs that need to be removed
1151        from the authenticator.  Remove them and save the authenticator.
1152        """
1153
1154        if need_state_lock: self.state_lock.acquire()
1155        # XXX: should be a no-op
1156        #if expid in self.state and 'auth' in self.state[expid]:
1157            #for p, a in self.state[expid]['auth']:
1158                #self.auth.unset_attribute(p, a)
1159            #self.state[expid]['auth'] = set()
1160        if self.state_filename: self.write_state()
1161        if need_state_lock: self.state_lock.release()
1162        self.auth.save()
1163
1164
1165    def create_experiment_state(self, fid, req, expid, expcert,
1166            state='starting'):
1167        """
1168        Create the initial entry in the experiment's state.  The expid and
1169        expcert are the experiment's fedid and certifacte that represents that
1170        ID, which are installed in the experiment state.  If the request
1171        includes a suggested local name that is used if possible.  If the local
1172        name is already taken by an experiment owned by this user that has
1173        failed, it is overwritten.  Otherwise new letters are added until a
1174        valid localname is found.  The generated local name is returned.
1175        """
1176
1177        if req.has_key('experimentID') and \
1178                req['experimentID'].has_key('localname'):
1179            overwrite = False
1180            eid = req['experimentID']['localname']
1181            # If there's an old failed experiment here with the same local name
1182            # and accessible by this user, we'll overwrite it, otherwise we'll
1183            # fall through and do the collision avoidance.
1184            old_expid = self.get_experiment_fedid(eid)
1185            if old_expid:
1186                users_experiment = True
1187                try:
1188                    self.check_experiment_access(fid, old_expid)
1189                except service_error, e:
1190                    if e.code == service_error.access: users_experiment = False
1191                    else: raise e
1192                if users_experiment:
1193                    self.state_lock.acquire()
1194                    status = self.state[eid].status
1195                    if status and status == 'failed':
1196                        # remove the old access attributes
1197                        self.clear_experiment_authorization(eid,
1198                                need_state_lock=False)
1199                        overwrite = True
1200                        del self.state[eid]
1201                        del self.state[old_expid]
1202                    self.state_lock.release()
1203                else:
1204                    self.log.info('Experiment %s exists, ' % eid + \
1205                            'but this user cannot access it')
1206            self.state_lock.acquire()
1207            while (self.state.has_key(eid) and not overwrite):
1208                eid += random.choice(string.ascii_letters)
1209            # Initial state
1210            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1211                    identity=expcert)
1212            self.state[expid] = self.state[eid]
1213            if self.state_filename: self.write_state()
1214            self.state_lock.release()
1215        else:
1216            eid = self.exp_stem
1217            for i in range(0,5):
1218                eid += random.choice(string.ascii_letters)
1219            self.state_lock.acquire()
1220            while (self.state.has_key(eid)):
1221                eid = self.exp_stem
1222                for i in range(0,5):
1223                    eid += random.choice(string.ascii_letters)
1224            # Initial state
1225            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1226                    identity=expcert)
1227            self.state[expid] = self.state[eid]
1228            if self.state_filename: self.write_state()
1229            self.state_lock.release()
1230
1231        # Let users touch the state.  Authorize this fid and the expid itself
1232        # to touch the experiment, as well as allowing th eoverrides.
1233        self.append_experiment_authorization(eid, 
1234                set([(fid, expid), (expid,expid)] + \
1235                        [ (o, expid) for o in self.overrides]))
1236
1237        return eid
1238
1239
1240    def allocate_ips_to_topo(self, top):
1241        """
1242        Add an ip4_address attribute to all the hosts in the topology, based on
1243        the shared substrates on which they sit.  An /etc/hosts file is also
1244        created and returned as a list of hostfiles entries.  We also return
1245        the allocator, because we may need to allocate IPs to portals
1246        (specifically DRAGON portals).
1247        """
1248        subs = sorted(top.substrates, 
1249                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1250                reverse=True)
1251        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1252        ifs = { }
1253        hosts = [ ]
1254
1255        for idx, s in enumerate(subs):
1256            net_size = len(s.interfaces)+2
1257
1258            a = ips.allocate(net_size)
1259            if a :
1260                base, num = a
1261                if num < net_size: 
1262                    raise service_error(service_error.internal,
1263                            "Allocator returned wrong number of IPs??")
1264            else:
1265                raise service_error(service_error.req, 
1266                        "Cannot allocate IP addresses")
1267            mask = ips.min_alloc
1268            while mask < net_size:
1269                mask *= 2
1270
1271            netmask = ((2**32-1) ^ (mask-1))
1272
1273            base += 1
1274            for i in s.interfaces:
1275                i.attribute.append(
1276                        topdl.Attribute('ip4_address', 
1277                            "%s" % ip_addr(base)))
1278                i.attribute.append(
1279                        topdl.Attribute('ip4_netmask', 
1280                            "%s" % ip_addr(int(netmask))))
1281
1282                hname = i.element.name
1283                if ifs.has_key(hname):
1284                    hosts.append("%s\t%s-%s %s-%d" % \
1285                            (ip_addr(base), hname, s.name, hname,
1286                                ifs[hname]))
1287                else:
1288                    ifs[hname] = 0
1289                    hosts.append("%s\t%s-%s %s-%d %s" % \
1290                            (ip_addr(base), hname, s.name, hname,
1291                                ifs[hname], hname))
1292
1293                ifs[hname] += 1
1294                base += 1
1295        return hosts, ips
1296
1297    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1298            tbparam, masters, tbmap, expid=None, expcert=None):
1299        for tb in testbeds:
1300            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1301                    expcert)
1302            allocated[tb] = 1
1303
1304    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1305            expcert=None):
1306        """
1307        Get access to testbed through fedd and set the parameters for that tb
1308        """
1309        def get_export_project(svcs):
1310            """
1311            Look through for the list of federated_service for this testbed
1312            objects for a project_export service, and extract the project
1313            parameter.
1314            """
1315
1316            pe = [s for s in svcs if s.name=='project_export']
1317            if len(pe) == 1:
1318                return pe[0].params.get('project', None)
1319            elif len(pe) == 0:
1320                return None
1321            else:
1322                raise service_error(service_error.req,
1323                        "More than one project export is not supported")
1324
1325        def add_services(svcs, type, slist, keys):
1326            """
1327            Add the given services to slist.  type is import or export.  Also
1328            add a mapping entry from the assigned id to the original service
1329            record.
1330            """
1331            for i, s in enumerate(svcs):
1332                idx = '%s%d' % (type, i)
1333                keys[idx] = s
1334                sr = {'id': idx, 'name': s.name, 'visibility': type }
1335                if s.params:
1336                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1337                            for k, v in s.params.items()]
1338                slist.append(sr)
1339
1340        uri = tbmap.get(testbed_base(tb), None)
1341        if not uri:
1342            raise service_error(service_error.server_config, 
1343                    "Unknown testbed: %s" % tb)
1344
1345        export_svcs = masters.get(tb,[])
1346        import_svcs = [ s for m in masters.values() \
1347                for s in m \
1348                    if tb in s.importers ]
1349
1350        export_project = get_export_project(export_svcs)
1351        # Compose the credential list so that IDs come before attributes
1352        creds = set()
1353        keys = set()
1354        certs = self.auth.get_creds_for_principal(fid)
1355        if expid:
1356            certs.update(self.auth.get_creds_for_principal(expid))
1357        for c in certs:
1358            keys.add(c.issuer_cert())
1359            creds.add(c.attribute_cert())
1360        creds = list(keys) + list(creds)
1361
1362        if expcert: cert, pw = expcert, None
1363        else: cert, pw = self.cert_file, self.cert_pw
1364
1365        # Request credentials
1366        req = {
1367                'abac_credential': creds,
1368            }
1369        # Make the service request from the services we're importing and
1370        # exporting.  Keep track of the export request ids so we can
1371        # collect the resulting info from the access response.
1372        e_keys = { }
1373        if import_svcs or export_svcs:
1374            slist = []
1375            add_services(import_svcs, 'import', slist, e_keys)
1376            add_services(export_svcs, 'export', slist, e_keys)
1377            req['service'] = slist
1378
1379        if self.local_access.has_key(uri):
1380            # Local access call
1381            req = { 'RequestAccessRequestBody' : req }
1382            r = self.local_access[uri].RequestAccess(req, 
1383                    fedid(file=self.cert_file))
1384            r = { 'RequestAccessResponseBody' : r }
1385        else:
1386            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1387
1388        if r.has_key('RequestAccessResponseBody'):
1389            # Through to here we have a valid response, not a fault.
1390            # Access denied is a fault, so something better or worse than
1391            # access denied has happened.
1392            r = r['RequestAccessResponseBody']
1393            self.log.debug("[get_access] Access granted")
1394        else:
1395            raise service_error(service_error.protocol,
1396                        "Bad proxy response")
1397        if 'proof' not in r:
1398            raise service_error(service_error.protocol,
1399                        "Bad access response (no access proof)")
1400
1401        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1402                tb=tb, uri=uri, proof=[r['proof']], 
1403                services=masters.get(tb, None))
1404
1405        # Collect the responses corresponding to the services this testbed
1406        # exports.  These will be the service requests that we will include in
1407        # the start segment requests (with appropriate visibility values) to
1408        # import and export the segments.
1409        for s in r.get('service', []):
1410            id = s.get('id', None)
1411            # Note that this attaches the response to the object in the masters
1412            # data structure.  (The e_keys index disappears when this fcn
1413            # returns)
1414            if id and id in e_keys:
1415                e_keys[id].reqs.append(s)
1416
1417        # Add attributes to parameter space.  We don't allow attributes to
1418        # overlay any parameters already installed.
1419        for a in r.get('fedAttr', []):
1420            try:
1421                if a['attribute']:
1422                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1423            except KeyError:
1424                self.log.error("Bad attribute in response: %s" % a)
1425
1426
1427    def split_topology(self, top, topo, testbeds):
1428        """
1429        Create the sub-topologies that are needed for experiment instantiation.
1430        """
1431        for tb in testbeds:
1432            topo[tb] = top.clone()
1433            # copy in for loop allows deletions from the original
1434            for e in [ e for e in topo[tb].elements]:
1435                etb = e.get_attribute('testbed')
1436                # NB: elements without a testbed attribute won't appear in any
1437                # sub topologies. 
1438                if not etb or etb != tb:
1439                    for i in e.interface:
1440                        for s in i.subs:
1441                            try:
1442                                s.interfaces.remove(i)
1443                            except ValueError:
1444                                raise service_error(service_error.internal,
1445                                        "Can't remove interface??")
1446                    topo[tb].elements.remove(e)
1447            topo[tb].make_indices()
1448
1449    def confirm_software(self, top):
1450        """
1451        Make sure that the software to be loaded in the topo is all available
1452        before we begin making access requests, etc.  This is a subset of
1453        wrangle_software.
1454        """
1455        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1456        pkgs.update([x.location for e in top.elements for x in e.software])
1457
1458        for pkg in pkgs:
1459            loc = pkg
1460
1461            scheme, host, path = urlparse(loc)[0:3]
1462            dest = os.path.basename(path)
1463            if not scheme:
1464                if not loc.startswith('/'):
1465                    loc = "/%s" % loc
1466                loc = "file://%s" %loc
1467            # NB: if scheme was found, loc == pkg
1468            try:
1469                u = urlopen(loc)
1470                u.close()
1471            except Exception, e:
1472                raise service_error(service_error.req, 
1473                        "Cannot open %s: %s" % (loc, e))
1474        return True
1475
1476    def wrangle_software(self, expid, top, topo, tbparams):
1477        """
1478        Copy software out to the repository directory, allocate permissions and
1479        rewrite the segment topologies to look for the software in local
1480        places.
1481        """
1482
1483        # Copy the rpms and tarfiles to a distribution directory from
1484        # which the federants can retrieve them
1485        linkpath = "%s/software" %  expid
1486        softdir ="%s/%s" % ( self.repodir, linkpath)
1487        softmap = { }
1488
1489        # self.fedkit and self.gateway kit are lists of tuples of
1490        # (install_location, download_location) this extracts the download
1491        # locations.
1492        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1493        pkgs.update([x.location for e in top.elements for x in e.software])
1494        try:
1495            os.makedirs(softdir)
1496        except EnvironmentError, e:
1497            raise service_error(
1498                    "Cannot create software directory: %s" % e)
1499        # The actual copying.  Everything's converted into a url for copying.
1500        auth_attrs = set()
1501        for pkg in pkgs:
1502            loc = pkg
1503
1504            scheme, host, path = urlparse(loc)[0:3]
1505            dest = os.path.basename(path)
1506            if not scheme:
1507                if not loc.startswith('/'):
1508                    loc = "/%s" % loc
1509                loc = "file://%s" %loc
1510            # NB: if scheme was found, loc == pkg
1511            try:
1512                u = urlopen(loc)
1513            except Exception, e:
1514                raise service_error(service_error.req, 
1515                        "Cannot open %s: %s" % (loc, e))
1516            try:
1517                f = open("%s/%s" % (softdir, dest) , "w")
1518                self.log.debug("Writing %s/%s" % (softdir,dest) )
1519                data = u.read(4096)
1520                while data:
1521                    f.write(data)
1522                    data = u.read(4096)
1523                f.close()
1524                u.close()
1525            except Exception, e:
1526                raise service_error(service_error.internal,
1527                        "Could not copy %s: %s" % (loc, e))
1528            path = re.sub("/tmp", "", linkpath)
1529            # XXX
1530            softmap[pkg] = \
1531                    "%s/%s/%s" %\
1532                    ( self.repo_url, path, dest)
1533
1534            # Allow the individual segments to access the software by assigning
1535            # an attribute to each testbed allocation that encodes the data to
1536            # be released.  This expression collects the data for each run of
1537            # the loop.
1538            auth_attrs.update([
1539                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1540                        for tb in tbparams.keys()])
1541
1542        self.append_experiment_authorization(expid, auth_attrs)
1543
1544        # Convert the software locations in the segments into the local
1545        # copies on this host
1546        for soft in [ s for tb in topo.values() \
1547                for e in tb.elements \
1548                    if getattr(e, 'software', False) \
1549                        for s in e.software ]:
1550            if softmap.has_key(soft.location):
1551                soft.location = softmap[soft.location]
1552
1553
1554    def new_experiment(self, req, fid):
1555        """
1556        The external interface to empty initial experiment creation called from
1557        the dispatcher.
1558
1559        Creates a working directory, splits the incoming description using the
1560        splitter script and parses out the avrious subsections using the
1561        lcasses above.  Once each sub-experiment is created, use pooled threads
1562        to instantiate them and start it all up.
1563        """
1564        req = req.get('NewRequestBody', None)
1565        if not req:
1566            raise service_error(service_error.req,
1567                    "Bad request format (no NewRequestBody)")
1568
1569        if self.auth.import_credentials(data_list=req.get('credential', [])):
1570            self.auth.save()
1571
1572        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1573                with_proof=True)
1574
1575        if not access_ok:
1576            raise service_error(service_error.access, "New access denied",
1577                    proof=[proof])
1578
1579        try:
1580            tmpdir = tempfile.mkdtemp(prefix="split-")
1581        except EnvironmentError:
1582            raise service_error(service_error.internal, "Cannot create tmp dir")
1583
1584        try:
1585            access_user = self.accessdb[fid]
1586        except KeyError:
1587            raise service_error(service_error.internal,
1588                    "Access map and authorizer out of sync in " + \
1589                            "new_experiment for fedid %s"  % fid)
1590
1591        # Generate an ID for the experiment (slice) and a certificate that the
1592        # allocator can use to prove they own it.  We'll ship it back through
1593        # the encrypted connection.  If the requester supplied one, use it.
1594        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1595            expcert = req['experimentAccess']['X509']
1596            expid = fedid(certstr=expcert)
1597            self.state_lock.acquire()
1598            if expid in self.state:
1599                self.state_lock.release()
1600                raise service_error(service_error.req, 
1601                        'fedid %s identifies an existing experiment' % expid)
1602            self.state_lock.release()
1603        else:
1604            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1605
1606        #now we're done with the tmpdir, and it should be empty
1607        if self.cleanup:
1608            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1609            os.rmdir(tmpdir)
1610        else:
1611            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1612
1613        eid = self.create_experiment_state(fid, req, expid, expcert, 
1614                state='empty')
1615
1616        rv = {
1617                'experimentID': [
1618                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1619                ],
1620                'experimentStatus': 'empty',
1621                'experimentAccess': { 'X509' : expcert },
1622                'proof': proof.to_dict(),
1623            }
1624
1625        return rv
1626
1627    # create_experiment sub-functions
1628
1629    @staticmethod
1630    def get_experiment_key(req, field='experimentID'):
1631        """
1632        Parse the experiment identifiers out of the request (the request body
1633        tag has been removed).  Specifically this pulls either the fedid or the
1634        localname out of the experimentID field.  A fedid is preferred.  If
1635        neither is present or the request does not contain the fields,
1636        service_errors are raised.
1637        """
1638        # Get the experiment access
1639        exp = req.get(field, None)
1640        if exp:
1641            if exp.has_key('fedid'):
1642                key = exp['fedid']
1643            elif exp.has_key('localname'):
1644                key = exp['localname']
1645            else:
1646                raise service_error(service_error.req, "Unknown lookup type")
1647        else:
1648            raise service_error(service_error.req, "No request?")
1649
1650        return key
1651
1652    def get_experiment_ids_and_start(self, key, tmpdir):
1653        """
1654        Get the experiment name, id and access certificate from the state, and
1655        set the experiment state to 'starting'.  returns a triple (fedid,
1656        localname, access_cert_file). The access_cert_file is a copy of the
1657        contents of the access certificate, created in the tempdir with
1658        restricted permissions.  If things are confused, raise an exception.
1659        """
1660
1661        expid = eid = None
1662        self.state_lock.acquire()
1663        if key in self.state:
1664            exp = self.state[key]
1665            exp.status = "starting"
1666            exp.updated()
1667            expid = exp.fedid
1668            eid = exp.localname
1669            expcert = exp.identity
1670        self.state_lock.release()
1671
1672        # make a protected copy of the access certificate so the experiment
1673        # controller can act as the experiment principal.
1674        if expcert:
1675            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1676            if not expcert_file:
1677                raise service_error(service_error.internal, 
1678                        "Cannot create temp cert file?")
1679        else:
1680            expcert_file = None
1681
1682        return (eid, expid, expcert_file)
1683
1684    def get_topology(self, req, tmpdir):
1685        """
1686        Get the ns2 content and put it into a file for parsing.  Call the local
1687        or remote parser and return the topdl.Topology.  Errors result in
1688        exceptions.  req is the request and tmpdir is a work directory.
1689        """
1690
1691        # The tcl parser needs to read a file so put the content into that file
1692        descr=req.get('experimentdescription', None)
1693        if descr:
1694            if 'ns2description' in descr:
1695                file_content=descr['ns2description']
1696            elif 'topdldescription' in descr:
1697                return topdl.Topology(**descr['topdldescription'])
1698            else:
1699                raise service_error(service_error.req, 
1700                        'Unknown experiment description type')
1701        else:
1702            raise service_error(service_error.req, "No experiment description")
1703
1704
1705        if self.splitter_url:
1706            self.log.debug("Calling remote topdl translator at %s" % \
1707                    self.splitter_url)
1708            top = self.remote_ns2topdl(self.splitter_url, file_content)
1709        else:
1710            tclfile = os.path.join(tmpdir, "experiment.tcl")
1711            if file_content:
1712                try:
1713                    f = open(tclfile, 'w')
1714                    f.write(file_content)
1715                    f.close()
1716                except EnvironmentError:
1717                    raise service_error(service_error.internal,
1718                            "Cannot write temp experiment description")
1719            else:
1720                raise service_error(service_error.req, 
1721                        "Only ns2descriptions supported")
1722            pid = "dummy"
1723            gid = "dummy"
1724            eid = "dummy"
1725
1726            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1727                str(self.muxmax), '-m', 'dummy']
1728
1729            tclcmd.extend([pid, gid, eid, tclfile])
1730
1731            self.log.debug("running local splitter %s", " ".join(tclcmd))
1732            # This is just fantastic.  As a side effect the parser copies
1733            # tb_compat.tcl into the current directory, so that directory
1734            # must be writable by the fedd user.  Doing this in the
1735            # temporary subdir ensures this is the case.
1736            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1737                    cwd=tmpdir)
1738            split_data = tclparser.stdout
1739
1740            top = topdl.topology_from_xml(file=split_data, top="experiment")
1741            os.remove(tclfile)
1742
1743        return top
1744
1745    def get_testbed_services(self, req, testbeds):
1746        """
1747        Parse the services section of the request into a dict mapping
1748        testbed to lists of federated_service objects.  The dict maps all
1749        exporters of services to those service objects.
1750        """
1751        masters = { }
1752        for s in req.get('service', []):
1753            # If this is a service request with the importall field
1754            # set, fill it out.
1755
1756            if s.get('importall', False):
1757                s['import'] = [ tb for tb in testbeds \
1758                        if tb not in s.get('export',[])]
1759                del s['importall']
1760
1761            # Add the service to masters
1762            for tb in s.get('export', []):
1763                if s.get('name', None):
1764
1765                    params = { }
1766                    for a in s.get('fedAttr', []):
1767                        params[a.get('attribute', '')] = a.get('value','')
1768
1769                    fser = federated_service(name=s['name'],
1770                            exporter=tb, importers=s.get('import',[]),
1771                            params=params)
1772                    if fser.name == 'hide_hosts' \
1773                            and 'hosts' not in fser.params:
1774                        fser.params['hosts'] = \
1775                                ",".join(tb_hosts.get(fser.exporter, []))
1776                    if tb in masters: masters[tb].append(fser)
1777                    else: masters[tb] = [fser]
1778                else:
1779                    self.log.error('Testbed service does not have name " + \
1780                            "and importers')
1781        return masters
1782
1783    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1784        """
1785        Create the ssh keys necessary for interconnecting the portal nodes and
1786        the global hosts file for letting each segment know about the IP
1787        addresses in play.  Save these into the repo.  Add attributes to the
1788        autorizer allowing access controllers to download them and return a set
1789        of attributes that inform the segments where to find this stuff.  May
1790        raise service_errors in if there are problems.
1791        """
1792        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1793        gw_secretkey_base = "fed.%s" % self.ssh_type
1794        keydir = os.path.join(tmpdir, 'keys')
1795        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1796        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1797
1798        try:
1799            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1800        except ValueError:
1801            raise service_error(service_error.server_config, 
1802                    "Bad key type (%s)" % self.ssh_type)
1803
1804        self.generate_seer_certs(keydir)
1805
1806        # Copy configuration files into the remote file store
1807        # The config urlpath
1808        configpath = "/%s/config" % expid
1809        # The config file system location
1810        configdir ="%s%s" % ( self.repodir, configpath)
1811        try:
1812            os.makedirs(configdir)
1813        except EnvironmentError, e:
1814            raise service_error(service_error.internal,
1815                    "Cannot create config directory: %s" % e)
1816        try:
1817            f = open("%s/hosts" % configdir, "w")
1818            print >> f, string.join(hosts, '\n')
1819            f.close()
1820        except EnvironmentError, e:
1821            raise service_error(service_error.internal, 
1822                    "Cannot write hosts file: %s" % e)
1823        try:
1824            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1825            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1826            copy_file(os.path.join(keydir, 'ca.pem'), 
1827                    os.path.join(configdir, 'ca.pem'))
1828            copy_file(os.path.join(keydir, 'node.pem'), 
1829                    os.path.join(configdir, 'node.pem'))
1830        except EnvironmentError, e:
1831            raise service_error(service_error.internal, 
1832                    "Cannot copy keyfiles: %s" % e)
1833
1834        # Allow the individual testbeds to access the configuration files,
1835        # again by setting an attribute for the relevant pathnames on each
1836        # allocation principal.  Yeah, that's a long list comprehension.
1837        self.append_experiment_authorization(expid, set([
1838            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1839                    for tb in tbparams.keys() \
1840                        for f in ("hosts", 'ca.pem', 'node.pem', 
1841                            gw_secretkey_base, gw_pubkey_base)]))
1842
1843        attrs = [ 
1844                {
1845                    'attribute': 'ssh_pubkey', 
1846                    'value': '%s/%s/config/%s' % \
1847                            (self.repo_url, expid, gw_pubkey_base)
1848                },
1849                {
1850                    'attribute': 'ssh_secretkey', 
1851                    'value': '%s/%s/config/%s' % \
1852                            (self.repo_url, expid, gw_secretkey_base)
1853                },
1854                {
1855                    'attribute': 'hosts', 
1856                    'value': '%s/%s/config/hosts' % \
1857                            (self.repo_url, expid)
1858                },
1859                {
1860                    'attribute': 'seer_ca_pem', 
1861                    'value': '%s/%s/config/%s' % \
1862                            (self.repo_url, expid, 'ca.pem')
1863                },
1864                {
1865                    'attribute': 'seer_node_pem', 
1866                    'value': '%s/%s/config/%s' % \
1867                            (self.repo_url, expid, 'node.pem')
1868                },
1869            ]
1870        return attrs
1871
1872
1873    def get_vtopo(self, req, fid):
1874        """
1875        Return the stored virtual topology for this experiment
1876        """
1877        rv = None
1878        state = None
1879
1880        req = req.get('VtopoRequestBody', None)
1881        if not req:
1882            raise service_error(service_error.req,
1883                    "Bad request format (no VtopoRequestBody)")
1884        exp = req.get('experiment', None)
1885        if exp:
1886            if exp.has_key('fedid'):
1887                key = exp['fedid']
1888                keytype = "fedid"
1889            elif exp.has_key('localname'):
1890                key = exp['localname']
1891                keytype = "localname"
1892            else:
1893                raise service_error(service_error.req, "Unknown lookup type")
1894        else:
1895            raise service_error(service_error.req, "No request?")
1896
1897        proof = self.check_experiment_access(fid, key)
1898
1899        self.state_lock.acquire()
1900        # XXX: this needs to be recalculated
1901        if key in self.state:
1902            if self.state[key].top is not None:
1903                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1904                rv = { 'experiment' : {keytype: key },
1905                        'vtopo': vtopo,
1906                        'proof': proof.to_dict(), 
1907                    }
1908            else:
1909                state = self.state[key].status
1910        self.state_lock.release()
1911
1912        if rv: return rv
1913        else: 
1914            if state:
1915                raise service_error(service_error.partial, 
1916                        "Not ready: %s" % state)
1917            else:
1918                raise service_error(service_error.req, "No such experiment")
1919
1920    def get_vis(self, req, fid):
1921        """
1922        Return the stored visualization for this experiment
1923        """
1924        rv = None
1925        state = None
1926
1927        req = req.get('VisRequestBody', None)
1928        if not req:
1929            raise service_error(service_error.req,
1930                    "Bad request format (no VisRequestBody)")
1931        exp = req.get('experiment', None)
1932        if exp:
1933            if exp.has_key('fedid'):
1934                key = exp['fedid']
1935                keytype = "fedid"
1936            elif exp.has_key('localname'):
1937                key = exp['localname']
1938                keytype = "localname"
1939            else:
1940                raise service_error(service_error.req, "Unknown lookup type")
1941        else:
1942            raise service_error(service_error.req, "No request?")
1943
1944        proof = self.check_experiment_access(fid, key)
1945
1946        self.state_lock.acquire()
1947        # Generate the visualization
1948        if key in self.state:
1949            if self.state[key].top is not None:
1950                try:
1951                    vis = self.genviz(
1952                            topdl.topology_to_vtopo(self.state[key].topo))
1953                except service_error, e:
1954                    self.state_lock.release()
1955                    raise e
1956                rv =  { 'experiment' : {keytype: key },
1957                        'vis': vis,
1958                        'proof': proof.to_dict(), 
1959                        }
1960            else:
1961                state = self.state[key].status
1962        self.state_lock.release()
1963
1964        if rv: return rv
1965        else:
1966            if state:
1967                raise service_error(service_error.partial, 
1968                        "Not ready: %s" % state)
1969            else:
1970                raise service_error(service_error.req, "No such experiment")
1971
1972   
1973    def save_federant_information(self, allocated, tbparams, eid, top):
1974        """
1975        Store the various data that have changed in the experiment state
1976        between when it was started and the beginning of resource allocation.
1977        This is basically the information about each local allocation.  This
1978        fills in the values of the placeholder allocation in the state.  It
1979        also collects the access proofs and returns them as dicts for a
1980        response message.
1981        """
1982        self.state_lock.acquire()
1983        exp = self.state[eid]
1984        exp.top = top.clone()
1985        # save federant information
1986        for k in allocated.keys():
1987            exp.add_allocation(tbparams[k])
1988            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
1989                type="testbed", localname=[k], 
1990                service=[ s.to_topdl() for s in tbparams[k].services]))
1991
1992        # Access proofs for the response message
1993        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1994                    for p in tbparams[k].proof]
1995        exp.updated()
1996        if self.state_filename: 
1997            self.write_state()
1998        self.state_lock.release()
1999        return proofs
2000
2001    def clear_placeholder(self, eid, expid, tmpdir):
2002        """
2003        Clear the placeholder and remove any allocated temporary dir.
2004        """
2005
2006        self.state_lock.acquire()
2007        del self.state[eid]
2008        del self.state[expid]
2009        if self.state_filename: self.write_state()
2010        self.state_lock.release()
2011        if tmpdir and self.cleanup:
2012            self.remove_dirs(tmpdir)
2013
2014    # end of create_experiment sub-functions
2015
2016    def create_experiment(self, req, fid):
2017        """
2018        The external interface to experiment creation called from the
2019        dispatcher.
2020
2021        Creates a working directory, splits the incoming description using the
2022        splitter script and parses out the various subsections using the
2023        classes above.  Once each sub-experiment is created, use pooled threads
2024        to instantiate them and start it all up.
2025        """
2026
2027        req = req.get('CreateRequestBody', None)
2028        if req:
2029            key = self.get_experiment_key(req)
2030        else:
2031            raise service_error(service_error.req,
2032                    "Bad request format (no CreateRequestBody)")
2033
2034        # Import information from the requester
2035        if self.auth.import_credentials(data_list=req.get('credential', [])):
2036            self.auth.save()
2037
2038        # Make sure that the caller can talk to us
2039        proof = self.check_experiment_access(fid, key)
2040
2041        # Install the testbed map entries supplied with the request into a copy
2042        # of the testbed map.
2043        tbmap = dict(self.tbmap)
2044        for m in req.get('testbedmap', []):
2045            if 'testbed' in m and 'uri' in m:
2046                tbmap[m['testbed']] = m['uri']
2047
2048        # a place to work
2049        try:
2050            tmpdir = tempfile.mkdtemp(prefix="split-")
2051            os.mkdir(tmpdir+"/keys")
2052        except EnvironmentError:
2053            raise service_error(service_error.internal, "Cannot create tmp dir")
2054
2055        tbparams = { }
2056
2057        eid, expid, expcert_file = \
2058                self.get_experiment_ids_and_start(key, tmpdir)
2059
2060        # This catches exceptions to clear the placeholder if necessary
2061        try: 
2062            if not (eid and expid):
2063                raise service_error(service_error.internal, 
2064                        "Cannot find local experiment info!?")
2065
2066            top = self.get_topology(req, tmpdir)
2067            self.confirm_software(top)
2068            # Assign the IPs
2069            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2070            # Find the testbeds to look up
2071            tb_hosts = { }
2072            testbeds = [ ]
2073            for e in top.elements:
2074                if isinstance(e, topdl.Computer):
2075                    tb = e.get_attribute('testbed') or 'default'
2076                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2077                    else: 
2078                        tb_hosts[tb] = [ e.name ]
2079                        testbeds.append(tb)
2080
2081            masters = self.get_testbed_services(req, testbeds)
2082            # The exporters who need portals
2083            pmasters = dict([(k, [s for s in l if s.portal] ) \
2084                    for k, l in masters.items()])
2085            allocated = { }         # Testbeds we can access
2086            topo ={ }               # Sub topologies
2087            connInfo = { }          # Connection information
2088
2089            self.split_topology(top, topo, testbeds)
2090
2091            self.get_access_to_testbeds(testbeds, fid, allocated, 
2092                    tbparams, masters, tbmap, expid, expcert_file)
2093
2094            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2095
2096            part = experiment_partition(self.auth, self.store_url, tbmap,
2097                    self.muxmax, self.direct_transit)
2098            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2099                    connInfo, expid)
2100
2101            auth_attrs = set()
2102            # Now get access to the dynamic testbeds (those added above)
2103            for tb in [ t for t in topo if t not in allocated]:
2104                self.get_access(tb, tbparams, fid, masters, tbmap, 
2105                        expid, expcert_file)
2106                allocated[tb] = 1
2107                store_keys = topo[tb].get_attribute('store_keys')
2108                # Give the testbed access to keys it exports or imports
2109                if store_keys:
2110                    auth_attrs.update(set([
2111                        (tbparams[tb].allocID, sk) \
2112                                for sk in store_keys.split(" ")]))
2113
2114            if auth_attrs:
2115                self.append_experiment_authorization(expid, auth_attrs)
2116
2117            # transit and disconnected testbeds may not have a connInfo entry.
2118            # Fill in the blanks.
2119            for t in allocated.keys():
2120                if not connInfo.has_key(t):
2121                    connInfo[t] = { }
2122
2123            self.wrangle_software(expid, top, topo, tbparams)
2124
2125            proofs = self.save_federant_information(allocated, tbparams, 
2126                    eid, top)
2127        except service_error, e:
2128            # If something goes wrong in the parse (usually an access error)
2129            # clear the placeholder state.  From here on out the code delays
2130            # exceptions.  Failing at this point returns a fault to the remote
2131            # caller.
2132            self.clear_placeholder(eid, expid, tmpdir)
2133            raise e
2134
2135        # Start the background swapper and return the starting state.  From
2136        # here on out, the state will stick around a while.
2137
2138        # Create a logger that logs to the experiment's state object as well as
2139        # to the main log file.
2140        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2141        alloc_collector = self.list_log(self.state[eid].log)
2142        h = logging.StreamHandler(alloc_collector)
2143        # XXX: there should be a global one of these rather than repeating the
2144        # code.
2145        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2146                    '%d %b %y %H:%M:%S'))
2147        alloc_log.addHandler(h)
2148
2149        # Start a thread to do the resource allocation
2150        t  = Thread(target=self.allocate_resources,
2151                args=(allocated, masters, eid, expid, tbparams, 
2152                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2153                    connInfo, tbmap, expcert_file),
2154                name=eid)
2155        t.start()
2156
2157        rv = {
2158                'experimentID': [
2159                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2160                ],
2161                'experimentStatus': 'starting',
2162                'proof': [ proof.to_dict() ] + proofs,
2163            }
2164
2165        return rv
2166   
2167    def get_experiment_fedid(self, key):
2168        """
2169        find the fedid associated with the localname key in the state database.
2170        """
2171
2172        rv = None
2173        self.state_lock.acquire()
2174        if key in self.state:
2175            rv = self.state[key].fedid
2176        self.state_lock.release()
2177        return rv
2178
2179    def check_experiment_access(self, fid, key):
2180        """
2181        Confirm that the fid has access to the experiment.  Though a request
2182        may be made in terms of a local name, the access attribute is always
2183        the experiment's fedid.
2184        """
2185        if not isinstance(key, fedid):
2186            key = self.get_experiment_fedid(key)
2187
2188        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2189
2190        if access_ok:
2191            return proof
2192        else:
2193            raise service_error(service_error.access, "Access Denied",
2194                proof)
2195
2196
2197    def get_handler(self, path, fid):
2198        """
2199        Perhaps surprisingly named, this function handles HTTP GET requests to
2200        this server (SOAP requests are POSTs).
2201        """
2202        self.log.info("Get handler %s %s" % (path, fid))
2203        # XXX: log proofs?
2204        if self.auth.check_attribute(fid, path):
2205            return ("%s/%s" % (self.repodir, path), "application/binary")
2206        else:
2207            return (None, None)
2208
2209    def update_info(self, key, force=False):
2210        top = None
2211        self.state_lock.acquire()
2212        if key in self.state:
2213            if force or self.state[key].older_than(self.info_cache_limit):
2214                top = self.state[key].top
2215                if top is not None: top = top.clone()
2216                d1, info_params, cert, d2 = \
2217                        self.get_segment_info(self.state[key], need_lock=False)
2218        self.state_lock.release()
2219
2220        if top is None: return
2221
2222        try:
2223            tmpdir = tempfile.mkdtemp(prefix="info-")
2224        except EnvironmentError:
2225            raise service_error(service_error.internal, 
2226                    "Cannot create tmp dir")
2227        cert_file = self.make_temp_certfile(cert, tmpdir)
2228
2229        data = []
2230        try:
2231            for k, (uri, aid) in info_params.items():
2232                info=self.info_segment(log=self.log, testbed=uri,
2233                            cert_file=cert_file, cert_pwd=None,
2234                            trusted_certs=self.trusted_certs,
2235                            caller=self.call_InfoSegment)
2236                info(uri, aid)
2237                data.append(info)
2238        # Clean up the tmpdir no matter what
2239        finally:
2240            if tmpdir: self.remove_dirs(tmpdir)
2241
2242        self.annotate_topology(top, data)
2243        self.state_lock.acquire()
2244        if key in self.state:
2245            self.state[key].top = top
2246            self.state[key].updated()
2247            if self.state_filename: self.write_state()
2248        self.state_lock.release()
2249
2250   
2251    def get_info(self, req, fid):
2252        """
2253        Return all the stored info about this experiment
2254        """
2255        rv = None
2256
2257        req = req.get('InfoRequestBody', None)
2258        if not req:
2259            raise service_error(service_error.req,
2260                    "Bad request format (no InfoRequestBody)")
2261        exp = req.get('experiment', None)
2262        legacy = req.get('legacy', False)
2263        fresh = req.get('fresh', False)
2264        if exp:
2265            if exp.has_key('fedid'):
2266                key = exp['fedid']
2267                keytype = "fedid"
2268            elif exp.has_key('localname'):
2269                key = exp['localname']
2270                keytype = "localname"
2271            else:
2272                raise service_error(service_error.req, "Unknown lookup type")
2273        else:
2274            raise service_error(service_error.req, "No request?")
2275
2276        proof = self.check_experiment_access(fid, key)
2277
2278        self.update_info(key, fresh)
2279
2280        self.state_lock.acquire()
2281        if self.state.has_key(key):
2282            rv = self.state[key].get_info()
2283            # Copy the topo if we need legacy annotations
2284            if legacy:
2285                top = self.state[key].top
2286                if top is not None: top = top.clone()
2287        self.state_lock.release()
2288
2289        # If the legacy visualization and topology representations are
2290        # requested, calculate them and add them to the return.
2291        if legacy and rv is not None:
2292            if top is not None:
2293                vtopo = topdl.topology_to_vtopo(top)
2294                if vtopo is not None:
2295                    rv['vtopo'] = vtopo
2296                    try:
2297                        vis = self.genviz(vtopo)
2298                    except service_error, e:
2299                        self.log.debug('Problem generating visualization: %s' \
2300                                % e)
2301                        vis = None
2302                    if vis is not None:
2303                        rv['vis'] = vis
2304        if rv:
2305            rv['proof'] = proof.to_dict()
2306            return rv
2307        else:
2308            raise service_error(service_error.req, "No such experiment")
2309
2310    def get_multi_info(self, req, fid):
2311        """
2312        Return all the stored info that this fedid can access
2313        """
2314        rv = { 'info': [ ], 'proof': [ ] }
2315
2316        self.state_lock.acquire()
2317        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2318            try:
2319                proof = self.check_experiment_access(fid, key)
2320            except service_error, e:
2321                if e.code == service_error.access:
2322                    continue
2323                else:
2324                    self.state_lock.release()
2325                    raise e
2326
2327            if self.state.has_key(key):
2328                e = self.state[key].get_info()
2329                e['proof'] = proof.to_dict()
2330                rv['info'].append(e)
2331                rv['proof'].append(proof.to_dict())
2332        self.state_lock.release()
2333        return rv
2334
2335    def check_termination_status(self, fed_exp, force):
2336        """
2337        Confirm that the experiment is sin a valid state to stop (or force it)
2338        return the state - invalid states for deletion and force settings cause
2339        exceptions.
2340        """
2341        self.state_lock.acquire()
2342        status = fed_exp.status
2343
2344        if status:
2345            if status in ('starting', 'terminating'):
2346                if not force:
2347                    self.state_lock.release()
2348                    raise service_error(service_error.partial, 
2349                            'Experiment still being created or destroyed')
2350                else:
2351                    self.log.warning('Experiment in %s state ' % status + \
2352                            'being terminated by force.')
2353            self.state_lock.release()
2354            return status
2355        else:
2356            # No status??? trouble
2357            self.state_lock.release()
2358            raise service_error(service_error.internal,
2359                    "Experiment has no status!?")
2360
2361    def get_segment_info(self, fed_exp, need_lock=True):
2362        ids = []
2363        term_params = { }
2364        if need_lock: self.state_lock.acquire()
2365        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2366        expcert = fed_exp.identity
2367        repo = "%s" % fed_exp.fedid
2368
2369        # Collect the allocation/segment ids into a dict keyed by the fedid
2370        # of the allocation that contains a tuple of uri, aid
2371        for i, fed in enumerate(fed_exp.get_all_allocations()):
2372            uri = fed.uri
2373            aid = fed.allocID
2374            term_params[aid] = (uri, aid)
2375        if need_lock: self.state_lock.release()
2376        return ids, term_params, expcert, repo
2377
2378
2379    def get_termination_info(self, fed_exp):
2380        self.state_lock.acquire()
2381        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2382        # Change the experiment state
2383        fed_exp.status = 'terminating'
2384        fed_exp.updated()
2385        if self.state_filename: self.write_state()
2386        self.state_lock.release()
2387
2388        return ids, term_params, expcert, repo
2389
2390
2391    def deallocate_resources(self, term_params, expcert, status, force, 
2392            dealloc_log):
2393        tmpdir = None
2394        # This try block makes sure the tempdir is cleared
2395        try:
2396            # If no expcert, try the deallocation as the experiment
2397            # controller instance.
2398            if expcert and self.auth_type != 'legacy': 
2399                try:
2400                    tmpdir = tempfile.mkdtemp(prefix="term-")
2401                except EnvironmentError:
2402                    raise service_error(service_error.internal, 
2403                            "Cannot create tmp dir")
2404                cert_file = self.make_temp_certfile(expcert, tmpdir)
2405                pw = None
2406            else: 
2407                cert_file = self.cert_file
2408                pw = self.cert_pwd
2409
2410            # Stop everyone.  NB, wait_for_all waits until a thread starts
2411            # and then completes, so we can't wait if nothing starts.  So,
2412            # no tbparams, no start.
2413            if len(term_params) > 0:
2414                tp = thread_pool(self.nthreads)
2415                for k, (uri, aid) in term_params.items():
2416                    # Create and start a thread to stop the segment
2417                    tp.wait_for_slot()
2418                    t  = pooled_thread(\
2419                            target=self.terminate_segment(log=dealloc_log,
2420                                testbed=uri,
2421                                cert_file=cert_file, 
2422                                cert_pwd=pw,
2423                                trusted_certs=self.trusted_certs,
2424                                caller=self.call_TerminateSegment),
2425                            args=(uri, aid), name=k,
2426                            pdata=tp, trace_file=self.trace_file)
2427                    t.start()
2428                # Wait for completions
2429                tp.wait_for_all_done()
2430
2431            # release the allocations (failed experiments have done this
2432            # already, and starting experiments may be in odd states, so we
2433            # ignore errors releasing those allocations
2434            try: 
2435                for k, (uri, aid)  in term_params.items():
2436                    self.release_access(None, aid, uri=uri,
2437                            cert_file=cert_file, cert_pwd=pw)
2438            except service_error, e:
2439                if status != 'failed' and not force:
2440                    raise e
2441
2442        # Clean up the tmpdir no matter what
2443        finally:
2444            if tmpdir: self.remove_dirs(tmpdir)
2445
2446    def terminate_experiment(self, req, fid):
2447        """
2448        Swap this experiment out on the federants and delete the shared
2449        information
2450        """
2451        tbparams = { }
2452        req = req.get('TerminateRequestBody', None)
2453        if not req:
2454            raise service_error(service_error.req,
2455                    "Bad request format (no TerminateRequestBody)")
2456
2457        key = self.get_experiment_key(req, 'experiment')
2458        proof = self.check_experiment_access(fid, key)
2459        exp = req.get('experiment', False)
2460        force = req.get('force', False)
2461
2462        dealloc_list = [ ]
2463
2464
2465        # Create a logger that logs to the dealloc_list as well as to the main
2466        # log file.
2467        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2468        h = logging.StreamHandler(self.list_log(dealloc_list))
2469        # XXX: there should be a global one of these rather than repeating the
2470        # code.
2471        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2472                    '%d %b %y %H:%M:%S'))
2473        dealloc_log.addHandler(h)
2474
2475        self.state_lock.acquire()
2476        fed_exp = self.state.get(key, None)
2477        self.state_lock.release()
2478        repo = None
2479
2480        if fed_exp:
2481            status = self.check_termination_status(fed_exp, force)
2482            # get_termination_info updates the experiment state
2483            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2484            self.deallocate_resources(term_params, expcert, status, force, 
2485                    dealloc_log)
2486
2487            # Remove the terminated experiment
2488            self.state_lock.acquire()
2489            for id in ids:
2490                self.clear_experiment_authorization(id, need_state_lock=False)
2491                if id in self.state: del self.state[id]
2492
2493            if self.state_filename: self.write_state()
2494            self.state_lock.release()
2495
2496            # Delete any synch points associated with this experiment.  All
2497            # synch points begin with the fedid of the experiment.
2498            fedid_keys = set(["fedid:%s" % f for f in ids \
2499                    if isinstance(f, fedid)])
2500            for k in self.synch_store.all_keys():
2501                try:
2502                    if len(k) > 45 and k[0:46] in fedid_keys:
2503                        self.synch_store.del_value(k)
2504                except synch_store.BadDeletionError:
2505                    pass
2506            self.write_store()
2507
2508            # Remove software and other cached stuff from the filesystem.
2509            if repo:
2510                self.remove_dirs("%s/%s" % (self.repodir, repo))
2511       
2512            return { 
2513                    'experiment': exp , 
2514                    'deallocationLog': string.join(dealloc_list, ''),
2515                    'proof': [proof.to_dict()],
2516                    }
2517        else:
2518            raise service_error(service_error.req, "No saved state")
2519
2520
2521    def GetValue(self, req, fid):
2522        """
2523        Get a value from the synchronized store
2524        """
2525        req = req.get('GetValueRequestBody', None)
2526        if not req:
2527            raise service_error(service_error.req,
2528                    "Bad request format (no GetValueRequestBody)")
2529       
2530        name = req.get('name', None)
2531        wait = req.get('wait', False)
2532        rv = { 'name': name }
2533
2534        if not name:
2535            raise service_error(service_error.req, "No name?")
2536
2537        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2538
2539        if access_ok:
2540            self.log.debug("[GetValue] asking for %s " % name)
2541            try:
2542                v = self.synch_store.get_value(name, wait)
2543            except synch_store.RevokedKeyError:
2544                # No more synch on this key
2545                raise service_error(service_error.federant, 
2546                        "Synch key %s revoked" % name)
2547            if v is not None:
2548                rv['value'] = v
2549            rv['proof'] = proof.to_dict()
2550            self.log.debug("[GetValue] got %s from %s" % (v, name))
2551            return rv
2552        else:
2553            raise service_error(service_error.access, "Access Denied",
2554                    proof=proof)
2555       
2556
2557    def SetValue(self, req, fid):
2558        """
2559        Set a value in the synchronized store
2560        """
2561        req = req.get('SetValueRequestBody', None)
2562        if not req:
2563            raise service_error(service_error.req,
2564                    "Bad request format (no SetValueRequestBody)")
2565       
2566        name = req.get('name', None)
2567        v = req.get('value', '')
2568
2569        if not name:
2570            raise service_error(service_error.req, "No name?")
2571
2572        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2573
2574        if access_ok:
2575            try:
2576                self.synch_store.set_value(name, v)
2577                self.write_store()
2578                self.log.debug("[SetValue] set %s to %s" % (name, v))
2579            except synch_store.CollisionError:
2580                # Translate into a service_error
2581                raise service_error(service_error.req,
2582                        "Value already set: %s" %name)
2583            except synch_store.RevokedKeyError:
2584                # No more synch on this key
2585                raise service_error(service_error.federant, 
2586                        "Synch key %s revoked" % name)
2587                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2588        else:
2589            raise service_error(service_error.access, "Access Denied",
2590                    proof=proof)
Note: See TracBrowser for help on using the repository browser.