source: fedd/federation/experiment_control.py @ 57facae

compt_changesinfo-ops
Last change on this file since 57facae was 57facae, checked in by Ted Faber <faber@…>, 13 years ago

Put pmasters back into get_testbed_services - too much can go wrong the
other way.

  • Property mode set to 100644
File size: 85.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52class experiment_control_local(experiment_control_legacy):
53    """
54    Control of experiments that this system can directly access.
55
56    Includes experiment creation, termination and information dissemination.
57    Thred safe.
58    """
59
60    class ssh_cmd_timeout(RuntimeError): pass
61   
62    call_RequestAccess = service_caller('RequestAccess')
63    call_ReleaseAccess = service_caller('ReleaseAccess')
64    call_StartSegment = service_caller('StartSegment')
65    call_TerminateSegment = service_caller('TerminateSegment')
66    call_InfoSegment = service_caller('InfoSegment')
67    call_Ns2Topdl = service_caller('Ns2Topdl')
68
69    def __init__(self, config=None, auth=None):
70        """
71        Intialize the various attributes, most from the config object
72        """
73
74        def parse_tarfile_list(tf):
75            """
76            Parse a tarfile list from the configuration.  This is a set of
77            paths and tarfiles separated by spaces.
78            """
79            rv = [ ]
80            if tf is not None:
81                tl = tf.split()
82                while len(tl) > 1:
83                    p, t = tl[0:2]
84                    del tl[0:2]
85                    rv.append((p, t))
86            return rv
87
88        self.list_log = list_log.list_log
89
90        self.cert_file = config.get("experiment_control", "cert_file")
91        if self.cert_file:
92            self.cert_pwd = config.get("experiment_control", "cert_pwd")
93        else:
94            self.cert_file = config.get("globals", "cert_file")
95            self.cert_pwd = config.get("globals", "cert_pwd")
96
97        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
98                or config.get("globals", "trusted_certs")
99
100        self.repodir = config.get("experiment_control", "repodir")
101        self.repo_url = config.get("experiment_control", "repo_url", 
102                "https://users.isi.deterlab.net:23235");
103
104        self.exp_stem = "fed-stem"
105        self.log = logging.getLogger("fedd.experiment_control")
106        set_log_level(config, "experiment_control", self.log)
107        self.muxmax = 2
108        self.nthreads = 10
109        self.randomize_experiments = False
110
111        self.splitter = None
112        self.ssh_keygen = "/usr/bin/ssh-keygen"
113        self.ssh_identity_file = None
114
115
116        self.debug = config.getboolean("experiment_control", "create_debug")
117        self.cleanup = not config.getboolean("experiment_control", 
118                "leave_tmpfiles")
119        self.state_filename = config.get("experiment_control", 
120                "experiment_state")
121        self.store_filename = config.get("experiment_control", 
122                "synch_store")
123        self.store_url = config.get("experiment_control", "store_url")
124        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
125        self.fedkit = parse_tarfile_list(\
126                config.get("experiment_control", "fedkit"))
127        self.gatewaykit = parse_tarfile_list(\
128                config.get("experiment_control", "gatewaykit"))
129        accessdb_file = config.get("experiment_control", "accessdb")
130
131        dt = config.get("experiment_control", "direct_transit")
132        self.auth_type = config.get('experiment_control', 'auth_type') \
133                or 'legacy'
134        self.auth_dir = config.get('experiment_control', 'auth_dir')
135        # XXX: document this!
136        self.info_cache_limit = \
137                config.getint('experiment_control', 'info_cache', 600)
138        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
139        else: self.direct_transit = [ ]
140        # NB for internal master/slave ops, not experiment setup
141        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
142
143        self.overrides = set([])
144        ovr = config.get('experiment_control', 'overrides')
145        if ovr:
146            for o in ovr.split(","):
147                o = o.strip()
148                if o.startswith('fedid:'): o = o[len('fedid:'):]
149                self.overrides.add(fedid(hexstr=o))
150
151        self.state = { }
152        self.state_lock = Lock()
153        self.tclsh = "/usr/local/bin/otclsh"
154        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
155                config.get("experiment_control", "tcl_splitter",
156                        "/usr/testbed/lib/ns2ir/parse.tcl")
157        mapdb_file = config.get("experiment_control", "mapdb")
158        self.trace_file = sys.stderr
159
160        self.def_expstart = \
161                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
162                "/tmp/federate";
163        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
164                "FEDDIR/hosts";
165        self.def_gwstart = \
166                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
167                "/tmp/bridge.log";
168        self.def_mgwstart = \
169                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
170                "/tmp/bridge.log";
171        self.def_gwimage = "FBSD61-TUNNEL2";
172        self.def_gwtype = "pc";
173        self.local_access = { }
174
175        if self.auth_type == 'legacy':
176            if auth:
177                self.auth = auth
178            else:
179                self.log.error( "[access]: No authorizer initialized, " +\
180                        "creating local one.")
181                auth = authorizer()
182            self.get_access = self.legacy_get_access
183        elif self.auth_type == 'abac':
184            self.auth = abac_authorizer(load=self.auth_dir)
185        else:
186            raise service_error(service_error.internal, 
187                    "Unknown auth_type: %s" % self.auth_type)
188
189        if mapdb_file:
190            self.read_mapdb(mapdb_file)
191        else:
192            self.log.warn("[experiment_control] No testbed map, using defaults")
193            self.tbmap = { 
194                    'deter':'https://users.isi.deterlab.net:23235',
195                    'emulab':'https://users.isi.deterlab.net:23236',
196                    'ucb':'https://users.isi.deterlab.net:23237',
197                    }
198
199        if accessdb_file:
200                self.read_accessdb(accessdb_file)
201        else:
202            raise service_error(service_error.internal,
203                    "No accessdb specified in config")
204
205        # Grab saved state.  OK to do this w/o locking because it's read only
206        # and only one thread should be in existence that can see self.state at
207        # this point.
208        if self.state_filename:
209            self.read_state()
210
211        if self.store_filename:
212            self.read_store()
213        else:
214            self.log.warning("No saved synch store")
215            self.synch_store = synch_store
216
217        # Dispatch tables
218        self.soap_services = {\
219                'New': soap_handler('New', self.new_experiment),
220                'Create': soap_handler('Create', self.create_experiment),
221                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
222                'Vis': soap_handler('Vis', self.get_vis),
223                'Info': soap_handler('Info', self.get_info),
224                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
241                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
242        }
243
244    # Call while holding self.state_lock
245    def write_state(self):
246        """
247        Write a new copy of experiment state after copying the existing state
248        to a backup.
249
250        State format is a simple pickling of the state dictionary.
251        """
252        if os.access(self.state_filename, os.W_OK):
253            copy_file(self.state_filename, \
254                    "%s.bak" % self.state_filename)
255        try:
256            f = open(self.state_filename, 'w')
257            pickle.dump(self.state, f)
258        except EnvironmentError, e:
259            self.log.error("Can't write file %s: %s" % \
260                    (self.state_filename, e))
261        except pickle.PicklingError, e:
262            self.log.error("Pickling problem: %s" % e)
263        except TypeError, e:
264            self.log.error("Pickling problem (TypeError): %s" % e)
265
266    @staticmethod
267    def get_alloc_ids(exp):
268        """
269        Used by read_store and read state.  This used to be worse.
270        """
271
272        return [ a.allocID for a in exp.get_all_allocations() ]
273
274
275    # Call while holding self.state_lock
276    def read_state(self):
277        """
278        Read a new copy of experiment state.  Old state is overwritten.
279
280        State format is a simple pickling of the state dictionary.
281        """
282       
283        try:
284            f = open(self.state_filename, "r")
285            self.state = pickle.load(f)
286            self.log.debug("[read_state]: Read state from %s" % \
287                    self.state_filename)
288        except EnvironmentError, e:
289            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
290                    % (self.state_filename, e))
291        except pickle.UnpicklingError, e:
292            self.log.warning(("[read_state]: No saved state: " + \
293                    "Unpickling failed: %s") % e)
294       
295        for s in self.state.values():
296            try:
297
298                eid = s.fedid
299                if eid : 
300                    if self.auth_type == 'legacy':
301                        # XXX: legacy
302                        # Give the owner rights to the experiment
303                        #self.auth.set_attribute(s['owner'], eid)
304                        # And holders of the eid as well
305                        self.auth.set_attribute(eid, eid)
306                        # allow overrides to control experiments as well
307                        for o in self.overrides:
308                            self.auth.set_attribute(o, eid)
309                        # Set permissions to allow reading of the software
310                        # repo, if any, as well.
311                        for a in self.get_alloc_ids(s):
312                            self.auth.set_attribute(a, 'repo/%s' % eid)
313                else:
314                    raise KeyError("No experiment id")
315            except KeyError, e:
316                self.log.warning("[read_state]: State ownership or identity " +\
317                        "misformatted in %s: %s" % (self.state_filename, e))
318
319
320    def read_accessdb(self, accessdb_file):
321        """
322        Read the mapping from fedids that can create experiments to their name
323        in the 3-level access namespace.  All will be asserted from this
324        testbed and can include the local username and porject that will be
325        asserted on their behalf by this fedd.  Each fedid is also added to the
326        authorization system with the "create" attribute.
327        """
328        self.accessdb = {}
329        # These are the regexps for parsing the db
330        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
331        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
332                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
333        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
334                "\s*->\s*(" + name_expr + ")\s*$")
335        lineno = 0
336
337        # Parse the mappings and store in self.authdb, a dict of
338        # fedid -> (proj, user)
339        try:
340            f = open(accessdb_file, "r")
341            for line in f:
342                lineno += 1
343                line = line.strip()
344                if len(line) == 0 or line.startswith('#'):
345                    continue
346                m = project_line.match(line)
347                if m:
348                    fid = fedid(hexstr=m.group(1))
349                    project, user = m.group(2,3)
350                    if not self.accessdb.has_key(fid):
351                        self.accessdb[fid] = []
352                    self.accessdb[fid].append((project, user))
353                    continue
354
355                m = user_line.match(line)
356                if m:
357                    fid = fedid(hexstr=m.group(1))
358                    project = None
359                    user = m.group(2)
360                    if not self.accessdb.has_key(fid):
361                        self.accessdb[fid] = []
362                    self.accessdb[fid].append((project, user))
363                    continue
364                self.log.warn("[experiment_control] Error parsing access " +\
365                        "db %s at line %d" %  (accessdb_file, lineno))
366        except EnvironmentError:
367            raise service_error(service_error.internal,
368                    ("Error opening/reading %s as experiment " +\
369                            "control accessdb") %  accessdb_file)
370        f.close()
371
372        # Initialize the authorization attributes
373        # XXX: legacy
374        if self.auth_type == 'legacy':
375            for fid in self.accessdb.keys():
376                self.auth.set_attribute(fid, 'create')
377                self.auth.set_attribute(fid, 'new')
378
379    def read_mapdb(self, file):
380        """
381        Read a simple colon separated list of mappings for the
382        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
383        """
384
385        self.tbmap = { }
386        lineno =0
387        try:
388            f = open(file, "r")
389            for line in f:
390                lineno += 1
391                line = line.strip()
392                if line.startswith('#') or len(line) == 0:
393                    continue
394                try:
395                    label, url = line.split(':', 1)
396                    self.tbmap[label] = url
397                except ValueError, e:
398                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
399                            "map db: %s %s" % (lineno, line, e))
400        except EnvironmentError, e:
401            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
402                    "open %s: %s" % (file, e))
403        else:
404            f.close()
405
406    def read_store(self):
407        try:
408            self.synch_store = synch_store()
409            self.synch_store.load(self.store_filename)
410            self.log.debug("[read_store]: Read store from %s" % \
411                    self.store_filename)
412        except EnvironmentError, e:
413            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
414                    % (self.state_filename, e))
415            self.synch_store = synch_store()
416
417        # Set the initial permissions on data in the store.  XXX: This ad hoc
418        # authorization attribute initialization is getting out of hand.
419        # XXX: legacy
420        if self.auth_type == 'legacy':
421            for k in self.synch_store.all_keys():
422                try:
423                    if k.startswith('fedid:'):
424                        fid = fedid(hexstr=k[6:46])
425                        if self.state.has_key(fid):
426                            for a in self.get_alloc_ids(self.state[fid]):
427                                self.auth.set_attribute(a, k)
428                except ValueError, e:
429                    self.log.warn("Cannot deduce permissions for %s" % k)
430
431
432    def write_store(self):
433        """
434        Write a new copy of synch_store after writing current state
435        to a backup.  We use the internal synch_store pickle method to avoid
436        incinsistent data.
437
438        State format is a simple pickling of the store.
439        """
440        if os.access(self.store_filename, os.W_OK):
441            copy_file(self.store_filename, \
442                    "%s.bak" % self.store_filename)
443        try:
444            self.synch_store.save(self.store_filename)
445        except EnvironmentError, e:
446            self.log.error("Can't write file %s: %s" % \
447                    (self.store_filename, e))
448        except TypeError, e:
449            self.log.error("Pickling problem (TypeError): %s" % e)
450
451
452    def remove_dirs(self, dir):
453        """
454        Remove the directory tree and all files rooted at dir.  Log any errors,
455        but continue.
456        """
457        self.log.debug("[removedirs]: removing %s" % dir)
458        try:
459            for path, dirs, files in os.walk(dir, topdown=False):
460                for f in files:
461                    os.remove(os.path.join(path, f))
462                for d in dirs:
463                    os.rmdir(os.path.join(path, d))
464            os.rmdir(dir)
465        except EnvironmentError, e:
466            self.log.error("Error deleting directory tree in %s" % e);
467
468    @staticmethod
469    def make_temp_certfile(expcert, tmpdir):
470        """
471        make a protected copy of the access certificate so the experiment
472        controller can act as the experiment principal.  mkstemp is the most
473        secure way to do that. The directory should be created by
474        mkdtemp.  Return the filename.
475        """
476        if expcert and tmpdir:
477            try:
478                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
479                f = os.fdopen(certf, 'w')
480                print >> f, expcert
481                f.close()
482            except EnvironmentError, e:
483                raise service_error(service_error.internal, 
484                        "Cannot create temp cert file?")
485            return certfn
486        else:
487            return None
488
489       
490    def generate_ssh_keys(self, dest, type="rsa" ):
491        """
492        Generate a set of keys for the gateways to use to talk.
493
494        Keys are of type type and are stored in the required dest file.
495        """
496        valid_types = ("rsa", "dsa")
497        t = type.lower();
498        if t not in valid_types: raise ValueError
499        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
500
501        try:
502            trace = open("/dev/null", "w")
503        except EnvironmentError:
504            raise service_error(service_error.internal,
505                    "Cannot open /dev/null??");
506
507        # May raise CalledProcessError
508        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
509        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
510        if rv != 0:
511            raise service_error(service_error.internal, 
512                    "Cannot generate nonce ssh keys.  %s return code %d" \
513                            % (self.ssh_keygen, rv))
514
515    def generate_seer_certs(self, destdir):
516        '''
517        Create a SEER ca cert and a node cert in destdir/ca.pem and
518        destdir/node.pem respectively.  These will be distributed throughout
519        the federated experiment.  This routine reports errors via
520        service_errors.
521        '''
522        openssl = '/usr/bin/openssl'
523        # All the filenames and parameters we need for openssl calls below
524        ca_key =os.path.join(destdir, 'ca.key') 
525        ca_pem = os.path.join(destdir, 'ca.pem')
526        node_key =os.path.join(destdir, 'node.key') 
527        node_pem = os.path.join(destdir, 'node.pem')
528        node_req = os.path.join(destdir, 'node.req')
529        node_signed = os.path.join(destdir, 'node.signed')
530        days = '%s' % (365 * 10)
531        serial = '%s' % random.randint(0, 1<<16)
532
533        try:
534            # Sequence of calls to create a CA key, create a ca cert, create a
535            # node key, node signing request, and finally a signed node
536            # certificate.
537            sequence = (
538                    (openssl, 'genrsa', '-out', ca_key, '1024'),
539                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
540                        ca_pem, '-days', days, '-subj', 
541                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
542                    (openssl, 'genrsa', '-out', node_key, '1024'),
543                    (openssl, 'req', '-new', '-key', node_key, '-out', 
544                        node_req, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
546                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
547                        '-set_serial', serial, '-req', '-in', node_req, 
548                        '-out', node_signed, '-days', days),
549                )
550            # Do all that stuff; bail if there's an error, and push all the
551            # output to dev/null.
552            for cmd in sequence:
553                trace = open("/dev/null", "w")
554                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
555                if rv != 0:
556                    raise service_error(service_error.internal, 
557                            "Cannot generate SEER certs.  %s return code %d" \
558                                    % (' '.join(cmd), rv))
559            # Concatinate the node key and signed certificate into node.pem
560            f = open(node_pem, 'w')
561            for comp in (node_signed, node_key):
562                g = open(comp, 'r')
563                f.write(g.read())
564                g.close()
565            f.close()
566
567            # Throw out intermediaries.
568            for fn in (ca_key, node_key, node_req, node_signed):
569                os.unlink(fn)
570
571        except EnvironmentError, e:
572            # Any difficulties with the file system wind up here
573            raise service_error(service_error.internal,
574                    "File error on  %s while creating SEER certs: %s" % \
575                            (e.filename, e.strerror))
576
577
578
579    def gentopo(self, str):
580        """
581        Generate the topology data structure from the splitter's XML
582        representation of it.
583
584        The topology XML looks like:
585            <experiment>
586                <nodes>
587                    <node><vname></vname><ips>ip1:ip2</ips></node>
588                </nodes>
589                <lans>
590                    <lan>
591                        <vname></vname><vnode></vnode><ip></ip>
592                        <bandwidth></bandwidth><member>node:port</member>
593                    </lan>
594                </lans>
595        """
596        class topo_parse:
597            """
598            Parse the topology XML and create the dats structure.
599            """
600            def __init__(self):
601                # Typing of the subelements for data conversion
602                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
603                self.int_subelements = ( 'bandwidth',)
604                self.float_subelements = ( 'delay',)
605                # The final data structure
606                self.nodes = [ ]
607                self.lans =  [ ]
608                self.topo = { \
609                        'node': self.nodes,\
610                        'lan' : self.lans,\
611                    }
612                self.element = { }  # Current element being created
613                self.chars = ""     # Last text seen
614
615            def end_element(self, name):
616                # After each sub element the contents is added to the current
617                # element or to the appropriate list.
618                if name == 'node':
619                    self.nodes.append(self.element)
620                    self.element = { }
621                elif name == 'lan':
622                    self.lans.append(self.element)
623                    self.element = { }
624                elif name in self.str_subelements:
625                    self.element[name] = self.chars
626                    self.chars = ""
627                elif name in self.int_subelements:
628                    self.element[name] = int(self.chars)
629                    self.chars = ""
630                elif name in self.float_subelements:
631                    self.element[name] = float(self.chars)
632                    self.chars = ""
633
634            def found_chars(self, data):
635                self.chars += data.rstrip()
636
637
638        tp = topo_parse();
639        parser = xml.parsers.expat.ParserCreate()
640        parser.EndElementHandler = tp.end_element
641        parser.CharacterDataHandler = tp.found_chars
642
643        parser.Parse(str)
644
645        return tp.topo
646       
647
648    def genviz(self, topo):
649        """
650        Generate the visualization the virtual topology
651        """
652
653        neato = "/usr/local/bin/neato"
654        # These are used to parse neato output and to create the visualization
655        # file.
656        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
657        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
658                "%s</type></node>"
659
660        try:
661            # Node names
662            nodes = [ n['vname'] for n in topo['node'] ]
663            topo_lans = topo['lan']
664        except KeyError, e:
665            raise service_error(service_error.internal, "Bad topology: %s" %e)
666
667        lans = { }
668        links = { }
669
670        # Walk through the virtual topology, organizing the connections into
671        # 2-node connections (links) and more-than-2-node connections (lans).
672        # When a lan is created, it's added to the list of nodes (there's a
673        # node in the visualization for the lan).
674        for l in topo_lans:
675            if links.has_key(l['vname']):
676                if len(links[l['vname']]) < 2:
677                    links[l['vname']].append(l['vnode'])
678                else:
679                    nodes.append(l['vname'])
680                    lans[l['vname']] = links[l['vname']]
681                    del links[l['vname']]
682                    lans[l['vname']].append(l['vnode'])
683            elif lans.has_key(l['vname']):
684                lans[l['vname']].append(l['vnode'])
685            else:
686                links[l['vname']] = [ l['vnode'] ]
687
688
689        # Open up a temporary file for dot to turn into a visualization
690        try:
691            df, dotname = tempfile.mkstemp()
692            dotfile = os.fdopen(df, 'w')
693        except EnvironmentError:
694            raise service_error(service_error.internal,
695                    "Failed to open file in genviz")
696
697        try:
698            dnull = open('/dev/null', 'w')
699        except EnvironmentError:
700            service_error(service_error.internal,
701                    "Failed to open /dev/null in genviz")
702
703        # Generate a dot/neato input file from the links, nodes and lans
704        try:
705            print >>dotfile, "graph G {"
706            for n in nodes:
707                print >>dotfile, '\t"%s"' % n
708            for l in links.keys():
709                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
710            for l in lans.keys():
711                for n in lans[l]:
712                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
713            print >>dotfile, "}"
714            dotfile.close()
715        except TypeError:
716            raise service_error(service_error.internal,
717                    "Single endpoint link in vtopo")
718        except EnvironmentError:
719            raise service_error(service_error.internal, "Cannot write dot file")
720
721        # Use dot to create a visualization
722        try:
723            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
724                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
725                stderr=dnull, close_fds=True)
726        except EnvironmentError:
727            raise service_error(service_error.internal, 
728                    "Cannot generate visualization: is graphviz available?")
729        dnull.close()
730
731        # Translate dot to vis format
732        vis_nodes = [ ]
733        vis = { 'node': vis_nodes }
734        for line in dot.stdout:
735            m = vis_re.match(line)
736            if m:
737                vn = m.group(1)
738                vis_node = {'name': vn, \
739                        'x': float(m.group(2)),\
740                        'y' : float(m.group(3)),\
741                    }
742                if vn in links.keys() or vn in lans.keys():
743                    vis_node['type'] = 'lan'
744                else:
745                    vis_node['type'] = 'node'
746                vis_nodes.append(vis_node)
747        rv = dot.wait()
748
749        os.remove(dotname)
750        # XXX: graphviz seems to use low return codes for warnings, like
751        # "couldn't find font"
752        if rv < 2 : return vis
753        else: return None
754
755
756    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
757            cert_pwd=None):
758        """
759        Release access to testbed through fedd
760        """
761
762        if not uri and tbmap:
763            uri = tbmap.get(tb, None)
764        if not uri:
765            raise service_error(service_error.server_config, 
766                    "Unknown testbed: %s" % tb)
767
768        if self.local_access.has_key(uri):
769            resp = self.local_access[uri].ReleaseAccess(\
770                    { 'ReleaseAccessRequestBody' : 
771                        {'allocID': {'fedid': aid}},}, 
772                    fedid(file=cert_file))
773            resp = { 'ReleaseAccessResponseBody': resp } 
774        else:
775            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
776                    cert_file, cert_pwd, self.trusted_certs)
777
778        # better error coding
779
780    def remote_ns2topdl(self, uri, desc):
781
782        req = {
783                'description' : { 'ns2description': desc },
784            }
785
786        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
787                self.trusted_certs)
788
789        if r.has_key('Ns2TopdlResponseBody'):
790            r = r['Ns2TopdlResponseBody']
791            ed = r.get('experimentdescription', None)
792            if ed.has_key('topdldescription'):
793                return topdl.Topology(**ed['topdldescription'])
794            else:
795                raise service_error(service_error.protocol, 
796                        "Bad splitter response (no output)")
797        else:
798            raise service_error(service_error.protocol, "Bad splitter response")
799
800    class start_segment:
801        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
802                cert_pwd=None, trusted_certs=None, caller=None,
803                log_collector=None):
804            self.log = log
805            self.debug = debug
806            self.cert_file = cert_file
807            self.cert_pwd = cert_pwd
808            self.trusted_certs = None
809            self.caller = caller
810            self.testbed = testbed
811            self.log_collector = log_collector
812            self.response = None
813            self.node = { }
814            self.proof = None
815
816        def make_map(self, resp):
817            if 'segmentdescription' not in resp  or \
818                    'topdldescription' not in resp['segmentdescription']:
819                self.log.warn('No topology returned from startsegment')
820                return 
821
822            top = topdl.Topology(
823                    **resp['segmentdescription']['topdldescription'])
824
825            for e in top.elements:
826                if isinstance(e, topdl.Computer):
827                    self.node[e.name] = \
828                            (e.localname, e.status, e.service, e.operation)
829
830        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
831            req = {
832                    'allocID': { 'fedid' : aid }, 
833                    'segmentdescription': { 
834                        'topdldescription': topo.to_dict(),
835                    },
836                }
837
838            if connInfo:
839                req['connection'] = connInfo
840
841            import_svcs = [ s for m in masters.values() \
842                    for s in m if self.testbed in s.importers]
843
844            if import_svcs or self.testbed in masters:
845                req['service'] = []
846
847            for s in import_svcs:
848                for r in s.reqs:
849                    sr = copy.deepcopy(r)
850                    sr['visibility'] = 'import';
851                    req['service'].append(sr)
852
853            for s in masters.get(self.testbed, []):
854                for r in s.reqs:
855                    sr = copy.deepcopy(r)
856                    sr['visibility'] = 'export';
857                    req['service'].append(sr)
858
859            if attrs:
860                req['fedAttr'] = attrs
861
862            try:
863                self.log.debug("Calling StartSegment at %s " % uri)
864                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
865                        self.trusted_certs)
866                if r.has_key('StartSegmentResponseBody'):
867                    lval = r['StartSegmentResponseBody'].get('allocationLog',
868                            None)
869                    if lval and self.log_collector:
870                        for line in  lval.splitlines(True):
871                            self.log_collector.write(line)
872                    self.make_map(r['StartSegmentResponseBody'])
873                    if 'proof' in r: self.proof = r['proof']
874                    self.response = r
875                else:
876                    raise service_error(service_error.internal, 
877                            "Bad response!?: %s" %r)
878                return True
879            except service_error, e:
880                self.log.error("Start segment failed on %s: %s" % \
881                        (self.testbed, e))
882                return False
883
884
885
886    class terminate_segment:
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None):
889            self.log = log
890            self.debug = debug
891            self.cert_file = cert_file
892            self.cert_pwd = cert_pwd
893            self.trusted_certs = None
894            self.caller = caller
895            self.testbed = testbed
896
897        def __call__(self, uri, aid ):
898            req = {
899                    'allocID': {'fedid': aid }, 
900                }
901            try:
902                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
903                        self.trusted_certs)
904                return True
905            except service_error, e:
906                self.log.error("Terminate segment failed on %s: %s" % \
907                        (self.testbed, e))
908                return False
909
910    class info_segment(start_segment):
911        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
912                cert_pwd=None, trusted_certs=None, caller=None,
913                log_collector=None):
914            experiment_control_local.start_segment.__init__(self, debug, 
915                    log, testbed, cert_file, cert_pwd, trusted_certs, 
916                    caller, log_collector)
917
918        def __call__(self, uri, aid):
919            req = { 'allocID': { 'fedid' : aid } }
920
921            try:
922                self.log.debug("Calling InfoSegment at %s " % uri)
923                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
924                        self.trusted_certs)
925                if r.has_key('InfoSegmentResponseBody'):
926                    self.make_map(r['InfoSegmentResponseBody'])
927                    if 'proof' in r: self.proof = r['proof']
928                    self.response = r
929                else:
930                    raise service_error(service_error.internal, 
931                            "Bad response!?: %s" %r)
932                return True
933            except service_error, e:
934                self.log.error("Info segment failed on %s: %s" % \
935                        (self.testbed, e))
936                return False
937
938    def annotate_topology(self, top, data):
939        def add_new(ann, attr):
940            for a in ann:
941                if a not in attr: attr.append(a)
942
943        # Annotate the topology with embedding info
944        for e in top.elements:
945            if isinstance(e, topdl.Computer):
946                for s in data:
947                    ann = s.node.get(e.name, None)
948                    if ann is not None:
949                        add_new(ann[0], e.localname)
950                        e.status = ann[1]
951                        add_new(ann[2], e.service)
952                        add_new(ann[3], e.operation)
953                        break
954
955
956
957    def allocate_resources(self, allocated, masters, eid, expid, 
958            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
959            attrs=None, connInfo={}, tbmap=None, expcert=None):
960
961        started = { }           # Testbeds where a sub-experiment started
962                                # successfully
963
964        # XXX
965        fail_soft = False
966
967        if tbmap is None: tbmap = { }
968
969        log = alloc_log or self.log
970
971        tp = thread_pool(self.nthreads)
972        threads = [ ]
973        starters = [ ]
974
975        if expcert:
976            cert = expcert
977            pw = None
978        else:
979            cert = self.cert_file
980            pw = self.cert_pwd
981
982        for tb in allocated.keys():
983            # Create and start a thread to start the segment, and save it
984            # to get the return value later
985            tb_attrs = copy.copy(attrs)
986            tp.wait_for_slot()
987            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
988            base, suffix = split_testbed(tb)
989            if suffix:
990                tb_attrs.append({'attribute': 'experiment_name', 
991                    'value': "%s-%s" % (eid, suffix)})
992            else:
993                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
994            if not uri:
995                raise service_error(service_error.internal, 
996                        "Unknown testbed %s !?" % tb)
997
998            aid = tbparams[tb].allocID
999            if not aid:
1000                raise service_error(service_error.internal, 
1001                        "No alloc id for testbed %s !?" % tb)
1002
1003            s = self.start_segment(log=log, debug=self.debug,
1004                    testbed=tb, cert_file=cert,
1005                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1006                    caller=self.call_StartSegment,
1007                    log_collector=log_collector)
1008            starters.append(s)
1009            t  = pooled_thread(\
1010                    target=s, name=tb,
1011                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1012                    pdata=tp, trace_file=self.trace_file)
1013            threads.append(t)
1014            t.start()
1015
1016        # Wait until all finish (keep pinging the log, though)
1017        mins = 0
1018        revoked = False
1019        while not tp.wait_for_all_done(60.0):
1020            mins += 1
1021            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1022                    % mins)
1023            if not revoked and \
1024                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1025                # a testbed has failed.  Revoke this experiment's
1026                # synchronizarion values so that sub experiments will not
1027                # deadlock waiting for synchronization that will never happen
1028                self.log.info("A subexperiment has failed to swap in, " + \
1029                        "revoking synch keys")
1030                var_key = "fedid:%s" % expid
1031                for k in self.synch_store.all_keys():
1032                    if len(k) > 45 and k[0:46] == var_key:
1033                        self.synch_store.revoke_key(k)
1034                revoked = True
1035
1036        failed = [ t.getName() for t in threads if not t.rv ]
1037        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1038
1039        # If one failed clean up, unless fail_soft is set
1040        if failed:
1041            if not fail_soft:
1042                tp.clear()
1043                for tb in succeeded:
1044                    # Create and start a thread to stop the segment
1045                    tp.wait_for_slot()
1046                    uri = tbparams[tb].uri
1047                    t  = pooled_thread(\
1048                            target=self.terminate_segment(log=log,
1049                                testbed=tb,
1050                                cert_file=cert, 
1051                                cert_pwd=pw,
1052                                trusted_certs=self.trusted_certs,
1053                                caller=self.call_TerminateSegment),
1054                            args=(uri, tbparams[tb].allocID),
1055                            name=tb,
1056                            pdata=tp, trace_file=self.trace_file)
1057                    t.start()
1058                # Wait until all finish (if any are being stopped)
1059                if succeeded:
1060                    tp.wait_for_all_done()
1061
1062                # release the allocations
1063                for tb in tbparams.keys():
1064                    try:
1065                        self.release_access(tb, tbparams[tb].allocID, 
1066                                tbmap=tbmap, uri=tbparams[tb].uri,
1067                                cert_file=cert, cert_pwd=pw)
1068                    except service_error, e:
1069                        self.log.warn("Error releasing access: %s" % e.desc)
1070                # Remove the placeholder
1071                self.state_lock.acquire()
1072                self.state[eid].status = 'failed'
1073                self.state[eid].updated()
1074                if self.state_filename: self.write_state()
1075                self.state_lock.release()
1076                # Remove the repo dir
1077                self.remove_dirs("%s/%s" %(self.repodir, expid))
1078                # Walk up tmpdir, deleting as we go
1079                if self.cleanup:
1080                    self.remove_dirs(tmpdir)
1081                else:
1082                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1083
1084
1085                log.error("Swap in failed on %s" % ",".join(failed))
1086                return
1087        else:
1088            # Walk through the successes and gather the proofs
1089            proofs = { }
1090            for s in starters:
1091                if s.proof:
1092                    proofs[s.testbed] = s.proof
1093            self.annotate_topology(top, starters)
1094            log.info("[start_segment]: Experiment %s active" % eid)
1095
1096
1097        # Walk up tmpdir, deleting as we go
1098        if self.cleanup:
1099            self.remove_dirs(tmpdir)
1100        else:
1101            log.debug("[start_experiment]: not removing %s" % tmpdir)
1102
1103        # Insert the experiment into our state and update the disk copy.
1104        self.state_lock.acquire()
1105        self.state[expid].status = 'active'
1106        self.state[eid] = self.state[expid]
1107        self.state[eid].top = top
1108        self.state[eid].updated()
1109        # Append startup proofs
1110        for f in self.state[eid].get_all_allocations():
1111            if f.tb in proofs:
1112                f.proof.append(proofs[f.tb])
1113
1114        if self.state_filename: self.write_state()
1115        self.state_lock.release()
1116        return
1117
1118
1119    def add_kit(self, e, kit):
1120        """
1121        Add a Software object created from the list of (install, location)
1122        tuples passed as kit  to the software attribute of an object e.  We
1123        do this enough to break out the code, but it's kind of a hack to
1124        avoid changing the old tuple rep.
1125        """
1126
1127        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1128
1129        if isinstance(e.software, list): e.software.extend(s)
1130        else: e.software = s
1131
1132    def append_experiment_authorization(self, expid, attrs, 
1133            need_state_lock=True):
1134        """
1135        Append the authorization information to system state
1136        """
1137
1138        for p, a in attrs:
1139            self.auth.set_attribute(p, a)
1140        self.auth.save()
1141
1142        if need_state_lock: self.state_lock.acquire()
1143        # XXX: really a no op?
1144        #self.state[expid]['auth'].update(attrs)
1145        if self.state_filename: self.write_state()
1146        if need_state_lock: self.state_lock.release()
1147
1148    def clear_experiment_authorization(self, expid, need_state_lock=True):
1149        """
1150        Attrs is a set of attribute principal pairs that need to be removed
1151        from the authenticator.  Remove them and save the authenticator.
1152        """
1153
1154        if need_state_lock: self.state_lock.acquire()
1155        # XXX: should be a no-op
1156        #if expid in self.state and 'auth' in self.state[expid]:
1157            #for p, a in self.state[expid]['auth']:
1158                #self.auth.unset_attribute(p, a)
1159            #self.state[expid]['auth'] = set()
1160        if self.state_filename: self.write_state()
1161        if need_state_lock: self.state_lock.release()
1162        self.auth.save()
1163
1164
1165    def create_experiment_state(self, fid, req, expid, expcert,
1166            state='starting'):
1167        """
1168        Create the initial entry in the experiment's state.  The expid and
1169        expcert are the experiment's fedid and certifacte that represents that
1170        ID, which are installed in the experiment state.  If the request
1171        includes a suggested local name that is used if possible.  If the local
1172        name is already taken by an experiment owned by this user that has
1173        failed, it is overwritten.  Otherwise new letters are added until a
1174        valid localname is found.  The generated local name is returned.
1175        """
1176
1177        if req.has_key('experimentID') and \
1178                req['experimentID'].has_key('localname'):
1179            overwrite = False
1180            eid = req['experimentID']['localname']
1181            # If there's an old failed experiment here with the same local name
1182            # and accessible by this user, we'll overwrite it, otherwise we'll
1183            # fall through and do the collision avoidance.
1184            old_expid = self.get_experiment_fedid(eid)
1185            if old_expid:
1186                users_experiment = True
1187                try:
1188                    self.check_experiment_access(fid, old_expid)
1189                except service_error, e:
1190                    if e.code == service_error.access: users_experiment = False
1191                    else: raise e
1192                if users_experiment:
1193                    self.state_lock.acquire()
1194                    status = self.state[eid].status
1195                    if status and status == 'failed':
1196                        # remove the old access attributes
1197                        self.clear_experiment_authorization(eid,
1198                                need_state_lock=False)
1199                        overwrite = True
1200                        del self.state[eid]
1201                        del self.state[old_expid]
1202                    self.state_lock.release()
1203                else:
1204                    self.log.info('Experiment %s exists, ' % eid + \
1205                            'but this user cannot access it')
1206            self.state_lock.acquire()
1207            while (self.state.has_key(eid) and not overwrite):
1208                eid += random.choice(string.ascii_letters)
1209            # Initial state
1210            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1211                    identity=expcert)
1212            self.state[expid] = self.state[eid]
1213            if self.state_filename: self.write_state()
1214            self.state_lock.release()
1215        else:
1216            eid = self.exp_stem
1217            for i in range(0,5):
1218                eid += random.choice(string.ascii_letters)
1219            self.state_lock.acquire()
1220            while (self.state.has_key(eid)):
1221                eid = self.exp_stem
1222                for i in range(0,5):
1223                    eid += random.choice(string.ascii_letters)
1224            # Initial state
1225            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1226                    identity=expcert)
1227            self.state[expid] = self.state[eid]
1228            if self.state_filename: self.write_state()
1229            self.state_lock.release()
1230
1231        # Let users touch the state.  Authorize this fid and the expid itself
1232        # to touch the experiment, as well as allowing th eoverrides.
1233        self.append_experiment_authorization(eid, 
1234                set([(fid, expid), (expid,expid)] + \
1235                        [ (o, expid) for o in self.overrides]))
1236
1237        return eid
1238
1239
1240    def allocate_ips_to_topo(self, top):
1241        """
1242        Add an ip4_address attribute to all the hosts in the topology, based on
1243        the shared substrates on which they sit.  An /etc/hosts file is also
1244        created and returned as a list of hostfiles entries.  We also return
1245        the allocator, because we may need to allocate IPs to portals
1246        (specifically DRAGON portals).
1247        """
1248        subs = sorted(top.substrates, 
1249                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1250                reverse=True)
1251        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1252        ifs = { }
1253        hosts = [ ]
1254
1255        for idx, s in enumerate(subs):
1256            net_size = len(s.interfaces)+2
1257
1258            a = ips.allocate(net_size)
1259            if a :
1260                base, num = a
1261                if num < net_size: 
1262                    raise service_error(service_error.internal,
1263                            "Allocator returned wrong number of IPs??")
1264            else:
1265                raise service_error(service_error.req, 
1266                        "Cannot allocate IP addresses")
1267            mask = ips.min_alloc
1268            while mask < net_size:
1269                mask *= 2
1270
1271            netmask = ((2**32-1) ^ (mask-1))
1272
1273            base += 1
1274            for i in s.interfaces:
1275                i.attribute.append(
1276                        topdl.Attribute('ip4_address', 
1277                            "%s" % ip_addr(base)))
1278                i.attribute.append(
1279                        topdl.Attribute('ip4_netmask', 
1280                            "%s" % ip_addr(int(netmask))))
1281
1282                hname = i.element.name
1283                if ifs.has_key(hname):
1284                    hosts.append("%s\t%s-%s %s-%d" % \
1285                            (ip_addr(base), hname, s.name, hname,
1286                                ifs[hname]))
1287                else:
1288                    ifs[hname] = 0
1289                    hosts.append("%s\t%s-%s %s-%d %s" % \
1290                            (ip_addr(base), hname, s.name, hname,
1291                                ifs[hname], hname))
1292
1293                ifs[hname] += 1
1294                base += 1
1295        return hosts, ips
1296
1297    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1298            tbparam, masters, tbmap, expid=None, expcert=None):
1299        for tb in testbeds:
1300            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1301                    expcert)
1302            allocated[tb] = 1
1303
1304    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1305            expcert=None):
1306        """
1307        Get access to testbed through fedd and set the parameters for that tb
1308        """
1309        def get_export_project(svcs):
1310            """
1311            Look through for the list of federated_service for this testbed
1312            objects for a project_export service, and extract the project
1313            parameter.
1314            """
1315
1316            pe = [s for s in svcs if s.name=='project_export']
1317            if len(pe) == 1:
1318                return pe[0].params.get('project', None)
1319            elif len(pe) == 0:
1320                return None
1321            else:
1322                raise service_error(service_error.req,
1323                        "More than one project export is not supported")
1324
1325        def add_services(svcs, type, slist, keys):
1326            """
1327            Add the given services to slist.  type is import or export.  Also
1328            add a mapping entry from the assigned id to the original service
1329            record.
1330            """
1331            for i, s in enumerate(svcs):
1332                idx = '%s%d' % (type, i)
1333                keys[idx] = s
1334                sr = {'id': idx, 'name': s.name, 'visibility': type }
1335                if s.params:
1336                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1337                            for k, v in s.params.items()]
1338                slist.append(sr)
1339
1340        uri = tbmap.get(testbed_base(tb), None)
1341        if not uri:
1342            raise service_error(service_error.server_config, 
1343                    "Unknown testbed: %s" % tb)
1344
1345        export_svcs = masters.get(tb,[])
1346        import_svcs = [ s for m in masters.values() \
1347                for s in m \
1348                    if tb in s.importers ]
1349
1350        export_project = get_export_project(export_svcs)
1351        # Compose the credential list so that IDs come before attributes
1352        creds = set()
1353        keys = set()
1354        certs = self.auth.get_creds_for_principal(fid)
1355        if expid:
1356            certs.update(self.auth.get_creds_for_principal(expid))
1357        for c in certs:
1358            keys.add(c.issuer_cert())
1359            creds.add(c.attribute_cert())
1360        creds = list(keys) + list(creds)
1361
1362        if expcert: cert, pw = expcert, None
1363        else: cert, pw = self.cert_file, self.cert_pw
1364
1365        # Request credentials
1366        req = {
1367                'abac_credential': creds,
1368            }
1369        # Make the service request from the services we're importing and
1370        # exporting.  Keep track of the export request ids so we can
1371        # collect the resulting info from the access response.
1372        e_keys = { }
1373        if import_svcs or export_svcs:
1374            slist = []
1375            add_services(import_svcs, 'import', slist, e_keys)
1376            add_services(export_svcs, 'export', slist, e_keys)
1377            req['service'] = slist
1378
1379        if self.local_access.has_key(uri):
1380            # Local access call
1381            req = { 'RequestAccessRequestBody' : req }
1382            r = self.local_access[uri].RequestAccess(req, 
1383                    fedid(file=self.cert_file))
1384            r = { 'RequestAccessResponseBody' : r }
1385        else:
1386            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1387
1388        if r.has_key('RequestAccessResponseBody'):
1389            # Through to here we have a valid response, not a fault.
1390            # Access denied is a fault, so something better or worse than
1391            # access denied has happened.
1392            r = r['RequestAccessResponseBody']
1393            self.log.debug("[get_access] Access granted")
1394        else:
1395            raise service_error(service_error.protocol,
1396                        "Bad proxy response")
1397        if 'proof' not in r:
1398            raise service_error(service_error.protocol,
1399                        "Bad access response (no access proof)")
1400
1401        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1402                tb=tb, uri=uri, proof=[r['proof']], 
1403                services=masters.get(tb, None))
1404
1405        # Collect the responses corresponding to the services this testbed
1406        # exports.  These will be the service requests that we will include in
1407        # the start segment requests (with appropriate visibility values) to
1408        # import and export the segments.
1409        for s in r.get('service', []):
1410            id = s.get('id', None)
1411            # Note that this attaches the response to the object in the masters
1412            # data structure.  (The e_keys index disappears when this fcn
1413            # returns)
1414            if id and id in e_keys:
1415                e_keys[id].reqs.append(s)
1416
1417        # Add attributes to parameter space.  We don't allow attributes to
1418        # overlay any parameters already installed.
1419        for a in r.get('fedAttr', []):
1420            try:
1421                if a['attribute']:
1422                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1423            except KeyError:
1424                self.log.error("Bad attribute in response: %s" % a)
1425
1426
1427    def split_topology(self, top, topo, testbeds):
1428        """
1429        Create the sub-topologies that are needed for experiment instantiation.
1430        """
1431        for tb in testbeds:
1432            topo[tb] = top.clone()
1433            # copy in for loop allows deletions from the original
1434            for e in [ e for e in topo[tb].elements]:
1435                etb = e.get_attribute('testbed')
1436                # NB: elements without a testbed attribute won't appear in any
1437                # sub topologies. 
1438                if not etb or etb != tb:
1439                    for i in e.interface:
1440                        for s in i.subs:
1441                            try:
1442                                s.interfaces.remove(i)
1443                            except ValueError:
1444                                raise service_error(service_error.internal,
1445                                        "Can't remove interface??")
1446                    topo[tb].elements.remove(e)
1447            topo[tb].make_indices()
1448
1449    def confirm_software(self, top):
1450        """
1451        Make sure that the software to be loaded in the topo is all available
1452        before we begin making access requests, etc.  This is a subset of
1453        wrangle_software.
1454        """
1455        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1456        pkgs.update([x.location for e in top.elements for x in e.software])
1457
1458        for pkg in pkgs:
1459            loc = pkg
1460
1461            scheme, host, path = urlparse(loc)[0:3]
1462            dest = os.path.basename(path)
1463            if not scheme:
1464                if not loc.startswith('/'):
1465                    loc = "/%s" % loc
1466                loc = "file://%s" %loc
1467            # NB: if scheme was found, loc == pkg
1468            try:
1469                u = urlopen(loc)
1470                u.close()
1471            except Exception, e:
1472                raise service_error(service_error.req, 
1473                        "Cannot open %s: %s" % (loc, e))
1474        return True
1475
1476    def wrangle_software(self, expid, top, topo, tbparams):
1477        """
1478        Copy software out to the repository directory, allocate permissions and
1479        rewrite the segment topologies to look for the software in local
1480        places.
1481        """
1482
1483        # Copy the rpms and tarfiles to a distribution directory from
1484        # which the federants can retrieve them
1485        linkpath = "%s/software" %  expid
1486        softdir ="%s/%s" % ( self.repodir, linkpath)
1487        softmap = { }
1488
1489        # self.fedkit and self.gateway kit are lists of tuples of
1490        # (install_location, download_location) this extracts the download
1491        # locations.
1492        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1493        pkgs.update([x.location for e in top.elements for x in e.software])
1494        try:
1495            os.makedirs(softdir)
1496        except EnvironmentError, e:
1497            raise service_error(
1498                    "Cannot create software directory: %s" % e)
1499        # The actual copying.  Everything's converted into a url for copying.
1500        auth_attrs = set()
1501        for pkg in pkgs:
1502            loc = pkg
1503
1504            scheme, host, path = urlparse(loc)[0:3]
1505            dest = os.path.basename(path)
1506            if not scheme:
1507                if not loc.startswith('/'):
1508                    loc = "/%s" % loc
1509                loc = "file://%s" %loc
1510            # NB: if scheme was found, loc == pkg
1511            try:
1512                u = urlopen(loc)
1513            except Exception, e:
1514                raise service_error(service_error.req, 
1515                        "Cannot open %s: %s" % (loc, e))
1516            try:
1517                f = open("%s/%s" % (softdir, dest) , "w")
1518                self.log.debug("Writing %s/%s" % (softdir,dest) )
1519                data = u.read(4096)
1520                while data:
1521                    f.write(data)
1522                    data = u.read(4096)
1523                f.close()
1524                u.close()
1525            except Exception, e:
1526                raise service_error(service_error.internal,
1527                        "Could not copy %s: %s" % (loc, e))
1528            path = re.sub("/tmp", "", linkpath)
1529            # XXX
1530            softmap[pkg] = \
1531                    "%s/%s/%s" %\
1532                    ( self.repo_url, path, dest)
1533
1534            # Allow the individual segments to access the software by assigning
1535            # an attribute to each testbed allocation that encodes the data to
1536            # be released.  This expression collects the data for each run of
1537            # the loop.
1538            auth_attrs.update([
1539                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1540                        for tb in tbparams.keys()])
1541
1542        self.append_experiment_authorization(expid, auth_attrs)
1543
1544        # Convert the software locations in the segments into the local
1545        # copies on this host
1546        for soft in [ s for tb in topo.values() \
1547                for e in tb.elements \
1548                    if getattr(e, 'software', False) \
1549                        for s in e.software ]:
1550            if softmap.has_key(soft.location):
1551                soft.location = softmap[soft.location]
1552
1553
1554    def new_experiment(self, req, fid):
1555        """
1556        The external interface to empty initial experiment creation called from
1557        the dispatcher.
1558
1559        Creates a working directory, splits the incoming description using the
1560        splitter script and parses out the avrious subsections using the
1561        lcasses above.  Once each sub-experiment is created, use pooled threads
1562        to instantiate them and start it all up.
1563        """
1564        req = req.get('NewRequestBody', None)
1565        if not req:
1566            raise service_error(service_error.req,
1567                    "Bad request format (no NewRequestBody)")
1568
1569        if self.auth.import_credentials(data_list=req.get('credential', [])):
1570            self.auth.save()
1571
1572        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1573                with_proof=True)
1574
1575        if not access_ok:
1576            raise service_error(service_error.access, "New access denied",
1577                    proof=[proof])
1578
1579        try:
1580            tmpdir = tempfile.mkdtemp(prefix="split-")
1581        except EnvironmentError:
1582            raise service_error(service_error.internal, "Cannot create tmp dir")
1583
1584        try:
1585            access_user = self.accessdb[fid]
1586        except KeyError:
1587            raise service_error(service_error.internal,
1588                    "Access map and authorizer out of sync in " + \
1589                            "new_experiment for fedid %s"  % fid)
1590
1591        # Generate an ID for the experiment (slice) and a certificate that the
1592        # allocator can use to prove they own it.  We'll ship it back through
1593        # the encrypted connection.  If the requester supplied one, use it.
1594        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1595            expcert = req['experimentAccess']['X509']
1596            expid = fedid(certstr=expcert)
1597            self.state_lock.acquire()
1598            if expid in self.state:
1599                self.state_lock.release()
1600                raise service_error(service_error.req, 
1601                        'fedid %s identifies an existing experiment' % expid)
1602            self.state_lock.release()
1603        else:
1604            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1605
1606        #now we're done with the tmpdir, and it should be empty
1607        if self.cleanup:
1608            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1609            os.rmdir(tmpdir)
1610        else:
1611            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1612
1613        eid = self.create_experiment_state(fid, req, expid, expcert, 
1614                state='empty')
1615
1616        rv = {
1617                'experimentID': [
1618                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1619                ],
1620                'experimentStatus': 'empty',
1621                'experimentAccess': { 'X509' : expcert },
1622                'proof': proof.to_dict(),
1623            }
1624
1625        return rv
1626
1627    # create_experiment sub-functions
1628
1629    @staticmethod
1630    def get_experiment_key(req, field='experimentID'):
1631        """
1632        Parse the experiment identifiers out of the request (the request body
1633        tag has been removed).  Specifically this pulls either the fedid or the
1634        localname out of the experimentID field.  A fedid is preferred.  If
1635        neither is present or the request does not contain the fields,
1636        service_errors are raised.
1637        """
1638        # Get the experiment access
1639        exp = req.get(field, None)
1640        if exp:
1641            if exp.has_key('fedid'):
1642                key = exp['fedid']
1643            elif exp.has_key('localname'):
1644                key = exp['localname']
1645            else:
1646                raise service_error(service_error.req, "Unknown lookup type")
1647        else:
1648            raise service_error(service_error.req, "No request?")
1649
1650        return key
1651
1652    def get_experiment_ids_and_start(self, key, tmpdir):
1653        """
1654        Get the experiment name, id and access certificate from the state, and
1655        set the experiment state to 'starting'.  returns a triple (fedid,
1656        localname, access_cert_file). The access_cert_file is a copy of the
1657        contents of the access certificate, created in the tempdir with
1658        restricted permissions.  If things are confused, raise an exception.
1659        """
1660
1661        expid = eid = None
1662        self.state_lock.acquire()
1663        if key in self.state:
1664            exp = self.state[key]
1665            exp.status = "starting"
1666            exp.updated()
1667            expid = exp.fedid
1668            eid = exp.localname
1669            expcert = exp.identity
1670        self.state_lock.release()
1671
1672        # make a protected copy of the access certificate so the experiment
1673        # controller can act as the experiment principal.
1674        if expcert:
1675            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1676            if not expcert_file:
1677                raise service_error(service_error.internal, 
1678                        "Cannot create temp cert file?")
1679        else:
1680            expcert_file = None
1681
1682        return (eid, expid, expcert_file)
1683
1684    def get_topology(self, req, tmpdir):
1685        """
1686        Get the ns2 content and put it into a file for parsing.  Call the local
1687        or remote parser and return the topdl.Topology.  Errors result in
1688        exceptions.  req is the request and tmpdir is a work directory.
1689        """
1690
1691        # The tcl parser needs to read a file so put the content into that file
1692        descr=req.get('experimentdescription', None)
1693        if descr:
1694            if 'ns2description' in descr:
1695                file_content=descr['ns2description']
1696            elif 'topdldescription' in descr:
1697                return topdl.Topology(**descr['topdldescription'])
1698            else:
1699                raise service_error(service_error.req, 
1700                        'Unknown experiment description type')
1701        else:
1702            raise service_error(service_error.req, "No experiment description")
1703
1704
1705        if self.splitter_url:
1706            self.log.debug("Calling remote topdl translator at %s" % \
1707                    self.splitter_url)
1708            top = self.remote_ns2topdl(self.splitter_url, file_content)
1709        else:
1710            tclfile = os.path.join(tmpdir, "experiment.tcl")
1711            if file_content:
1712                try:
1713                    f = open(tclfile, 'w')
1714                    f.write(file_content)
1715                    f.close()
1716                except EnvironmentError:
1717                    raise service_error(service_error.internal,
1718                            "Cannot write temp experiment description")
1719            else:
1720                raise service_error(service_error.req, 
1721                        "Only ns2descriptions supported")
1722            pid = "dummy"
1723            gid = "dummy"
1724            eid = "dummy"
1725
1726            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1727                str(self.muxmax), '-m', 'dummy']
1728
1729            tclcmd.extend([pid, gid, eid, tclfile])
1730
1731            self.log.debug("running local splitter %s", " ".join(tclcmd))
1732            # This is just fantastic.  As a side effect the parser copies
1733            # tb_compat.tcl into the current directory, so that directory
1734            # must be writable by the fedd user.  Doing this in the
1735            # temporary subdir ensures this is the case.
1736            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1737                    cwd=tmpdir)
1738            split_data = tclparser.stdout
1739
1740            top = topdl.topology_from_xml(file=split_data, top="experiment")
1741            os.remove(tclfile)
1742
1743        return top
1744
1745    def get_testbed_services(self, req, testbeds):
1746        """
1747        Parse the services section of the request into two dicts mapping
1748        testbed to lists of federated_service objects.  The first dict maps all
1749        exporters of services to those service objects, the second maps
1750        testbeds to service objects only for services requiring portals.
1751        """
1752        # We construct both dicts here because deriving the second is more
1753        # comples than it looks - both the keys and lists can differ, and it's
1754        # much easier to generate both in one pass.
1755        masters = { }
1756        pmasters = { }
1757        for s in req.get('service', []):
1758            # If this is a service request with the importall field
1759            # set, fill it out.
1760
1761            if s.get('importall', False):
1762                s['import'] = [ tb for tb in testbeds \
1763                        if tb not in s.get('export',[])]
1764                del s['importall']
1765
1766            # Add the service to masters
1767            for tb in s.get('export', []):
1768                if s.get('name', None):
1769
1770                    params = { }
1771                    for a in s.get('fedAttr', []):
1772                        params[a.get('attribute', '')] = a.get('value','')
1773
1774                    fser = federated_service(name=s['name'],
1775                            exporter=tb, importers=s.get('import',[]),
1776                            params=params)
1777                    if fser.name == 'hide_hosts' \
1778                            and 'hosts' not in fser.params:
1779                        fser.params['hosts'] = \
1780                                ",".join(tb_hosts.get(fser.exporter, []))
1781                    if tb in masters: masters[tb].append(fser)
1782                    else: masters[tb] = [fser]
1783
1784                    if fser.portal:
1785                        if tb in pmasters: pmasters[tb].append(fser)
1786                        else: pmasters[tb] = [fser]
1787                else:
1788                    self.log.error('Testbed service does not have name " + \
1789                            "and importers')
1790        return masters, pmasters
1791
1792    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1793        """
1794        Create the ssh keys necessary for interconnecting the portal nodes and
1795        the global hosts file for letting each segment know about the IP
1796        addresses in play.  Save these into the repo.  Add attributes to the
1797        autorizer allowing access controllers to download them and return a set
1798        of attributes that inform the segments where to find this stuff.  May
1799        raise service_errors in if there are problems.
1800        """
1801        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1802        gw_secretkey_base = "fed.%s" % self.ssh_type
1803        keydir = os.path.join(tmpdir, 'keys')
1804        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1805        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1806
1807        try:
1808            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1809        except ValueError:
1810            raise service_error(service_error.server_config, 
1811                    "Bad key type (%s)" % self.ssh_type)
1812
1813        self.generate_seer_certs(keydir)
1814
1815        # Copy configuration files into the remote file store
1816        # The config urlpath
1817        configpath = "/%s/config" % expid
1818        # The config file system location
1819        configdir ="%s%s" % ( self.repodir, configpath)
1820        try:
1821            os.makedirs(configdir)
1822        except EnvironmentError, e:
1823            raise service_error(service_error.internal,
1824                    "Cannot create config directory: %s" % e)
1825        try:
1826            f = open("%s/hosts" % configdir, "w")
1827            print >> f, string.join(hosts, '\n')
1828            f.close()
1829        except EnvironmentError, e:
1830            raise service_error(service_error.internal, 
1831                    "Cannot write hosts file: %s" % e)
1832        try:
1833            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1834            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1835            copy_file(os.path.join(keydir, 'ca.pem'), 
1836                    os.path.join(configdir, 'ca.pem'))
1837            copy_file(os.path.join(keydir, 'node.pem'), 
1838                    os.path.join(configdir, 'node.pem'))
1839        except EnvironmentError, e:
1840            raise service_error(service_error.internal, 
1841                    "Cannot copy keyfiles: %s" % e)
1842
1843        # Allow the individual testbeds to access the configuration files,
1844        # again by setting an attribute for the relevant pathnames on each
1845        # allocation principal.  Yeah, that's a long list comprehension.
1846        self.append_experiment_authorization(expid, set([
1847            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1848                    for tb in tbparams.keys() \
1849                        for f in ("hosts", 'ca.pem', 'node.pem', 
1850                            gw_secretkey_base, gw_pubkey_base)]))
1851
1852        attrs = [ 
1853                {
1854                    'attribute': 'ssh_pubkey', 
1855                    'value': '%s/%s/config/%s' % \
1856                            (self.repo_url, expid, gw_pubkey_base)
1857                },
1858                {
1859                    'attribute': 'ssh_secretkey', 
1860                    'value': '%s/%s/config/%s' % \
1861                            (self.repo_url, expid, gw_secretkey_base)
1862                },
1863                {
1864                    'attribute': 'hosts', 
1865                    'value': '%s/%s/config/hosts' % \
1866                            (self.repo_url, expid)
1867                },
1868                {
1869                    'attribute': 'seer_ca_pem', 
1870                    'value': '%s/%s/config/%s' % \
1871                            (self.repo_url, expid, 'ca.pem')
1872                },
1873                {
1874                    'attribute': 'seer_node_pem', 
1875                    'value': '%s/%s/config/%s' % \
1876                            (self.repo_url, expid, 'node.pem')
1877                },
1878            ]
1879        return attrs
1880
1881
1882    def get_vtopo(self, req, fid):
1883        """
1884        Return the stored virtual topology for this experiment
1885        """
1886        rv = None
1887        state = None
1888
1889        req = req.get('VtopoRequestBody', None)
1890        if not req:
1891            raise service_error(service_error.req,
1892                    "Bad request format (no VtopoRequestBody)")
1893        exp = req.get('experiment', None)
1894        if exp:
1895            if exp.has_key('fedid'):
1896                key = exp['fedid']
1897                keytype = "fedid"
1898            elif exp.has_key('localname'):
1899                key = exp['localname']
1900                keytype = "localname"
1901            else:
1902                raise service_error(service_error.req, "Unknown lookup type")
1903        else:
1904            raise service_error(service_error.req, "No request?")
1905
1906        proof = self.check_experiment_access(fid, key)
1907
1908        self.state_lock.acquire()
1909        # XXX: this needs to be recalculated
1910        if key in self.state:
1911            if self.state[key].top is not None:
1912                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1913                rv = { 'experiment' : {keytype: key },
1914                        'vtopo': vtopo,
1915                        'proof': proof.to_dict(), 
1916                    }
1917            else:
1918                state = self.state[key].status
1919        self.state_lock.release()
1920
1921        if rv: return rv
1922        else: 
1923            if state:
1924                raise service_error(service_error.partial, 
1925                        "Not ready: %s" % state)
1926            else:
1927                raise service_error(service_error.req, "No such experiment")
1928
1929    def get_vis(self, req, fid):
1930        """
1931        Return the stored visualization for this experiment
1932        """
1933        rv = None
1934        state = None
1935
1936        req = req.get('VisRequestBody', None)
1937        if not req:
1938            raise service_error(service_error.req,
1939                    "Bad request format (no VisRequestBody)")
1940        exp = req.get('experiment', None)
1941        if exp:
1942            if exp.has_key('fedid'):
1943                key = exp['fedid']
1944                keytype = "fedid"
1945            elif exp.has_key('localname'):
1946                key = exp['localname']
1947                keytype = "localname"
1948            else:
1949                raise service_error(service_error.req, "Unknown lookup type")
1950        else:
1951            raise service_error(service_error.req, "No request?")
1952
1953        proof = self.check_experiment_access(fid, key)
1954
1955        self.state_lock.acquire()
1956        # Generate the visualization
1957        if key in self.state:
1958            if self.state[key].top is not None:
1959                try:
1960                    vis = self.genviz(
1961                            topdl.topology_to_vtopo(self.state[key].topo))
1962                except service_error, e:
1963                    self.state_lock.release()
1964                    raise e
1965                rv =  { 'experiment' : {keytype: key },
1966                        'vis': vis,
1967                        'proof': proof.to_dict(), 
1968                        }
1969            else:
1970                state = self.state[key].status
1971        self.state_lock.release()
1972
1973        if rv: return rv
1974        else:
1975            if state:
1976                raise service_error(service_error.partial, 
1977                        "Not ready: %s" % state)
1978            else:
1979                raise service_error(service_error.req, "No such experiment")
1980
1981   
1982    def save_federant_information(self, allocated, tbparams, eid, top):
1983        """
1984        Store the various data that have changed in the experiment state
1985        between when it was started and the beginning of resource allocation.
1986        This is basically the information about each local allocation.  This
1987        fills in the values of the placeholder allocation in the state.  It
1988        also collects the access proofs and returns them as dicts for a
1989        response message.
1990        """
1991        self.state_lock.acquire()
1992        exp = self.state[eid]
1993        exp.top = top.clone()
1994        # save federant information
1995        for k in allocated.keys():
1996            exp.add_allocation(tbparams[k])
1997            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
1998                type="testbed", localname=[k], 
1999                service=[ s.to_topdl() for s in tbparams[k].services]))
2000
2001        # Access proofs for the response message
2002        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2003                    for p in tbparams[k].proof]
2004        exp.updated()
2005        if self.state_filename: 
2006            self.write_state()
2007        self.state_lock.release()
2008        return proofs
2009
2010    def clear_placeholder(self, eid, expid, tmpdir):
2011        """
2012        Clear the placeholder and remove any allocated temporary dir.
2013        """
2014
2015        self.state_lock.acquire()
2016        del self.state[eid]
2017        del self.state[expid]
2018        if self.state_filename: self.write_state()
2019        self.state_lock.release()
2020        if tmpdir and self.cleanup:
2021            self.remove_dirs(tmpdir)
2022
2023    # end of create_experiment sub-functions
2024
2025    def create_experiment(self, req, fid):
2026        """
2027        The external interface to experiment creation called from the
2028        dispatcher.
2029
2030        Creates a working directory, splits the incoming description using the
2031        splitter script and parses out the various subsections using the
2032        classes above.  Once each sub-experiment is created, use pooled threads
2033        to instantiate them and start it all up.
2034        """
2035
2036        req = req.get('CreateRequestBody', None)
2037        if req:
2038            key = self.get_experiment_key(req)
2039        else:
2040            raise service_error(service_error.req,
2041                    "Bad request format (no CreateRequestBody)")
2042
2043        # Import information from the requester
2044        if self.auth.import_credentials(data_list=req.get('credential', [])):
2045            self.auth.save()
2046
2047        # Make sure that the caller can talk to us
2048        proof = self.check_experiment_access(fid, key)
2049
2050        # Install the testbed map entries supplied with the request into a copy
2051        # of the testbed map.
2052        tbmap = dict(self.tbmap)
2053        for m in req.get('testbedmap', []):
2054            if 'testbed' in m and 'uri' in m:
2055                tbmap[m['testbed']] = m['uri']
2056
2057        # a place to work
2058        try:
2059            tmpdir = tempfile.mkdtemp(prefix="split-")
2060            os.mkdir(tmpdir+"/keys")
2061        except EnvironmentError:
2062            raise service_error(service_error.internal, "Cannot create tmp dir")
2063
2064        tbparams = { }
2065
2066        eid, expid, expcert_file = \
2067                self.get_experiment_ids_and_start(key, tmpdir)
2068
2069        # This catches exceptions to clear the placeholder if necessary
2070        try: 
2071            if not (eid and expid):
2072                raise service_error(service_error.internal, 
2073                        "Cannot find local experiment info!?")
2074
2075            top = self.get_topology(req, tmpdir)
2076            self.confirm_software(top)
2077            # Assign the IPs
2078            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2079            # Find the testbeds to look up
2080            tb_hosts = { }
2081            testbeds = [ ]
2082            for e in top.elements:
2083                if isinstance(e, topdl.Computer):
2084                    tb = e.get_attribute('testbed') or 'default'
2085                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2086                    else: 
2087                        tb_hosts[tb] = [ e.name ]
2088                        testbeds.append(tb)
2089
2090            masters, pmasters = self.get_testbed_services(req, testbeds)
2091            allocated = { }         # Testbeds we can access
2092            topo ={ }               # Sub topologies
2093            connInfo = { }          # Connection information
2094
2095            self.split_topology(top, topo, testbeds)
2096
2097            self.get_access_to_testbeds(testbeds, fid, allocated, 
2098                    tbparams, masters, tbmap, expid, expcert_file)
2099
2100            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2101
2102            part = experiment_partition(self.auth, self.store_url, tbmap,
2103                    self.muxmax, self.direct_transit)
2104            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2105                    connInfo, expid)
2106
2107            auth_attrs = set()
2108            # Now get access to the dynamic testbeds (those added above)
2109            for tb in [ t for t in topo if t not in allocated]:
2110                self.get_access(tb, tbparams, fid, masters, tbmap, 
2111                        expid, expcert_file)
2112                allocated[tb] = 1
2113                store_keys = topo[tb].get_attribute('store_keys')
2114                # Give the testbed access to keys it exports or imports
2115                if store_keys:
2116                    auth_attrs.update(set([
2117                        (tbparams[tb].allocID, sk) \
2118                                for sk in store_keys.split(" ")]))
2119
2120            if auth_attrs:
2121                self.append_experiment_authorization(expid, auth_attrs)
2122
2123            # transit and disconnected testbeds may not have a connInfo entry.
2124            # Fill in the blanks.
2125            for t in allocated.keys():
2126                if not connInfo.has_key(t):
2127                    connInfo[t] = { }
2128
2129            self.wrangle_software(expid, top, topo, tbparams)
2130
2131            proofs = self.save_federant_information(allocated, tbparams, 
2132                    eid, top)
2133        except service_error, e:
2134            # If something goes wrong in the parse (usually an access error)
2135            # clear the placeholder state.  From here on out the code delays
2136            # exceptions.  Failing at this point returns a fault to the remote
2137            # caller.
2138            self.clear_placeholder(eid, expid, tmpdir)
2139            raise e
2140
2141        # Start the background swapper and return the starting state.  From
2142        # here on out, the state will stick around a while.
2143
2144        # Create a logger that logs to the experiment's state object as well as
2145        # to the main log file.
2146        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2147        alloc_collector = self.list_log(self.state[eid].log)
2148        h = logging.StreamHandler(alloc_collector)
2149        # XXX: there should be a global one of these rather than repeating the
2150        # code.
2151        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2152                    '%d %b %y %H:%M:%S'))
2153        alloc_log.addHandler(h)
2154
2155        # Start a thread to do the resource allocation
2156        t  = Thread(target=self.allocate_resources,
2157                args=(allocated, masters, eid, expid, tbparams, 
2158                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2159                    connInfo, tbmap, expcert_file),
2160                name=eid)
2161        t.start()
2162
2163        rv = {
2164                'experimentID': [
2165                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2166                ],
2167                'experimentStatus': 'starting',
2168                'proof': [ proof.to_dict() ] + proofs,
2169            }
2170
2171        return rv
2172   
2173    def get_experiment_fedid(self, key):
2174        """
2175        find the fedid associated with the localname key in the state database.
2176        """
2177
2178        rv = None
2179        self.state_lock.acquire()
2180        if key in self.state:
2181            rv = self.state[key].fedid
2182        self.state_lock.release()
2183        return rv
2184
2185    def check_experiment_access(self, fid, key):
2186        """
2187        Confirm that the fid has access to the experiment.  Though a request
2188        may be made in terms of a local name, the access attribute is always
2189        the experiment's fedid.
2190        """
2191        if not isinstance(key, fedid):
2192            key = self.get_experiment_fedid(key)
2193
2194        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2195
2196        if access_ok:
2197            return proof
2198        else:
2199            raise service_error(service_error.access, "Access Denied",
2200                proof)
2201
2202
2203    def get_handler(self, path, fid):
2204        """
2205        Perhaps surprisingly named, this function handles HTTP GET requests to
2206        this server (SOAP requests are POSTs).
2207        """
2208        self.log.info("Get handler %s %s" % (path, fid))
2209        # XXX: log proofs?
2210        if self.auth.check_attribute(fid, path):
2211            return ("%s/%s" % (self.repodir, path), "application/binary")
2212        else:
2213            return (None, None)
2214
2215    def update_info(self, key, force=False):
2216        top = None
2217        self.state_lock.acquire()
2218        if key in self.state:
2219            if force or self.state[key].older_than(self.info_cache_limit):
2220                top = self.state[key].top
2221                if top is not None: top = top.clone()
2222                d1, info_params, cert, d2 = \
2223                        self.get_segment_info(self.state[key], need_lock=False)
2224        self.state_lock.release()
2225
2226        if top is None: return
2227
2228        try:
2229            tmpdir = tempfile.mkdtemp(prefix="info-")
2230        except EnvironmentError:
2231            raise service_error(service_error.internal, 
2232                    "Cannot create tmp dir")
2233        cert_file = self.make_temp_certfile(cert, tmpdir)
2234
2235        data = []
2236        try:
2237            for k, (uri, aid) in info_params.items():
2238                info=self.info_segment(log=self.log, testbed=uri,
2239                            cert_file=cert_file, cert_pwd=None,
2240                            trusted_certs=self.trusted_certs,
2241                            caller=self.call_InfoSegment)
2242                info(uri, aid)
2243                data.append(info)
2244        # Clean up the tmpdir no matter what
2245        finally:
2246            if tmpdir: self.remove_dirs(tmpdir)
2247
2248        self.annotate_topology(top, data)
2249        self.state_lock.acquire()
2250        if key in self.state:
2251            self.state[key].top = top
2252            self.state[key].updated()
2253            if self.state_filename: self.write_state()
2254        self.state_lock.release()
2255
2256   
2257    def get_info(self, req, fid):
2258        """
2259        Return all the stored info about this experiment
2260        """
2261        rv = None
2262
2263        req = req.get('InfoRequestBody', None)
2264        if not req:
2265            raise service_error(service_error.req,
2266                    "Bad request format (no InfoRequestBody)")
2267        exp = req.get('experiment', None)
2268        legacy = req.get('legacy', False)
2269        fresh = req.get('fresh', False)
2270        if exp:
2271            if exp.has_key('fedid'):
2272                key = exp['fedid']
2273                keytype = "fedid"
2274            elif exp.has_key('localname'):
2275                key = exp['localname']
2276                keytype = "localname"
2277            else:
2278                raise service_error(service_error.req, "Unknown lookup type")
2279        else:
2280            raise service_error(service_error.req, "No request?")
2281
2282        proof = self.check_experiment_access(fid, key)
2283
2284        self.update_info(key, fresh)
2285
2286        self.state_lock.acquire()
2287        if self.state.has_key(key):
2288            rv = self.state[key].get_info()
2289            # Copy the topo if we need legacy annotations
2290            if legacy:
2291                top = self.state[key].top
2292                if top is not None: top = top.clone()
2293        self.state_lock.release()
2294
2295        # If the legacy visualization and topology representations are
2296        # requested, calculate them and add them to the return.
2297        if legacy and rv is not None:
2298            if top is not None:
2299                vtopo = topdl.topology_to_vtopo(top)
2300                if vtopo is not None:
2301                    rv['vtopo'] = vtopo
2302                    try:
2303                        vis = self.genviz(vtopo)
2304                    except service_error, e:
2305                        self.log.debug('Problem generating visualization: %s' \
2306                                % e)
2307                        vis = None
2308                    if vis is not None:
2309                        rv['vis'] = vis
2310        if rv:
2311            rv['proof'] = proof.to_dict()
2312            return rv
2313        else:
2314            raise service_error(service_error.req, "No such experiment")
2315
2316    def get_multi_info(self, req, fid):
2317        """
2318        Return all the stored info that this fedid can access
2319        """
2320        rv = { 'info': [ ], 'proof': [ ] }
2321
2322        self.state_lock.acquire()
2323        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2324            try:
2325                proof = self.check_experiment_access(fid, key)
2326            except service_error, e:
2327                if e.code == service_error.access:
2328                    continue
2329                else:
2330                    self.state_lock.release()
2331                    raise e
2332
2333            if self.state.has_key(key):
2334                e = self.state[key].get_info()
2335                e['proof'] = proof.to_dict()
2336                rv['info'].append(e)
2337                rv['proof'].append(proof.to_dict())
2338        self.state_lock.release()
2339        return rv
2340
2341    def check_termination_status(self, fed_exp, force):
2342        """
2343        Confirm that the experiment is sin a valid state to stop (or force it)
2344        return the state - invalid states for deletion and force settings cause
2345        exceptions.
2346        """
2347        self.state_lock.acquire()
2348        status = fed_exp.status
2349
2350        if status:
2351            if status in ('starting', 'terminating'):
2352                if not force:
2353                    self.state_lock.release()
2354                    raise service_error(service_error.partial, 
2355                            'Experiment still being created or destroyed')
2356                else:
2357                    self.log.warning('Experiment in %s state ' % status + \
2358                            'being terminated by force.')
2359            self.state_lock.release()
2360            return status
2361        else:
2362            # No status??? trouble
2363            self.state_lock.release()
2364            raise service_error(service_error.internal,
2365                    "Experiment has no status!?")
2366
2367    def get_segment_info(self, fed_exp, need_lock=True):
2368        ids = []
2369        term_params = { }
2370        if need_lock: self.state_lock.acquire()
2371        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2372        expcert = fed_exp.identity
2373        repo = "%s" % fed_exp.fedid
2374
2375        # Collect the allocation/segment ids into a dict keyed by the fedid
2376        # of the allocation that contains a tuple of uri, aid
2377        for i, fed in enumerate(fed_exp.get_all_allocations()):
2378            uri = fed.uri
2379            aid = fed.allocID
2380            term_params[aid] = (uri, aid)
2381        if need_lock: self.state_lock.release()
2382        return ids, term_params, expcert, repo
2383
2384
2385    def get_termination_info(self, fed_exp):
2386        self.state_lock.acquire()
2387        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2388        # Change the experiment state
2389        fed_exp.status = 'terminating'
2390        fed_exp.updated()
2391        if self.state_filename: self.write_state()
2392        self.state_lock.release()
2393
2394        return ids, term_params, expcert, repo
2395
2396
2397    def deallocate_resources(self, term_params, expcert, status, force, 
2398            dealloc_log):
2399        tmpdir = None
2400        # This try block makes sure the tempdir is cleared
2401        try:
2402            # If no expcert, try the deallocation as the experiment
2403            # controller instance.
2404            if expcert and self.auth_type != 'legacy': 
2405                try:
2406                    tmpdir = tempfile.mkdtemp(prefix="term-")
2407                except EnvironmentError:
2408                    raise service_error(service_error.internal, 
2409                            "Cannot create tmp dir")
2410                cert_file = self.make_temp_certfile(expcert, tmpdir)
2411                pw = None
2412            else: 
2413                cert_file = self.cert_file
2414                pw = self.cert_pwd
2415
2416            # Stop everyone.  NB, wait_for_all waits until a thread starts
2417            # and then completes, so we can't wait if nothing starts.  So,
2418            # no tbparams, no start.
2419            if len(term_params) > 0:
2420                tp = thread_pool(self.nthreads)
2421                for k, (uri, aid) in term_params.items():
2422                    # Create and start a thread to stop the segment
2423                    tp.wait_for_slot()
2424                    t  = pooled_thread(\
2425                            target=self.terminate_segment(log=dealloc_log,
2426                                testbed=uri,
2427                                cert_file=cert_file, 
2428                                cert_pwd=pw,
2429                                trusted_certs=self.trusted_certs,
2430                                caller=self.call_TerminateSegment),
2431                            args=(uri, aid), name=k,
2432                            pdata=tp, trace_file=self.trace_file)
2433                    t.start()
2434                # Wait for completions
2435                tp.wait_for_all_done()
2436
2437            # release the allocations (failed experiments have done this
2438            # already, and starting experiments may be in odd states, so we
2439            # ignore errors releasing those allocations
2440            try: 
2441                for k, (uri, aid)  in term_params.items():
2442                    self.release_access(None, aid, uri=uri,
2443                            cert_file=cert_file, cert_pwd=pw)
2444            except service_error, e:
2445                if status != 'failed' and not force:
2446                    raise e
2447
2448        # Clean up the tmpdir no matter what
2449        finally:
2450            if tmpdir: self.remove_dirs(tmpdir)
2451
2452    def terminate_experiment(self, req, fid):
2453        """
2454        Swap this experiment out on the federants and delete the shared
2455        information
2456        """
2457        tbparams = { }
2458        req = req.get('TerminateRequestBody', None)
2459        if not req:
2460            raise service_error(service_error.req,
2461                    "Bad request format (no TerminateRequestBody)")
2462
2463        key = self.get_experiment_key(req, 'experiment')
2464        proof = self.check_experiment_access(fid, key)
2465        exp = req.get('experiment', False)
2466        force = req.get('force', False)
2467
2468        dealloc_list = [ ]
2469
2470
2471        # Create a logger that logs to the dealloc_list as well as to the main
2472        # log file.
2473        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2474        h = logging.StreamHandler(self.list_log(dealloc_list))
2475        # XXX: there should be a global one of these rather than repeating the
2476        # code.
2477        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2478                    '%d %b %y %H:%M:%S'))
2479        dealloc_log.addHandler(h)
2480
2481        self.state_lock.acquire()
2482        fed_exp = self.state.get(key, None)
2483        self.state_lock.release()
2484        repo = None
2485
2486        if fed_exp:
2487            status = self.check_termination_status(fed_exp, force)
2488            # get_termination_info updates the experiment state
2489            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2490            self.deallocate_resources(term_params, expcert, status, force, 
2491                    dealloc_log)
2492
2493            # Remove the terminated experiment
2494            self.state_lock.acquire()
2495            for id in ids:
2496                self.clear_experiment_authorization(id, need_state_lock=False)
2497                if id in self.state: del self.state[id]
2498
2499            if self.state_filename: self.write_state()
2500            self.state_lock.release()
2501
2502            # Delete any synch points associated with this experiment.  All
2503            # synch points begin with the fedid of the experiment.
2504            fedid_keys = set(["fedid:%s" % f for f in ids \
2505                    if isinstance(f, fedid)])
2506            for k in self.synch_store.all_keys():
2507                try:
2508                    if len(k) > 45 and k[0:46] in fedid_keys:
2509                        self.synch_store.del_value(k)
2510                except synch_store.BadDeletionError:
2511                    pass
2512            self.write_store()
2513
2514            # Remove software and other cached stuff from the filesystem.
2515            if repo:
2516                self.remove_dirs("%s/%s" % (self.repodir, repo))
2517       
2518            return { 
2519                    'experiment': exp , 
2520                    'deallocationLog': string.join(dealloc_list, ''),
2521                    'proof': [proof.to_dict()],
2522                    }
2523        else:
2524            raise service_error(service_error.req, "No saved state")
2525
2526
2527    def GetValue(self, req, fid):
2528        """
2529        Get a value from the synchronized store
2530        """
2531        req = req.get('GetValueRequestBody', None)
2532        if not req:
2533            raise service_error(service_error.req,
2534                    "Bad request format (no GetValueRequestBody)")
2535       
2536        name = req.get('name', None)
2537        wait = req.get('wait', False)
2538        rv = { 'name': name }
2539
2540        if not name:
2541            raise service_error(service_error.req, "No name?")
2542
2543        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2544
2545        if access_ok:
2546            self.log.debug("[GetValue] asking for %s " % name)
2547            try:
2548                v = self.synch_store.get_value(name, wait)
2549            except synch_store.RevokedKeyError:
2550                # No more synch on this key
2551                raise service_error(service_error.federant, 
2552                        "Synch key %s revoked" % name)
2553            if v is not None:
2554                rv['value'] = v
2555            rv['proof'] = proof.to_dict()
2556            self.log.debug("[GetValue] got %s from %s" % (v, name))
2557            return rv
2558        else:
2559            raise service_error(service_error.access, "Access Denied",
2560                    proof=proof)
2561       
2562
2563    def SetValue(self, req, fid):
2564        """
2565        Set a value in the synchronized store
2566        """
2567        req = req.get('SetValueRequestBody', None)
2568        if not req:
2569            raise service_error(service_error.req,
2570                    "Bad request format (no SetValueRequestBody)")
2571       
2572        name = req.get('name', None)
2573        v = req.get('value', '')
2574
2575        if not name:
2576            raise service_error(service_error.req, "No name?")
2577
2578        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2579
2580        if access_ok:
2581            try:
2582                self.synch_store.set_value(name, v)
2583                self.write_store()
2584                self.log.debug("[SetValue] set %s to %s" % (name, v))
2585            except synch_store.CollisionError:
2586                # Translate into a service_error
2587                raise service_error(service_error.req,
2588                        "Value already set: %s" %name)
2589            except synch_store.RevokedKeyError:
2590                # No more synch on this key
2591                raise service_error(service_error.federant, 
2592                        "Synch key %s revoked" % name)
2593                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2594        else:
2595            raise service_error(service_error.access, "Access Denied",
2596                    proof=proof)
Note: See TracBrowser for help on using the repository browser.