source: fedd/federation/experiment_control.py @ ec3aa4d

compt_changesinfo-ops
Last change on this file since ec3aa4d was ec3aa4d, checked in by Ted Faber <faber@…>, 12 years ago

Don't need those params.

  • Property mode set to 100644
File size: 86.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52
53# Right now, no support for composition.
54class federated_service:
55    def __init__(self, name, exporter=None, importers=None, params=None, 
56            reqs=None, portal=None):
57        self.name=name
58        self.exporter=exporter
59        if importers is None: self.importers = []
60        else: self.importers=importers
61        if params is None: self.params = { }
62        else: self.params = params
63        if reqs is None: self.reqs = []
64        else: self.reqs = reqs
65
66        if portal is not None:
67            self.portal = portal
68        else:
69            self.portal = (name in federated_service.needs_portal)
70
71    def __str__(self):
72        return "name %s export %s import %s params %s reqs %s" % \
73                (self.name, self.exporter, self.importers, self.params,
74                        [ (r['name'], r['visibility']) for r in self.reqs] )
75
76    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
77
78class experiment_control_local(experiment_control_legacy):
79    """
80    Control of experiments that this system can directly access.
81
82    Includes experiment creation, termination and information dissemination.
83    Thred safe.
84    """
85
86    class ssh_cmd_timeout(RuntimeError): pass
87   
88    call_RequestAccess = service_caller('RequestAccess')
89    call_ReleaseAccess = service_caller('ReleaseAccess')
90    call_StartSegment = service_caller('StartSegment')
91    call_TerminateSegment = service_caller('TerminateSegment')
92    call_InfoSegment = service_caller('InfoSegment')
93    call_Ns2Topdl = service_caller('Ns2Topdl')
94
95    def __init__(self, config=None, auth=None):
96        """
97        Intialize the various attributes, most from the config object
98        """
99
100        def parse_tarfile_list(tf):
101            """
102            Parse a tarfile list from the configuration.  This is a set of
103            paths and tarfiles separated by spaces.
104            """
105            rv = [ ]
106            if tf is not None:
107                tl = tf.split()
108                while len(tl) > 1:
109                    p, t = tl[0:2]
110                    del tl[0:2]
111                    rv.append((p, t))
112            return rv
113
114        self.list_log = list_log.list_log
115
116        self.cert_file = config.get("experiment_control", "cert_file")
117        if self.cert_file:
118            self.cert_pwd = config.get("experiment_control", "cert_pwd")
119        else:
120            self.cert_file = config.get("globals", "cert_file")
121            self.cert_pwd = config.get("globals", "cert_pwd")
122
123        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
124                or config.get("globals", "trusted_certs")
125
126        self.repodir = config.get("experiment_control", "repodir")
127        self.repo_url = config.get("experiment_control", "repo_url", 
128                "https://users.isi.deterlab.net:23235");
129
130        self.exp_stem = "fed-stem"
131        self.log = logging.getLogger("fedd.experiment_control")
132        set_log_level(config, "experiment_control", self.log)
133        self.muxmax = 2
134        self.nthreads = 10
135        self.randomize_experiments = False
136
137        self.splitter = None
138        self.ssh_keygen = "/usr/bin/ssh-keygen"
139        self.ssh_identity_file = None
140
141
142        self.debug = config.getboolean("experiment_control", "create_debug")
143        self.cleanup = not config.getboolean("experiment_control", 
144                "leave_tmpfiles")
145        self.state_filename = config.get("experiment_control", 
146                "experiment_state")
147        self.store_filename = config.get("experiment_control", 
148                "synch_store")
149        self.store_url = config.get("experiment_control", "store_url")
150        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
151        self.fedkit = parse_tarfile_list(\
152                config.get("experiment_control", "fedkit"))
153        self.gatewaykit = parse_tarfile_list(\
154                config.get("experiment_control", "gatewaykit"))
155        accessdb_file = config.get("experiment_control", "accessdb")
156
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        # XXX: document this!
162        self.info_cache_limit = \
163                config.getint('experiment_control', 'info_cache', 600)
164        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
165        else: self.direct_transit = [ ]
166        # NB for internal master/slave ops, not experiment setup
167        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
168
169        self.overrides = set([])
170        ovr = config.get('experiment_control', 'overrides')
171        if ovr:
172            for o in ovr.split(","):
173                o = o.strip()
174                if o.startswith('fedid:'): o = o[len('fedid:'):]
175                self.overrides.add(fedid(hexstr=o))
176
177        self.state = { }
178        self.state_lock = Lock()
179        self.tclsh = "/usr/local/bin/otclsh"
180        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
181                config.get("experiment_control", "tcl_splitter",
182                        "/usr/testbed/lib/ns2ir/parse.tcl")
183        mapdb_file = config.get("experiment_control", "mapdb")
184        self.trace_file = sys.stderr
185
186        self.def_expstart = \
187                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
188                "/tmp/federate";
189        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
190                "FEDDIR/hosts";
191        self.def_gwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
193                "/tmp/bridge.log";
194        self.def_mgwstart = \
195                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
196                "/tmp/bridge.log";
197        self.def_gwimage = "FBSD61-TUNNEL2";
198        self.def_gwtype = "pc";
199        self.local_access = { }
200
201        if self.auth_type == 'legacy':
202            if auth:
203                self.auth = auth
204            else:
205                self.log.error( "[access]: No authorizer initialized, " +\
206                        "creating local one.")
207                auth = authorizer()
208            self.get_access = self.legacy_get_access
209        elif self.auth_type == 'abac':
210            self.auth = abac_authorizer(load=self.auth_dir)
211        else:
212            raise service_error(service_error.internal, 
213                    "Unknown auth_type: %s" % self.auth_type)
214
215        if mapdb_file:
216            self.read_mapdb(mapdb_file)
217        else:
218            self.log.warn("[experiment_control] No testbed map, using defaults")
219            self.tbmap = { 
220                    'deter':'https://users.isi.deterlab.net:23235',
221                    'emulab':'https://users.isi.deterlab.net:23236',
222                    'ucb':'https://users.isi.deterlab.net:23237',
223                    }
224
225        if accessdb_file:
226                self.read_accessdb(accessdb_file)
227        else:
228            raise service_error(service_error.internal,
229                    "No accessdb specified in config")
230
231        # Grab saved state.  OK to do this w/o locking because it's read only
232        # and only one thread should be in existence that can see self.state at
233        # this point.
234        if self.state_filename:
235            self.read_state()
236
237        if self.store_filename:
238            self.read_store()
239        else:
240            self.log.warning("No saved synch store")
241            self.synch_store = synch_store
242
243        # Dispatch tables
244        self.soap_services = {\
245                'New': soap_handler('New', self.new_experiment),
246                'Create': soap_handler('Create', self.create_experiment),
247                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
248                'Vis': soap_handler('Vis', self.get_vis),
249                'Info': soap_handler('Info', self.get_info),
250                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
251                'Terminate': soap_handler('Terminate', 
252                    self.terminate_experiment),
253                'GetValue': soap_handler('GetValue', self.GetValue),
254                'SetValue': soap_handler('SetValue', self.SetValue),
255        }
256
257        self.xmlrpc_services = {\
258                'New': xmlrpc_handler('New', self.new_experiment),
259                'Create': xmlrpc_handler('Create', self.create_experiment),
260                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
261                'Vis': xmlrpc_handler('Vis', self.get_vis),
262                'Info': xmlrpc_handler('Info', self.get_info),
263                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
264                'Terminate': xmlrpc_handler('Terminate',
265                    self.terminate_experiment),
266                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
267                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
268        }
269
270    # Call while holding self.state_lock
271    def write_state(self):
272        """
273        Write a new copy of experiment state after copying the existing state
274        to a backup.
275
276        State format is a simple pickling of the state dictionary.
277        """
278        if os.access(self.state_filename, os.W_OK):
279            copy_file(self.state_filename, \
280                    "%s.bak" % self.state_filename)
281        try:
282            f = open(self.state_filename, 'w')
283            pickle.dump(self.state, f)
284        except EnvironmentError, e:
285            self.log.error("Can't write file %s: %s" % \
286                    (self.state_filename, e))
287        except pickle.PicklingError, e:
288            self.log.error("Pickling problem: %s" % e)
289        except TypeError, e:
290            self.log.error("Pickling problem (TypeError): %s" % e)
291
292    @staticmethod
293    def get_alloc_ids(exp):
294        """
295        Used by read_store and read state.  This used to be worse.
296        """
297
298        return [ a.allocID for a in exp.get_all_allocations() ]
299
300
301    # Call while holding self.state_lock
302    def read_state(self):
303        """
304        Read a new copy of experiment state.  Old state is overwritten.
305
306        State format is a simple pickling of the state dictionary.
307        """
308       
309        try:
310            f = open(self.state_filename, "r")
311            self.state = pickle.load(f)
312            self.log.debug("[read_state]: Read state from %s" % \
313                    self.state_filename)
314        except EnvironmentError, e:
315            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
316                    % (self.state_filename, e))
317        except pickle.UnpicklingError, e:
318            self.log.warning(("[read_state]: No saved state: " + \
319                    "Unpickling failed: %s") % e)
320       
321        for s in self.state.values():
322            try:
323
324                eid = s.fedid
325                if eid : 
326                    if self.auth_type == 'legacy':
327                        # XXX: legacy
328                        # Give the owner rights to the experiment
329                        #self.auth.set_attribute(s['owner'], eid)
330                        # And holders of the eid as well
331                        self.auth.set_attribute(eid, eid)
332                        # allow overrides to control experiments as well
333                        for o in self.overrides:
334                            self.auth.set_attribute(o, eid)
335                        # Set permissions to allow reading of the software
336                        # repo, if any, as well.
337                        for a in self.get_alloc_ids(s):
338                            self.auth.set_attribute(a, 'repo/%s' % eid)
339                else:
340                    raise KeyError("No experiment id")
341            except KeyError, e:
342                self.log.warning("[read_state]: State ownership or identity " +\
343                        "misformatted in %s: %s" % (self.state_filename, e))
344
345
346    def read_accessdb(self, accessdb_file):
347        """
348        Read the mapping from fedids that can create experiments to their name
349        in the 3-level access namespace.  All will be asserted from this
350        testbed and can include the local username and porject that will be
351        asserted on their behalf by this fedd.  Each fedid is also added to the
352        authorization system with the "create" attribute.
353        """
354        self.accessdb = {}
355        # These are the regexps for parsing the db
356        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
357        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
358                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
359        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
360                "\s*->\s*(" + name_expr + ")\s*$")
361        lineno = 0
362
363        # Parse the mappings and store in self.authdb, a dict of
364        # fedid -> (proj, user)
365        try:
366            f = open(accessdb_file, "r")
367            for line in f:
368                lineno += 1
369                line = line.strip()
370                if len(line) == 0 or line.startswith('#'):
371                    continue
372                m = project_line.match(line)
373                if m:
374                    fid = fedid(hexstr=m.group(1))
375                    project, user = m.group(2,3)
376                    if not self.accessdb.has_key(fid):
377                        self.accessdb[fid] = []
378                    self.accessdb[fid].append((project, user))
379                    continue
380
381                m = user_line.match(line)
382                if m:
383                    fid = fedid(hexstr=m.group(1))
384                    project = None
385                    user = m.group(2)
386                    if not self.accessdb.has_key(fid):
387                        self.accessdb[fid] = []
388                    self.accessdb[fid].append((project, user))
389                    continue
390                self.log.warn("[experiment_control] Error parsing access " +\
391                        "db %s at line %d" %  (accessdb_file, lineno))
392        except EnvironmentError:
393            raise service_error(service_error.internal,
394                    ("Error opening/reading %s as experiment " +\
395                            "control accessdb") %  accessdb_file)
396        f.close()
397
398        # Initialize the authorization attributes
399        # XXX: legacy
400        if self.auth_type == 'legacy':
401            for fid in self.accessdb.keys():
402                self.auth.set_attribute(fid, 'create')
403                self.auth.set_attribute(fid, 'new')
404
405    def read_mapdb(self, file):
406        """
407        Read a simple colon separated list of mappings for the
408        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
409        """
410
411        self.tbmap = { }
412        lineno =0
413        try:
414            f = open(file, "r")
415            for line in f:
416                lineno += 1
417                line = line.strip()
418                if line.startswith('#') or len(line) == 0:
419                    continue
420                try:
421                    label, url = line.split(':', 1)
422                    self.tbmap[label] = url
423                except ValueError, e:
424                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
425                            "map db: %s %s" % (lineno, line, e))
426        except EnvironmentError, e:
427            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
428                    "open %s: %s" % (file, e))
429        else:
430            f.close()
431
432    def read_store(self):
433        try:
434            self.synch_store = synch_store()
435            self.synch_store.load(self.store_filename)
436            self.log.debug("[read_store]: Read store from %s" % \
437                    self.store_filename)
438        except EnvironmentError, e:
439            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
440                    % (self.state_filename, e))
441            self.synch_store = synch_store()
442
443        # Set the initial permissions on data in the store.  XXX: This ad hoc
444        # authorization attribute initialization is getting out of hand.
445        # XXX: legacy
446        if self.auth_type == 'legacy':
447            for k in self.synch_store.all_keys():
448                try:
449                    if k.startswith('fedid:'):
450                        fid = fedid(hexstr=k[6:46])
451                        if self.state.has_key(fid):
452                            for a in self.get_alloc_ids(self.state[fid]):
453                                self.auth.set_attribute(a, k)
454                except ValueError, e:
455                    self.log.warn("Cannot deduce permissions for %s" % k)
456
457
458    def write_store(self):
459        """
460        Write a new copy of synch_store after writing current state
461        to a backup.  We use the internal synch_store pickle method to avoid
462        incinsistent data.
463
464        State format is a simple pickling of the store.
465        """
466        if os.access(self.store_filename, os.W_OK):
467            copy_file(self.store_filename, \
468                    "%s.bak" % self.store_filename)
469        try:
470            self.synch_store.save(self.store_filename)
471        except EnvironmentError, e:
472            self.log.error("Can't write file %s: %s" % \
473                    (self.store_filename, e))
474        except TypeError, e:
475            self.log.error("Pickling problem (TypeError): %s" % e)
476
477
478    def remove_dirs(self, dir):
479        """
480        Remove the directory tree and all files rooted at dir.  Log any errors,
481        but continue.
482        """
483        self.log.debug("[removedirs]: removing %s" % dir)
484        try:
485            for path, dirs, files in os.walk(dir, topdown=False):
486                for f in files:
487                    os.remove(os.path.join(path, f))
488                for d in dirs:
489                    os.rmdir(os.path.join(path, d))
490            os.rmdir(dir)
491        except EnvironmentError, e:
492            self.log.error("Error deleting directory tree in %s" % e);
493
494    @staticmethod
495    def make_temp_certfile(expcert, tmpdir):
496        """
497        make a protected copy of the access certificate so the experiment
498        controller can act as the experiment principal.  mkstemp is the most
499        secure way to do that. The directory should be created by
500        mkdtemp.  Return the filename.
501        """
502        if expcert and tmpdir:
503            try:
504                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
505                f = os.fdopen(certf, 'w')
506                print >> f, expcert
507                f.close()
508            except EnvironmentError, e:
509                raise service_error(service_error.internal, 
510                        "Cannot create temp cert file?")
511            return certfn
512        else:
513            return None
514
515       
516    def generate_ssh_keys(self, dest, type="rsa" ):
517        """
518        Generate a set of keys for the gateways to use to talk.
519
520        Keys are of type type and are stored in the required dest file.
521        """
522        valid_types = ("rsa", "dsa")
523        t = type.lower();
524        if t not in valid_types: raise ValueError
525        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
526
527        try:
528            trace = open("/dev/null", "w")
529        except EnvironmentError:
530            raise service_error(service_error.internal,
531                    "Cannot open /dev/null??");
532
533        # May raise CalledProcessError
534        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
535        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
536        if rv != 0:
537            raise service_error(service_error.internal, 
538                    "Cannot generate nonce ssh keys.  %s return code %d" \
539                            % (self.ssh_keygen, rv))
540
541    def generate_seer_certs(self, destdir):
542        '''
543        Create a SEER ca cert and a node cert in destdir/ca.pem and
544        destdir/node.pem respectively.  These will be distributed throughout
545        the federated experiment.  This routine reports errors via
546        service_errors.
547        '''
548        openssl = '/usr/bin/openssl'
549        # All the filenames and parameters we need for openssl calls below
550        ca_key =os.path.join(destdir, 'ca.key') 
551        ca_pem = os.path.join(destdir, 'ca.pem')
552        node_key =os.path.join(destdir, 'node.key') 
553        node_pem = os.path.join(destdir, 'node.pem')
554        node_req = os.path.join(destdir, 'node.req')
555        node_signed = os.path.join(destdir, 'node.signed')
556        days = '%s' % (365 * 10)
557        serial = '%s' % random.randint(0, 1<<16)
558
559        try:
560            # Sequence of calls to create a CA key, create a ca cert, create a
561            # node key, node signing request, and finally a signed node
562            # certificate.
563            sequence = (
564                    (openssl, 'genrsa', '-out', ca_key, '1024'),
565                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
566                        ca_pem, '-days', days, '-subj', 
567                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
568                    (openssl, 'genrsa', '-out', node_key, '1024'),
569                    (openssl, 'req', '-new', '-key', node_key, '-out', 
570                        node_req, '-days', days, '-subj', 
571                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
572                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
573                        '-set_serial', serial, '-req', '-in', node_req, 
574                        '-out', node_signed, '-days', days),
575                )
576            # Do all that stuff; bail if there's an error, and push all the
577            # output to dev/null.
578            for cmd in sequence:
579                trace = open("/dev/null", "w")
580                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
581                if rv != 0:
582                    raise service_error(service_error.internal, 
583                            "Cannot generate SEER certs.  %s return code %d" \
584                                    % (' '.join(cmd), rv))
585            # Concatinate the node key and signed certificate into node.pem
586            f = open(node_pem, 'w')
587            for comp in (node_signed, node_key):
588                g = open(comp, 'r')
589                f.write(g.read())
590                g.close()
591            f.close()
592
593            # Throw out intermediaries.
594            for fn in (ca_key, node_key, node_req, node_signed):
595                os.unlink(fn)
596
597        except EnvironmentError, e:
598            # Any difficulties with the file system wind up here
599            raise service_error(service_error.internal,
600                    "File error on  %s while creating SEER certs: %s" % \
601                            (e.filename, e.strerror))
602
603
604
605    def gentopo(self, str):
606        """
607        Generate the topology data structure from the splitter's XML
608        representation of it.
609
610        The topology XML looks like:
611            <experiment>
612                <nodes>
613                    <node><vname></vname><ips>ip1:ip2</ips></node>
614                </nodes>
615                <lans>
616                    <lan>
617                        <vname></vname><vnode></vnode><ip></ip>
618                        <bandwidth></bandwidth><member>node:port</member>
619                    </lan>
620                </lans>
621        """
622        class topo_parse:
623            """
624            Parse the topology XML and create the dats structure.
625            """
626            def __init__(self):
627                # Typing of the subelements for data conversion
628                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
629                self.int_subelements = ( 'bandwidth',)
630                self.float_subelements = ( 'delay',)
631                # The final data structure
632                self.nodes = [ ]
633                self.lans =  [ ]
634                self.topo = { \
635                        'node': self.nodes,\
636                        'lan' : self.lans,\
637                    }
638                self.element = { }  # Current element being created
639                self.chars = ""     # Last text seen
640
641            def end_element(self, name):
642                # After each sub element the contents is added to the current
643                # element or to the appropriate list.
644                if name == 'node':
645                    self.nodes.append(self.element)
646                    self.element = { }
647                elif name == 'lan':
648                    self.lans.append(self.element)
649                    self.element = { }
650                elif name in self.str_subelements:
651                    self.element[name] = self.chars
652                    self.chars = ""
653                elif name in self.int_subelements:
654                    self.element[name] = int(self.chars)
655                    self.chars = ""
656                elif name in self.float_subelements:
657                    self.element[name] = float(self.chars)
658                    self.chars = ""
659
660            def found_chars(self, data):
661                self.chars += data.rstrip()
662
663
664        tp = topo_parse();
665        parser = xml.parsers.expat.ParserCreate()
666        parser.EndElementHandler = tp.end_element
667        parser.CharacterDataHandler = tp.found_chars
668
669        parser.Parse(str)
670
671        return tp.topo
672       
673
674    def genviz(self, topo):
675        """
676        Generate the visualization the virtual topology
677        """
678
679        neato = "/usr/local/bin/neato"
680        # These are used to parse neato output and to create the visualization
681        # file.
682        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
683        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
684                "%s</type></node>"
685
686        try:
687            # Node names
688            nodes = [ n['vname'] for n in topo['node'] ]
689            topo_lans = topo['lan']
690        except KeyError, e:
691            raise service_error(service_error.internal, "Bad topology: %s" %e)
692
693        lans = { }
694        links = { }
695
696        # Walk through the virtual topology, organizing the connections into
697        # 2-node connections (links) and more-than-2-node connections (lans).
698        # When a lan is created, it's added to the list of nodes (there's a
699        # node in the visualization for the lan).
700        for l in topo_lans:
701            if links.has_key(l['vname']):
702                if len(links[l['vname']]) < 2:
703                    links[l['vname']].append(l['vnode'])
704                else:
705                    nodes.append(l['vname'])
706                    lans[l['vname']] = links[l['vname']]
707                    del links[l['vname']]
708                    lans[l['vname']].append(l['vnode'])
709            elif lans.has_key(l['vname']):
710                lans[l['vname']].append(l['vnode'])
711            else:
712                links[l['vname']] = [ l['vnode'] ]
713
714
715        # Open up a temporary file for dot to turn into a visualization
716        try:
717            df, dotname = tempfile.mkstemp()
718            dotfile = os.fdopen(df, 'w')
719        except EnvironmentError:
720            raise service_error(service_error.internal,
721                    "Failed to open file in genviz")
722
723        try:
724            dnull = open('/dev/null', 'w')
725        except EnvironmentError:
726            service_error(service_error.internal,
727                    "Failed to open /dev/null in genviz")
728
729        # Generate a dot/neato input file from the links, nodes and lans
730        try:
731            print >>dotfile, "graph G {"
732            for n in nodes:
733                print >>dotfile, '\t"%s"' % n
734            for l in links.keys():
735                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
736            for l in lans.keys():
737                for n in lans[l]:
738                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
739            print >>dotfile, "}"
740            dotfile.close()
741        except TypeError:
742            raise service_error(service_error.internal,
743                    "Single endpoint link in vtopo")
744        except EnvironmentError:
745            raise service_error(service_error.internal, "Cannot write dot file")
746
747        # Use dot to create a visualization
748        try:
749            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
750                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
751                stderr=dnull, close_fds=True)
752        except EnvironmentError:
753            raise service_error(service_error.internal, 
754                    "Cannot generate visualization: is graphviz available?")
755        dnull.close()
756
757        # Translate dot to vis format
758        vis_nodes = [ ]
759        vis = { 'node': vis_nodes }
760        for line in dot.stdout:
761            m = vis_re.match(line)
762            if m:
763                vn = m.group(1)
764                vis_node = {'name': vn, \
765                        'x': float(m.group(2)),\
766                        'y' : float(m.group(3)),\
767                    }
768                if vn in links.keys() or vn in lans.keys():
769                    vis_node['type'] = 'lan'
770                else:
771                    vis_node['type'] = 'node'
772                vis_nodes.append(vis_node)
773        rv = dot.wait()
774
775        os.remove(dotname)
776        # XXX: graphviz seems to use low return codes for warnings, like
777        # "couldn't find font"
778        if rv < 2 : return vis
779        else: return None
780
781
782    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
783            cert_pwd=None):
784        """
785        Release access to testbed through fedd
786        """
787
788        if not uri and tbmap:
789            uri = tbmap.get(tb, None)
790        if not uri:
791            raise service_error(service_error.server_config, 
792                    "Unknown testbed: %s" % tb)
793
794        if self.local_access.has_key(uri):
795            resp = self.local_access[uri].ReleaseAccess(\
796                    { 'ReleaseAccessRequestBody' : 
797                        {'allocID': {'fedid': aid}},}, 
798                    fedid(file=cert_file))
799            resp = { 'ReleaseAccessResponseBody': resp } 
800        else:
801            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
802                    cert_file, cert_pwd, self.trusted_certs)
803
804        # better error coding
805
806    def remote_ns2topdl(self, uri, desc):
807
808        req = {
809                'description' : { 'ns2description': desc },
810            }
811
812        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
813                self.trusted_certs)
814
815        if r.has_key('Ns2TopdlResponseBody'):
816            r = r['Ns2TopdlResponseBody']
817            ed = r.get('experimentdescription', None)
818            if ed.has_key('topdldescription'):
819                return topdl.Topology(**ed['topdldescription'])
820            else:
821                raise service_error(service_error.protocol, 
822                        "Bad splitter response (no output)")
823        else:
824            raise service_error(service_error.protocol, "Bad splitter response")
825
826    class start_segment:
827        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
828                cert_pwd=None, trusted_certs=None, caller=None,
829                log_collector=None):
830            self.log = log
831            self.debug = debug
832            self.cert_file = cert_file
833            self.cert_pwd = cert_pwd
834            self.trusted_certs = None
835            self.caller = caller
836            self.testbed = testbed
837            self.log_collector = log_collector
838            self.response = None
839            self.node = { }
840            self.proof = None
841
842        def make_map(self, resp):
843            if 'segmentdescription' not in resp  or \
844                    'topdldescription' not in resp['segmentdescription']:
845                self.log.warn('No topology returned from startsegment')
846                return 
847
848            top = topdl.Topology(
849                    **resp['segmentdescription']['topdldescription'])
850
851            for e in top.elements:
852                if isinstance(e, topdl.Computer):
853                    self.node[e.name] = \
854                            (e.localname, e.status, e.service, e.operation)
855
856        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
857            req = {
858                    'allocID': { 'fedid' : aid }, 
859                    'segmentdescription': { 
860                        'topdldescription': topo.to_dict(),
861                    },
862                }
863
864            if connInfo:
865                req['connection'] = connInfo
866
867            import_svcs = [ s for m in masters.values() \
868                    for s in m if self.testbed in s.importers]
869
870            if import_svcs or self.testbed in masters:
871                req['service'] = []
872
873            for s in import_svcs:
874                for r in s.reqs:
875                    sr = copy.deepcopy(r)
876                    sr['visibility'] = 'import';
877                    req['service'].append(sr)
878
879            for s in masters.get(self.testbed, []):
880                for r in s.reqs:
881                    sr = copy.deepcopy(r)
882                    sr['visibility'] = 'export';
883                    req['service'].append(sr)
884
885            if attrs:
886                req['fedAttr'] = attrs
887
888            try:
889                self.log.debug("Calling StartSegment at %s " % uri)
890                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
891                        self.trusted_certs)
892                if r.has_key('StartSegmentResponseBody'):
893                    lval = r['StartSegmentResponseBody'].get('allocationLog',
894                            None)
895                    if lval and self.log_collector:
896                        for line in  lval.splitlines(True):
897                            self.log_collector.write(line)
898                    self.make_map(r['StartSegmentResponseBody'])
899                    if 'proof' in r: self.proof = r['proof']
900                    self.response = r
901                else:
902                    raise service_error(service_error.internal, 
903                            "Bad response!?: %s" %r)
904                return True
905            except service_error, e:
906                self.log.error("Start segment failed on %s: %s" % \
907                        (self.testbed, e))
908                return False
909
910
911
912    class terminate_segment:
913        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
914                cert_pwd=None, trusted_certs=None, caller=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922
923        def __call__(self, uri, aid ):
924            req = {
925                    'allocID': {'fedid': aid }, 
926                }
927            try:
928                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
929                        self.trusted_certs)
930                return True
931            except service_error, e:
932                self.log.error("Terminate segment failed on %s: %s" % \
933                        (self.testbed, e))
934                return False
935
936    class info_segment(start_segment):
937        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
938                cert_pwd=None, trusted_certs=None, caller=None,
939                log_collector=None):
940            experiment_control_local.start_segment.__init__(self, debug, 
941                    log, testbed, cert_file, cert_pwd, trusted_certs, 
942                    caller, log_collector)
943
944        def __call__(self, uri, aid):
945            req = { 'allocID': { 'fedid' : aid } }
946
947            try:
948                self.log.debug("Calling InfoSegment at %s " % uri)
949                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
950                        self.trusted_certs)
951                if r.has_key('InfoSegmentResponseBody'):
952                    self.make_map(r['InfoSegmentResponseBody'])
953                    if 'proof' in r: self.proof = r['proof']
954                    self.response = r
955                else:
956                    raise service_error(service_error.internal, 
957                            "Bad response!?: %s" %r)
958                return True
959            except service_error, e:
960                self.log.error("Info segment failed on %s: %s" % \
961                        (self.testbed, e))
962                return False
963
964    def annotate_topology(self, top, data):
965        def add_new(ann, attr):
966            for a in ann:
967                if a not in attr: attr.append(a)
968
969        # Annotate the topology with embedding info
970        for e in top.elements:
971            if isinstance(e, topdl.Computer):
972                for s in data:
973                    ann = s.node.get(e.name, None)
974                    if ann is not None:
975                        add_new(ann[0], e.localname)
976                        e.status = ann[1]
977                        add_new(ann[2], e.service)
978                        add_new(ann[3], e.operation)
979                        break
980
981
982
983    def allocate_resources(self, allocated, masters, eid, expid, 
984            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
985            attrs=None, connInfo={}, tbmap=None, expcert=None):
986
987        started = { }           # Testbeds where a sub-experiment started
988                                # successfully
989
990        # XXX
991        fail_soft = False
992
993        if tbmap is None: tbmap = { }
994
995        log = alloc_log or self.log
996
997        tp = thread_pool(self.nthreads)
998        threads = [ ]
999        starters = [ ]
1000
1001        if expcert:
1002            cert = expcert
1003            pw = None
1004        else:
1005            cert = self.cert_file
1006            pw = self.cert_pwd
1007
1008        for tb in allocated.keys():
1009            # Create and start a thread to start the segment, and save it
1010            # to get the return value later
1011            tb_attrs = copy.copy(attrs)
1012            tp.wait_for_slot()
1013            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1014            base, suffix = split_testbed(tb)
1015            if suffix:
1016                tb_attrs.append({'attribute': 'experiment_name', 
1017                    'value': "%s-%s" % (eid, suffix)})
1018            else:
1019                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1020            if not uri:
1021                raise service_error(service_error.internal, 
1022                        "Unknown testbed %s !?" % tb)
1023
1024            aid = tbparams[tb].allocID
1025            if not aid:
1026                raise service_error(service_error.internal, 
1027                        "No alloc id for testbed %s !?" % tb)
1028
1029            s = self.start_segment(log=log, debug=self.debug,
1030                    testbed=tb, cert_file=cert,
1031                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1032                    caller=self.call_StartSegment,
1033                    log_collector=log_collector)
1034            starters.append(s)
1035            t  = pooled_thread(\
1036                    target=s, name=tb,
1037                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1038                    pdata=tp, trace_file=self.trace_file)
1039            threads.append(t)
1040            t.start()
1041
1042        # Wait until all finish (keep pinging the log, though)
1043        mins = 0
1044        revoked = False
1045        while not tp.wait_for_all_done(60.0):
1046            mins += 1
1047            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1048                    % mins)
1049            if not revoked and \
1050                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1051                # a testbed has failed.  Revoke this experiment's
1052                # synchronizarion values so that sub experiments will not
1053                # deadlock waiting for synchronization that will never happen
1054                self.log.info("A subexperiment has failed to swap in, " + \
1055                        "revoking synch keys")
1056                var_key = "fedid:%s" % expid
1057                for k in self.synch_store.all_keys():
1058                    if len(k) > 45 and k[0:46] == var_key:
1059                        self.synch_store.revoke_key(k)
1060                revoked = True
1061
1062        failed = [ t.getName() for t in threads if not t.rv ]
1063        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1064
1065        # If one failed clean up, unless fail_soft is set
1066        if failed:
1067            if not fail_soft:
1068                tp.clear()
1069                for tb in succeeded:
1070                    # Create and start a thread to stop the segment
1071                    tp.wait_for_slot()
1072                    uri = tbparams[tb].uri
1073                    t  = pooled_thread(\
1074                            target=self.terminate_segment(log=log,
1075                                testbed=tb,
1076                                cert_file=cert, 
1077                                cert_pwd=pw,
1078                                trusted_certs=self.trusted_certs,
1079                                caller=self.call_TerminateSegment),
1080                            args=(uri, tbparams[tb].allocID),
1081                            name=tb,
1082                            pdata=tp, trace_file=self.trace_file)
1083                    t.start()
1084                # Wait until all finish (if any are being stopped)
1085                if succeeded:
1086                    tp.wait_for_all_done()
1087
1088                # release the allocations
1089                for tb in tbparams.keys():
1090                    try:
1091                        self.release_access(tb, tbparams[tb].allocID, 
1092                                tbmap=tbmap, uri=tbparams[tb].uri,
1093                                cert_file=cert, cert_pwd=pw)
1094                    except service_error, e:
1095                        self.log.warn("Error releasing access: %s" % e.desc)
1096                # Remove the placeholder
1097                self.state_lock.acquire()
1098                self.state[eid].status = 'failed'
1099                self.state[eid].updated()
1100                if self.state_filename: self.write_state()
1101                self.state_lock.release()
1102                # Remove the repo dir
1103                self.remove_dirs("%s/%s" %(self.repodir, expid))
1104                # Walk up tmpdir, deleting as we go
1105                if self.cleanup:
1106                    self.remove_dirs(tmpdir)
1107                else:
1108                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1109
1110
1111                log.error("Swap in failed on %s" % ",".join(failed))
1112                return
1113        else:
1114            # Walk through the successes and gather the proofs
1115            proofs = { }
1116            for s in starters:
1117                if s.proof:
1118                    proofs[s.testbed] = s.proof
1119            self.annotate_topology(top, starters)
1120            log.info("[start_segment]: Experiment %s active" % eid)
1121
1122
1123        # Walk up tmpdir, deleting as we go
1124        if self.cleanup:
1125            self.remove_dirs(tmpdir)
1126        else:
1127            log.debug("[start_experiment]: not removing %s" % tmpdir)
1128
1129        # Insert the experiment into our state and update the disk copy.
1130        self.state_lock.acquire()
1131        self.state[expid].status = 'active'
1132        self.state[eid] = self.state[expid]
1133        self.state[eid].top = top
1134        self.state[eid].updated()
1135        # Append startup proofs
1136        for f in self.state[eid].get_all_allocations():
1137            if f.tb in proofs:
1138                f.proof.append(proofs[f.tb])
1139
1140        if self.state_filename: self.write_state()
1141        self.state_lock.release()
1142        return
1143
1144
1145    def add_kit(self, e, kit):
1146        """
1147        Add a Software object created from the list of (install, location)
1148        tuples passed as kit  to the software attribute of an object e.  We
1149        do this enough to break out the code, but it's kind of a hack to
1150        avoid changing the old tuple rep.
1151        """
1152
1153        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1154
1155        if isinstance(e.software, list): e.software.extend(s)
1156        else: e.software = s
1157
1158    def append_experiment_authorization(self, expid, attrs, 
1159            need_state_lock=True):
1160        """
1161        Append the authorization information to system state
1162        """
1163
1164        for p, a in attrs:
1165            self.auth.set_attribute(p, a)
1166        self.auth.save()
1167
1168        if need_state_lock: self.state_lock.acquire()
1169        # XXX: really a no op?
1170        #self.state[expid]['auth'].update(attrs)
1171        if self.state_filename: self.write_state()
1172        if need_state_lock: self.state_lock.release()
1173
1174    def clear_experiment_authorization(self, expid, need_state_lock=True):
1175        """
1176        Attrs is a set of attribute principal pairs that need to be removed
1177        from the authenticator.  Remove them and save the authenticator.
1178        """
1179
1180        if need_state_lock: self.state_lock.acquire()
1181        # XXX: should be a no-op
1182        #if expid in self.state and 'auth' in self.state[expid]:
1183            #for p, a in self.state[expid]['auth']:
1184                #self.auth.unset_attribute(p, a)
1185            #self.state[expid]['auth'] = set()
1186        if self.state_filename: self.write_state()
1187        if need_state_lock: self.state_lock.release()
1188        self.auth.save()
1189
1190
1191    def create_experiment_state(self, fid, req, expid, expcert,
1192            state='starting'):
1193        """
1194        Create the initial entry in the experiment's state.  The expid and
1195        expcert are the experiment's fedid and certifacte that represents that
1196        ID, which are installed in the experiment state.  If the request
1197        includes a suggested local name that is used if possible.  If the local
1198        name is already taken by an experiment owned by this user that has
1199        failed, it is overwritten.  Otherwise new letters are added until a
1200        valid localname is found.  The generated local name is returned.
1201        """
1202
1203        if req.has_key('experimentID') and \
1204                req['experimentID'].has_key('localname'):
1205            overwrite = False
1206            eid = req['experimentID']['localname']
1207            # If there's an old failed experiment here with the same local name
1208            # and accessible by this user, we'll overwrite it, otherwise we'll
1209            # fall through and do the collision avoidance.
1210            old_expid = self.get_experiment_fedid(eid)
1211            if old_expid:
1212                users_experiment = True
1213                try:
1214                    self.check_experiment_access(fid, old_expid)
1215                except service_error, e:
1216                    if e.code == service_error.access: users_experiment = False
1217                    else: raise e
1218                if users_experiment:
1219                    self.state_lock.acquire()
1220                    status = self.state[eid].status
1221                    if status and status == 'failed':
1222                        # remove the old access attributes
1223                        self.clear_experiment_authorization(eid,
1224                                need_state_lock=False)
1225                        overwrite = True
1226                        del self.state[eid]
1227                        del self.state[old_expid]
1228                    self.state_lock.release()
1229                else:
1230                    self.log.info('Experiment %s exists, ' % eid + \
1231                            'but this user cannot access it')
1232            self.state_lock.acquire()
1233            while (self.state.has_key(eid) and not overwrite):
1234                eid += random.choice(string.ascii_letters)
1235            # Initial state
1236            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1237                    identity=expcert)
1238            self.state[expid] = self.state[eid]
1239            if self.state_filename: self.write_state()
1240            self.state_lock.release()
1241        else:
1242            eid = self.exp_stem
1243            for i in range(0,5):
1244                eid += random.choice(string.ascii_letters)
1245            self.state_lock.acquire()
1246            while (self.state.has_key(eid)):
1247                eid = self.exp_stem
1248                for i in range(0,5):
1249                    eid += random.choice(string.ascii_letters)
1250            # Initial state
1251            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1252                    identity=expcert)
1253            self.state[expid] = self.state[eid]
1254            if self.state_filename: self.write_state()
1255            self.state_lock.release()
1256
1257        # Let users touch the state.  Authorize this fid and the expid itself
1258        # to touch the experiment, as well as allowing th eoverrides.
1259        self.append_experiment_authorization(eid, 
1260                set([(fid, expid), (expid,expid)] + \
1261                        [ (o, expid) for o in self.overrides]))
1262
1263        return eid
1264
1265
1266    def allocate_ips_to_topo(self, top):
1267        """
1268        Add an ip4_address attribute to all the hosts in the topology, based on
1269        the shared substrates on which they sit.  An /etc/hosts file is also
1270        created and returned as a list of hostfiles entries.  We also return
1271        the allocator, because we may need to allocate IPs to portals
1272        (specifically DRAGON portals).
1273        """
1274        subs = sorted(top.substrates, 
1275                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1276                reverse=True)
1277        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1278        ifs = { }
1279        hosts = [ ]
1280
1281        for idx, s in enumerate(subs):
1282            net_size = len(s.interfaces)+2
1283
1284            a = ips.allocate(net_size)
1285            if a :
1286                base, num = a
1287                if num < net_size: 
1288                    raise service_error(service_error.internal,
1289                            "Allocator returned wrong number of IPs??")
1290            else:
1291                raise service_error(service_error.req, 
1292                        "Cannot allocate IP addresses")
1293            mask = ips.min_alloc
1294            while mask < net_size:
1295                mask *= 2
1296
1297            netmask = ((2**32-1) ^ (mask-1))
1298
1299            base += 1
1300            for i in s.interfaces:
1301                i.attribute.append(
1302                        topdl.Attribute('ip4_address', 
1303                            "%s" % ip_addr(base)))
1304                i.attribute.append(
1305                        topdl.Attribute('ip4_netmask', 
1306                            "%s" % ip_addr(int(netmask))))
1307
1308                hname = i.element.name
1309                if ifs.has_key(hname):
1310                    hosts.append("%s\t%s-%s %s-%d" % \
1311                            (ip_addr(base), hname, s.name, hname,
1312                                ifs[hname]))
1313                else:
1314                    ifs[hname] = 0
1315                    hosts.append("%s\t%s-%s %s-%d %s" % \
1316                            (ip_addr(base), hname, s.name, hname,
1317                                ifs[hname], hname))
1318
1319                ifs[hname] += 1
1320                base += 1
1321        return hosts, ips
1322
1323    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1324            tbparam, masters, tbmap, expid=None, expcert=None):
1325        for tb in testbeds:
1326            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1327                    expcert)
1328            allocated[tb] = 1
1329
1330    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1331            expcert=None):
1332        """
1333        Get access to testbed through fedd and set the parameters for that tb
1334        """
1335        def get_export_project(svcs):
1336            """
1337            Look through for the list of federated_service for this testbed
1338            objects for a project_export service, and extract the project
1339            parameter.
1340            """
1341
1342            pe = [s for s in svcs if s.name=='project_export']
1343            if len(pe) == 1:
1344                return pe[0].params.get('project', None)
1345            elif len(pe) == 0:
1346                return None
1347            else:
1348                raise service_error(service_error.req,
1349                        "More than one project export is not supported")
1350
1351        def add_services(svcs, type, slist, keys):
1352            """
1353            Add the given services to slist.  type is import or export.  Also
1354            add a mapping entry from the assigned id to the original service
1355            record.
1356            """
1357            for i, s in enumerate(svcs):
1358                idx = '%s%d' % (type, i)
1359                keys[idx] = s
1360                sr = {'id': idx, 'name': s.name, 'visibility': type }
1361                if s.params:
1362                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1363                            for k, v in s.params.items()]
1364                slist.append(sr)
1365
1366        uri = tbmap.get(testbed_base(tb), None)
1367        if not uri:
1368            raise service_error(service_error.server_config, 
1369                    "Unknown testbed: %s" % tb)
1370
1371        export_svcs = masters.get(tb,[])
1372        import_svcs = [ s for m in masters.values() \
1373                for s in m \
1374                    if tb in s.importers ]
1375
1376        export_project = get_export_project(export_svcs)
1377        # Compose the credential list so that IDs come before attributes
1378        creds = set()
1379        keys = set()
1380        certs = self.auth.get_creds_for_principal(fid)
1381        if expid:
1382            certs.update(self.auth.get_creds_for_principal(expid))
1383        for c in certs:
1384            keys.add(c.issuer_cert())
1385            creds.add(c.attribute_cert())
1386        creds = list(keys) + list(creds)
1387
1388        if expcert: cert, pw = expcert, None
1389        else: cert, pw = self.cert_file, self.cert_pw
1390
1391        # Request credentials
1392        req = {
1393                'abac_credential': creds,
1394            }
1395        # Make the service request from the services we're importing and
1396        # exporting.  Keep track of the export request ids so we can
1397        # collect the resulting info from the access response.
1398        e_keys = { }
1399        if import_svcs or export_svcs:
1400            slist = []
1401            add_services(import_svcs, 'import', slist, e_keys)
1402            add_services(export_svcs, 'export', slist, e_keys)
1403            req['service'] = slist
1404
1405        if self.local_access.has_key(uri):
1406            # Local access call
1407            req = { 'RequestAccessRequestBody' : req }
1408            r = self.local_access[uri].RequestAccess(req, 
1409                    fedid(file=self.cert_file))
1410            r = { 'RequestAccessResponseBody' : r }
1411        else:
1412            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1413
1414        if r.has_key('RequestAccessResponseBody'):
1415            # Through to here we have a valid response, not a fault.
1416            # Access denied is a fault, so something better or worse than
1417            # access denied has happened.
1418            r = r['RequestAccessResponseBody']
1419            self.log.debug("[get_access] Access granted")
1420        else:
1421            raise service_error(service_error.protocol,
1422                        "Bad proxy response")
1423        if 'proof' not in r:
1424            raise service_error(service_error.protocol,
1425                        "Bad access response (no access proof)")
1426       
1427        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1428                tb=tb, uri=uri, proof=[r['proof']])
1429
1430        # Collect the responses corresponding to the services this testbed
1431        # exports.  These will be the service requests that we will include in
1432        # the start segment requests (with appropriate visibility values) to
1433        # import and export the segments.
1434        for s in r.get('service', []):
1435            id = s.get('id', None)
1436            if id and id in e_keys:
1437                e_keys[id].reqs.append(s)
1438
1439        # Add attributes to parameter space.  We don't allow attributes to
1440        # overlay any parameters already installed.
1441        for a in r.get('fedAttr', []):
1442            try:
1443                if a['attribute']:
1444                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1445            except KeyError:
1446                self.log.error("Bad attribute in response: %s" % a)
1447
1448
1449    def split_topology(self, top, topo, testbeds):
1450        """
1451        Create the sub-topologies that are needed for experiment instantiation.
1452        """
1453        for tb in testbeds:
1454            topo[tb] = top.clone()
1455            # copy in for loop allows deletions from the original
1456            for e in [ e for e in topo[tb].elements]:
1457                etb = e.get_attribute('testbed')
1458                # NB: elements without a testbed attribute won't appear in any
1459                # sub topologies. 
1460                if not etb or etb != tb:
1461                    for i in e.interface:
1462                        for s in i.subs:
1463                            try:
1464                                s.interfaces.remove(i)
1465                            except ValueError:
1466                                raise service_error(service_error.internal,
1467                                        "Can't remove interface??")
1468                    topo[tb].elements.remove(e)
1469            topo[tb].make_indices()
1470
1471    def confirm_software(self, top):
1472        """
1473        Make sure that the software to be loaded in the topo is all available
1474        before we begin making access requests, etc.  This is a subset of
1475        wrangle_software.
1476        """
1477        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1478        pkgs.update([x.location for e in top.elements for x in e.software])
1479
1480        for pkg in pkgs:
1481            loc = pkg
1482
1483            scheme, host, path = urlparse(loc)[0:3]
1484            dest = os.path.basename(path)
1485            if not scheme:
1486                if not loc.startswith('/'):
1487                    loc = "/%s" % loc
1488                loc = "file://%s" %loc
1489            # NB: if scheme was found, loc == pkg
1490            try:
1491                u = urlopen(loc)
1492                u.close()
1493            except Exception, e:
1494                raise service_error(service_error.req, 
1495                        "Cannot open %s: %s" % (loc, e))
1496        return True
1497
1498    def wrangle_software(self, expid, top, topo, tbparams):
1499        """
1500        Copy software out to the repository directory, allocate permissions and
1501        rewrite the segment topologies to look for the software in local
1502        places.
1503        """
1504
1505        # Copy the rpms and tarfiles to a distribution directory from
1506        # which the federants can retrieve them
1507        linkpath = "%s/software" %  expid
1508        softdir ="%s/%s" % ( self.repodir, linkpath)
1509        softmap = { }
1510
1511        # self.fedkit and self.gateway kit are lists of tuples of
1512        # (install_location, download_location) this extracts the download
1513        # locations.
1514        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1515        pkgs.update([x.location for e in top.elements for x in e.software])
1516        try:
1517            os.makedirs(softdir)
1518        except EnvironmentError, e:
1519            raise service_error(
1520                    "Cannot create software directory: %s" % e)
1521        # The actual copying.  Everything's converted into a url for copying.
1522        auth_attrs = set()
1523        for pkg in pkgs:
1524            loc = pkg
1525
1526            scheme, host, path = urlparse(loc)[0:3]
1527            dest = os.path.basename(path)
1528            if not scheme:
1529                if not loc.startswith('/'):
1530                    loc = "/%s" % loc
1531                loc = "file://%s" %loc
1532            # NB: if scheme was found, loc == pkg
1533            try:
1534                u = urlopen(loc)
1535            except Exception, e:
1536                raise service_error(service_error.req, 
1537                        "Cannot open %s: %s" % (loc, e))
1538            try:
1539                f = open("%s/%s" % (softdir, dest) , "w")
1540                self.log.debug("Writing %s/%s" % (softdir,dest) )
1541                data = u.read(4096)
1542                while data:
1543                    f.write(data)
1544                    data = u.read(4096)
1545                f.close()
1546                u.close()
1547            except Exception, e:
1548                raise service_error(service_error.internal,
1549                        "Could not copy %s: %s" % (loc, e))
1550            path = re.sub("/tmp", "", linkpath)
1551            # XXX
1552            softmap[pkg] = \
1553                    "%s/%s/%s" %\
1554                    ( self.repo_url, path, dest)
1555
1556            # Allow the individual segments to access the software by assigning
1557            # an attribute to each testbed allocation that encodes the data to
1558            # be released.  This expression collects the data for each run of
1559            # the loop.
1560            auth_attrs.update([
1561                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1562                        for tb in tbparams.keys()])
1563
1564        self.append_experiment_authorization(expid, auth_attrs)
1565
1566        # Convert the software locations in the segments into the local
1567        # copies on this host
1568        for soft in [ s for tb in topo.values() \
1569                for e in tb.elements \
1570                    if getattr(e, 'software', False) \
1571                        for s in e.software ]:
1572            if softmap.has_key(soft.location):
1573                soft.location = softmap[soft.location]
1574
1575
1576    def new_experiment(self, req, fid):
1577        """
1578        The external interface to empty initial experiment creation called from
1579        the dispatcher.
1580
1581        Creates a working directory, splits the incoming description using the
1582        splitter script and parses out the avrious subsections using the
1583        lcasses above.  Once each sub-experiment is created, use pooled threads
1584        to instantiate them and start it all up.
1585        """
1586        req = req.get('NewRequestBody', None)
1587        if not req:
1588            raise service_error(service_error.req,
1589                    "Bad request format (no NewRequestBody)")
1590
1591        if self.auth.import_credentials(data_list=req.get('credential', [])):
1592            self.auth.save()
1593
1594        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1595                with_proof=True)
1596
1597        if not access_ok:
1598            raise service_error(service_error.access, "New access denied",
1599                    proof=[proof])
1600
1601        try:
1602            tmpdir = tempfile.mkdtemp(prefix="split-")
1603        except EnvironmentError:
1604            raise service_error(service_error.internal, "Cannot create tmp dir")
1605
1606        try:
1607            access_user = self.accessdb[fid]
1608        except KeyError:
1609            raise service_error(service_error.internal,
1610                    "Access map and authorizer out of sync in " + \
1611                            "new_experiment for fedid %s"  % fid)
1612
1613        # Generate an ID for the experiment (slice) and a certificate that the
1614        # allocator can use to prove they own it.  We'll ship it back through
1615        # the encrypted connection.  If the requester supplied one, use it.
1616        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1617            expcert = req['experimentAccess']['X509']
1618            expid = fedid(certstr=expcert)
1619            self.state_lock.acquire()
1620            if expid in self.state:
1621                self.state_lock.release()
1622                raise service_error(service_error.req, 
1623                        'fedid %s identifies an existing experiment' % expid)
1624            self.state_lock.release()
1625        else:
1626            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1627
1628        #now we're done with the tmpdir, and it should be empty
1629        if self.cleanup:
1630            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1631            os.rmdir(tmpdir)
1632        else:
1633            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1634
1635        eid = self.create_experiment_state(fid, req, expid, expcert, 
1636                state='empty')
1637
1638        rv = {
1639                'experimentID': [
1640                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1641                ],
1642                'experimentStatus': 'empty',
1643                'experimentAccess': { 'X509' : expcert },
1644                'proof': proof.to_dict(),
1645            }
1646
1647        return rv
1648
1649    # create_experiment sub-functions
1650
1651    @staticmethod
1652    def get_experiment_key(req, field='experimentID'):
1653        """
1654        Parse the experiment identifiers out of the request (the request body
1655        tag has been removed).  Specifically this pulls either the fedid or the
1656        localname out of the experimentID field.  A fedid is preferred.  If
1657        neither is present or the request does not contain the fields,
1658        service_errors are raised.
1659        """
1660        # Get the experiment access
1661        exp = req.get(field, None)
1662        if exp:
1663            if exp.has_key('fedid'):
1664                key = exp['fedid']
1665            elif exp.has_key('localname'):
1666                key = exp['localname']
1667            else:
1668                raise service_error(service_error.req, "Unknown lookup type")
1669        else:
1670            raise service_error(service_error.req, "No request?")
1671
1672        return key
1673
1674    def get_experiment_ids_and_start(self, key, tmpdir):
1675        """
1676        Get the experiment name, id and access certificate from the state, and
1677        set the experiment state to 'starting'.  returns a triple (fedid,
1678        localname, access_cert_file). The access_cert_file is a copy of the
1679        contents of the access certificate, created in the tempdir with
1680        restricted permissions.  If things are confused, raise an exception.
1681        """
1682
1683        expid = eid = None
1684        self.state_lock.acquire()
1685        if key in self.state:
1686            exp = self.state[key]
1687            exp.status = "starting"
1688            exp.updated()
1689            expid = exp.fedid
1690            eid = exp.localname
1691            expcert = exp.identity
1692        self.state_lock.release()
1693
1694        # make a protected copy of the access certificate so the experiment
1695        # controller can act as the experiment principal.
1696        if expcert:
1697            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1698            if not expcert_file:
1699                raise service_error(service_error.internal, 
1700                        "Cannot create temp cert file?")
1701        else:
1702            expcert_file = None
1703
1704        return (eid, expid, expcert_file)
1705
1706    def get_topology(self, req, tmpdir):
1707        """
1708        Get the ns2 content and put it into a file for parsing.  Call the local
1709        or remote parser and return the topdl.Topology.  Errors result in
1710        exceptions.  req is the request and tmpdir is a work directory.
1711        """
1712
1713        # The tcl parser needs to read a file so put the content into that file
1714        descr=req.get('experimentdescription', None)
1715        if descr:
1716            if 'ns2description' in descr:
1717                file_content=descr['ns2description']
1718            elif 'topdldescription' in descr:
1719                return topdl.Topology(**descr['topdldescription'])
1720            else:
1721                raise service_error(service_error.req, 
1722                        'Unknown experiment description type')
1723        else:
1724            raise service_error(service_error.req, "No experiment description")
1725
1726
1727        if self.splitter_url:
1728            self.log.debug("Calling remote topdl translator at %s" % \
1729                    self.splitter_url)
1730            top = self.remote_ns2topdl(self.splitter_url, file_content)
1731        else:
1732            tclfile = os.path.join(tmpdir, "experiment.tcl")
1733            if file_content:
1734                try:
1735                    f = open(tclfile, 'w')
1736                    f.write(file_content)
1737                    f.close()
1738                except EnvironmentError:
1739                    raise service_error(service_error.internal,
1740                            "Cannot write temp experiment description")
1741            else:
1742                raise service_error(service_error.req, 
1743                        "Only ns2descriptions supported")
1744            pid = "dummy"
1745            gid = "dummy"
1746            eid = "dummy"
1747
1748            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1749                str(self.muxmax), '-m', 'dummy']
1750
1751            tclcmd.extend([pid, gid, eid, tclfile])
1752
1753            self.log.debug("running local splitter %s", " ".join(tclcmd))
1754            # This is just fantastic.  As a side effect the parser copies
1755            # tb_compat.tcl into the current directory, so that directory
1756            # must be writable by the fedd user.  Doing this in the
1757            # temporary subdir ensures this is the case.
1758            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1759                    cwd=tmpdir)
1760            split_data = tclparser.stdout
1761
1762            top = topdl.topology_from_xml(file=split_data, top="experiment")
1763            os.remove(tclfile)
1764
1765        return top
1766
1767    def get_testbed_services(self, req, testbeds):
1768        """
1769        Parse the services section of the request into into two dicts mapping
1770        testbed to lists of federated_service objects.  The first lists all
1771        exporters of services, and the second all exporters of services that
1772        need control portals in the experiment.
1773        """
1774        masters = { }
1775        pmasters = { }
1776        for s in req.get('service', []):
1777            # If this is a service request with the importall field
1778            # set, fill it out.
1779
1780            if s.get('importall', False):
1781                s['import'] = [ tb for tb in testbeds \
1782                        if tb not in s.get('export',[])]
1783                del s['importall']
1784
1785            # Add the service to masters
1786            for tb in s.get('export', []):
1787                if s.get('name', None):
1788
1789                    params = { }
1790                    for a in s.get('fedAttr', []):
1791                        params[a.get('attribute', '')] = a.get('value','')
1792
1793                    fser = federated_service(name=s['name'],
1794                            exporter=tb, importers=s.get('import',[]),
1795                            params=params)
1796                    if fser.name == 'hide_hosts' \
1797                            and 'hosts' not in fser.params:
1798                        fser.params['hosts'] = \
1799                                ",".join(tb_hosts.get(fser.exporter, []))
1800                    if tb in masters: masters[tb].append(fser)
1801                    else: masters[tb] = [fser]
1802
1803                    if fser.portal:
1804                        if tb not in pmasters: pmasters[tb] = [ fser ]
1805                        else: pmasters[tb].append(fser)
1806                else:
1807                    self.log.error('Testbed service does not have name " + \
1808                            "and importers')
1809        return masters, pmasters
1810
1811    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1812        """
1813        Create the ssh keys necessary for interconnecting the portal nodes and
1814        the global hosts file for letting each segment know about the IP
1815        addresses in play.  Save these into the repo.  Add attributes to the
1816        autorizer allowing access controllers to download them and return a set
1817        of attributes that inform the segments where to find this stuff.  May
1818        raise service_errors in if there are problems.
1819        """
1820        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1821        gw_secretkey_base = "fed.%s" % self.ssh_type
1822        keydir = os.path.join(tmpdir, 'keys')
1823        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1824        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1825
1826        try:
1827            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1828        except ValueError:
1829            raise service_error(service_error.server_config, 
1830                    "Bad key type (%s)" % self.ssh_type)
1831
1832        self.generate_seer_certs(keydir)
1833
1834        # Copy configuration files into the remote file store
1835        # The config urlpath
1836        configpath = "/%s/config" % expid
1837        # The config file system location
1838        configdir ="%s%s" % ( self.repodir, configpath)
1839        try:
1840            os.makedirs(configdir)
1841        except EnvironmentError, e:
1842            raise service_error(service_error.internal,
1843                    "Cannot create config directory: %s" % e)
1844        try:
1845            f = open("%s/hosts" % configdir, "w")
1846            print >> f, string.join(hosts, '\n')
1847            f.close()
1848        except EnvironmentError, e:
1849            raise service_error(service_error.internal, 
1850                    "Cannot write hosts file: %s" % e)
1851        try:
1852            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1853            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1854            copy_file(os.path.join(keydir, 'ca.pem'), 
1855                    os.path.join(configdir, 'ca.pem'))
1856            copy_file(os.path.join(keydir, 'node.pem'), 
1857                    os.path.join(configdir, 'node.pem'))
1858        except EnvironmentError, e:
1859            raise service_error(service_error.internal, 
1860                    "Cannot copy keyfiles: %s" % e)
1861
1862        # Allow the individual testbeds to access the configuration files,
1863        # again by setting an attribute for the relevant pathnames on each
1864        # allocation principal.  Yeah, that's a long list comprehension.
1865        self.append_experiment_authorization(expid, set([
1866            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1867                    for tb in tbparams.keys() \
1868                        for f in ("hosts", 'ca.pem', 'node.pem', 
1869                            gw_secretkey_base, gw_pubkey_base)]))
1870
1871        attrs = [ 
1872                {
1873                    'attribute': 'ssh_pubkey', 
1874                    'value': '%s/%s/config/%s' % \
1875                            (self.repo_url, expid, gw_pubkey_base)
1876                },
1877                {
1878                    'attribute': 'ssh_secretkey', 
1879                    'value': '%s/%s/config/%s' % \
1880                            (self.repo_url, expid, gw_secretkey_base)
1881                },
1882                {
1883                    'attribute': 'hosts', 
1884                    'value': '%s/%s/config/hosts' % \
1885                            (self.repo_url, expid)
1886                },
1887                {
1888                    'attribute': 'seer_ca_pem', 
1889                    'value': '%s/%s/config/%s' % \
1890                            (self.repo_url, expid, 'ca.pem')
1891                },
1892                {
1893                    'attribute': 'seer_node_pem', 
1894                    'value': '%s/%s/config/%s' % \
1895                            (self.repo_url, expid, 'node.pem')
1896                },
1897            ]
1898        return attrs
1899
1900
1901    def get_vtopo(self, req, fid):
1902        """
1903        Return the stored virtual topology for this experiment
1904        """
1905        rv = None
1906        state = None
1907
1908        req = req.get('VtopoRequestBody', None)
1909        if not req:
1910            raise service_error(service_error.req,
1911                    "Bad request format (no VtopoRequestBody)")
1912        exp = req.get('experiment', None)
1913        if exp:
1914            if exp.has_key('fedid'):
1915                key = exp['fedid']
1916                keytype = "fedid"
1917            elif exp.has_key('localname'):
1918                key = exp['localname']
1919                keytype = "localname"
1920            else:
1921                raise service_error(service_error.req, "Unknown lookup type")
1922        else:
1923            raise service_error(service_error.req, "No request?")
1924
1925        proof = self.check_experiment_access(fid, key)
1926
1927        self.state_lock.acquire()
1928        # XXX: this needs to be recalculated
1929        if key in self.state:
1930            if self.state[key].top is not None:
1931                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1932                rv = { 'experiment' : {keytype: key },
1933                        'vtopo': vtopo,
1934                        'proof': proof.to_dict(), 
1935                    }
1936            else:
1937                state = self.state[key].status
1938        self.state_lock.release()
1939
1940        if rv: return rv
1941        else: 
1942            if state:
1943                raise service_error(service_error.partial, 
1944                        "Not ready: %s" % state)
1945            else:
1946                raise service_error(service_error.req, "No such experiment")
1947
1948    def get_vis(self, req, fid):
1949        """
1950        Return the stored visualization for this experiment
1951        """
1952        rv = None
1953        state = None
1954
1955        req = req.get('VisRequestBody', None)
1956        if not req:
1957            raise service_error(service_error.req,
1958                    "Bad request format (no VisRequestBody)")
1959        exp = req.get('experiment', None)
1960        if exp:
1961            if exp.has_key('fedid'):
1962                key = exp['fedid']
1963                keytype = "fedid"
1964            elif exp.has_key('localname'):
1965                key = exp['localname']
1966                keytype = "localname"
1967            else:
1968                raise service_error(service_error.req, "Unknown lookup type")
1969        else:
1970            raise service_error(service_error.req, "No request?")
1971
1972        proof = self.check_experiment_access(fid, key)
1973
1974        self.state_lock.acquire()
1975        # Generate the visualization
1976        if key in self.state:
1977            if self.state[key].top is not None:
1978                try:
1979                    vis = self.genviz(
1980                            topdl.topology_to_vtopo(self.state[key].topo))
1981                except service_error, e:
1982                    self.state_lock.release()
1983                    raise e
1984                rv =  { 'experiment' : {keytype: key },
1985                        'vis': vis,
1986                        'proof': proof.to_dict(), 
1987                        }
1988            else:
1989                state = self.state[key].status
1990        self.state_lock.release()
1991
1992        if rv: return rv
1993        else:
1994            if state:
1995                raise service_error(service_error.partial, 
1996                        "Not ready: %s" % state)
1997            else:
1998                raise service_error(service_error.req, "No such experiment")
1999
2000   
2001    def save_federant_information(self, allocated, tbparams, eid, top):
2002        """
2003        Store the various data that have changed in the experiment state
2004        between when it was started and the beginning of resource allocation.
2005        This is basically the information about each local allocation.  This
2006        fills in the values of the placeholder allocation in the state.  It
2007        also collects the access proofs and returns them as dicts for a
2008        response message.
2009        """
2010        self.state_lock.acquire()
2011        exp = self.state[eid]
2012        exp.top = top.clone()
2013        # save federant information
2014        for k in allocated.keys():
2015            exp.add_allocation(tbparams[k])
2016            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2017                type="testbed", localname=[k]))
2018
2019        # Access proofs for the response message
2020        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2021                    for p in tbparams[k].proof]
2022        exp.updated()
2023        if self.state_filename: 
2024            self.write_state()
2025        self.state_lock.release()
2026        return proofs
2027
2028    def clear_placeholder(self, eid, expid, tmpdir):
2029        """
2030        Clear the placeholder and remove any allocated temporary dir.
2031        """
2032
2033        self.state_lock.acquire()
2034        del self.state[eid]
2035        del self.state[expid]
2036        if self.state_filename: self.write_state()
2037        self.state_lock.release()
2038        if tmpdir and self.cleanup:
2039            self.remove_dirs(tmpdir)
2040
2041    # end of create_experiment sub-functions
2042
2043    def create_experiment(self, req, fid):
2044        """
2045        The external interface to experiment creation called from the
2046        dispatcher.
2047
2048        Creates a working directory, splits the incoming description using the
2049        splitter script and parses out the various subsections using the
2050        classes above.  Once each sub-experiment is created, use pooled threads
2051        to instantiate them and start it all up.
2052        """
2053
2054        req = req.get('CreateRequestBody', None)
2055        if req:
2056            key = self.get_experiment_key(req)
2057        else:
2058            raise service_error(service_error.req,
2059                    "Bad request format (no CreateRequestBody)")
2060
2061        # Import information from the requester
2062        if self.auth.import_credentials(data_list=req.get('credential', [])):
2063            self.auth.save()
2064
2065        # Make sure that the caller can talk to us
2066        proof = self.check_experiment_access(fid, key)
2067
2068        # Install the testbed map entries supplied with the request into a copy
2069        # of the testbed map.
2070        tbmap = dict(self.tbmap)
2071        for m in req.get('testbedmap', []):
2072            if 'testbed' in m and 'uri' in m:
2073                tbmap[m['testbed']] = m['uri']
2074
2075        # a place to work
2076        try:
2077            tmpdir = tempfile.mkdtemp(prefix="split-")
2078            os.mkdir(tmpdir+"/keys")
2079        except EnvironmentError:
2080            raise service_error(service_error.internal, "Cannot create tmp dir")
2081
2082        tbparams = { }
2083
2084        eid, expid, expcert_file = \
2085                self.get_experiment_ids_and_start(key, tmpdir)
2086
2087        # This catches exceptions to clear the placeholder if necessary
2088        try: 
2089            if not (eid and expid):
2090                raise service_error(service_error.internal, 
2091                        "Cannot find local experiment info!?")
2092
2093            top = self.get_topology(req, tmpdir)
2094            self.confirm_software(top)
2095            # Assign the IPs
2096            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2097            # Find the testbeds to look up
2098            tb_hosts = { }
2099            testbeds = [ ]
2100            for e in top.elements:
2101                if isinstance(e, topdl.Computer):
2102                    tb = e.get_attribute('testbed') or 'default'
2103                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2104                    else: 
2105                        tb_hosts[tb] = [ e.name ]
2106                        testbeds.append(tb)
2107
2108            masters, pmasters = self.get_testbed_services(req, testbeds)
2109            allocated = { }         # Testbeds we can access
2110            topo ={ }               # Sub topologies
2111            connInfo = { }          # Connection information
2112
2113            self.split_topology(top, topo, testbeds)
2114
2115            self.get_access_to_testbeds(testbeds, fid, allocated, 
2116                    tbparams, masters, tbmap, expid, expcert_file)
2117
2118            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2119
2120            part = experiment_partition(self.auth, self.store_url, tbmap,
2121                    self.muxmax, self.direct_transit)
2122            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2123                    connInfo, expid)
2124
2125            auth_attrs = set()
2126            # Now get access to the dynamic testbeds (those added above)
2127            for tb in [ t for t in topo if t not in allocated]:
2128                self.get_access(tb, tbparams, fid, masters, tbmap, 
2129                        expid, expcert_file)
2130                allocated[tb] = 1
2131                store_keys = topo[tb].get_attribute('store_keys')
2132                # Give the testbed access to keys it exports or imports
2133                if store_keys:
2134                    auth_attrs.update(set([
2135                        (tbparams[tb].allocID, sk) \
2136                                for sk in store_keys.split(" ")]))
2137
2138            if auth_attrs:
2139                self.append_experiment_authorization(expid, auth_attrs)
2140
2141            # transit and disconnected testbeds may not have a connInfo entry.
2142            # Fill in the blanks.
2143            for t in allocated.keys():
2144                if not connInfo.has_key(t):
2145                    connInfo[t] = { }
2146
2147            self.wrangle_software(expid, top, topo, tbparams)
2148
2149            proofs = self.save_federant_information(allocated, tbparams, 
2150                    eid, top)
2151        except service_error, e:
2152            # If something goes wrong in the parse (usually an access error)
2153            # clear the placeholder state.  From here on out the code delays
2154            # exceptions.  Failing at this point returns a fault to the remote
2155            # caller.
2156            self.clear_placeholder(eid, expid, tmpdir)
2157            raise e
2158
2159        # Start the background swapper and return the starting state.  From
2160        # here on out, the state will stick around a while.
2161
2162        # Create a logger that logs to the experiment's state object as well as
2163        # to the main log file.
2164        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2165        alloc_collector = self.list_log(self.state[eid].log)
2166        h = logging.StreamHandler(alloc_collector)
2167        # XXX: there should be a global one of these rather than repeating the
2168        # code.
2169        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2170                    '%d %b %y %H:%M:%S'))
2171        alloc_log.addHandler(h)
2172
2173        # Start a thread to do the resource allocation
2174        t  = Thread(target=self.allocate_resources,
2175                args=(allocated, masters, eid, expid, tbparams, 
2176                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2177                    connInfo, tbmap, expcert_file),
2178                name=eid)
2179        t.start()
2180
2181        rv = {
2182                'experimentID': [
2183                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2184                ],
2185                'experimentStatus': 'starting',
2186                'proof': [ proof.to_dict() ] + proofs,
2187            }
2188
2189        return rv
2190   
2191    def get_experiment_fedid(self, key):
2192        """
2193        find the fedid associated with the localname key in the state database.
2194        """
2195
2196        rv = None
2197        self.state_lock.acquire()
2198        if key in self.state:
2199            rv = self.state[key].fedid
2200        self.state_lock.release()
2201        return rv
2202
2203    def check_experiment_access(self, fid, key):
2204        """
2205        Confirm that the fid has access to the experiment.  Though a request
2206        may be made in terms of a local name, the access attribute is always
2207        the experiment's fedid.
2208        """
2209        if not isinstance(key, fedid):
2210            key = self.get_experiment_fedid(key)
2211
2212        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2213
2214        if access_ok:
2215            return proof
2216        else:
2217            raise service_error(service_error.access, "Access Denied",
2218                proof)
2219
2220
2221    def get_handler(self, path, fid):
2222        """
2223        Perhaps surprisingly named, this function handles HTTP GET requests to
2224        this server (SOAP requests are POSTs).
2225        """
2226        self.log.info("Get handler %s %s" % (path, fid))
2227        # XXX: log proofs?
2228        if self.auth.check_attribute(fid, path):
2229            return ("%s/%s" % (self.repodir, path), "application/binary")
2230        else:
2231            return (None, None)
2232
2233    def update_info(self, key, force=False):
2234        top = None
2235        self.state_lock.acquire()
2236        if key in self.state:
2237            if force or self.state[key].older_than(self.info_cache_limit):
2238                top = self.state[key].top
2239                if top is not None: top = top.clone()
2240                d1, info_params, cert, d2 = \
2241                        self.get_segment_info(self.state[key], need_lock=False)
2242        self.state_lock.release()
2243
2244        if top is None: return
2245
2246        try:
2247            tmpdir = tempfile.mkdtemp(prefix="info-")
2248        except EnvironmentError:
2249            raise service_error(service_error.internal, 
2250                    "Cannot create tmp dir")
2251        cert_file = self.make_temp_certfile(cert, tmpdir)
2252
2253        data = []
2254        try:
2255            for k, (uri, aid) in info_params.items():
2256                info=self.info_segment(log=self.log, testbed=uri,
2257                            cert_file=cert_file, cert_pwd=None,
2258                            trusted_certs=self.trusted_certs,
2259                            caller=self.call_InfoSegment)
2260                info(uri, aid)
2261                data.append(info)
2262        # Clean up the tmpdir no matter what
2263        finally:
2264            if tmpdir: self.remove_dirs(tmpdir)
2265
2266        self.annotate_topology(top, data)
2267        self.state_lock.acquire()
2268        if key in self.state:
2269            self.state[key].top = top
2270            self.state[key].updated()
2271            if self.state_filename: self.write_state()
2272        self.state_lock.release()
2273
2274   
2275    def get_info(self, req, fid):
2276        """
2277        Return all the stored info about this experiment
2278        """
2279        rv = None
2280
2281        req = req.get('InfoRequestBody', None)
2282        if not req:
2283            raise service_error(service_error.req,
2284                    "Bad request format (no InfoRequestBody)")
2285        exp = req.get('experiment', None)
2286        legacy = req.get('legacy', False)
2287        fresh = req.get('fresh', False)
2288        if exp:
2289            if exp.has_key('fedid'):
2290                key = exp['fedid']
2291                keytype = "fedid"
2292            elif exp.has_key('localname'):
2293                key = exp['localname']
2294                keytype = "localname"
2295            else:
2296                raise service_error(service_error.req, "Unknown lookup type")
2297        else:
2298            raise service_error(service_error.req, "No request?")
2299
2300        proof = self.check_experiment_access(fid, key)
2301
2302        self.update_info(key, fresh)
2303
2304        self.state_lock.acquire()
2305        if self.state.has_key(key):
2306            rv = self.state[key].get_info()
2307            # Copy the topo if we need legacy annotations
2308            if legacy:
2309                top = self.state[key].top
2310                if top is not None: top = top.clone()
2311        self.state_lock.release()
2312
2313        # If the legacy visualization and topology representations are
2314        # requested, calculate them and add them to the return.
2315        if legacy and rv is not None:
2316            if top is not None:
2317                vtopo = topdl.topology_to_vtopo(top)
2318                if vtopo is not None:
2319                    rv['vtopo'] = vtopo
2320                    try:
2321                        vis = self.genviz(vtopo)
2322                    except service_error, e:
2323                        self.log.debug('Problem generating visualization: %s' \
2324                                % e)
2325                        vis = None
2326                    if vis is not None:
2327                        rv['vis'] = vis
2328        if rv:
2329            rv['proof'] = proof.to_dict()
2330            return rv
2331        else:
2332            raise service_error(service_error.req, "No such experiment")
2333
2334    def get_multi_info(self, req, fid):
2335        """
2336        Return all the stored info that this fedid can access
2337        """
2338        rv = { 'info': [ ], 'proof': [ ] }
2339
2340        self.state_lock.acquire()
2341        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2342            try:
2343                proof = self.check_experiment_access(fid, key)
2344            except service_error, e:
2345                if e.code == service_error.access:
2346                    continue
2347                else:
2348                    self.state_lock.release()
2349                    raise e
2350
2351            if self.state.has_key(key):
2352                e = self.state[key].get_info()
2353                e['proof'] = proof.to_dict()
2354                rv['info'].append(e)
2355                rv['proof'].append(proof.to_dict())
2356        self.state_lock.release()
2357        return rv
2358
2359    def check_termination_status(self, fed_exp, force):
2360        """
2361        Confirm that the experiment is sin a valid state to stop (or force it)
2362        return the state - invalid states for deletion and force settings cause
2363        exceptions.
2364        """
2365        self.state_lock.acquire()
2366        status = fed_exp.status
2367
2368        if status:
2369            if status in ('starting', 'terminating'):
2370                if not force:
2371                    self.state_lock.release()
2372                    raise service_error(service_error.partial, 
2373                            'Experiment still being created or destroyed')
2374                else:
2375                    self.log.warning('Experiment in %s state ' % status + \
2376                            'being terminated by force.')
2377            self.state_lock.release()
2378            return status
2379        else:
2380            # No status??? trouble
2381            self.state_lock.release()
2382            raise service_error(service_error.internal,
2383                    "Experiment has no status!?")
2384
2385    def get_segment_info(self, fed_exp, need_lock=True):
2386        ids = []
2387        term_params = { }
2388        if need_lock: self.state_lock.acquire()
2389        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2390        expcert = fed_exp.identity
2391        repo = "%s" % fed_exp.fedid
2392
2393        # Collect the allocation/segment ids into a dict keyed by the fedid
2394        # of the allocation that contains a tuple of uri, aid
2395        for i, fed in enumerate(fed_exp.get_all_allocations()):
2396            uri = fed.uri
2397            aid = fed.allocID
2398            term_params[aid] = (uri, aid)
2399        if need_lock: self.state_lock.release()
2400        return ids, term_params, expcert, repo
2401
2402
2403    def get_termination_info(self, fed_exp):
2404        self.state_lock.acquire()
2405        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2406        # Change the experiment state
2407        fed_exp.status = 'terminating'
2408        fed_exp.updated()
2409        if self.state_filename: self.write_state()
2410        self.state_lock.release()
2411
2412        return ids, term_params, expcert, repo
2413
2414
2415    def deallocate_resources(self, term_params, expcert, status, force, 
2416            dealloc_log):
2417        tmpdir = None
2418        # This try block makes sure the tempdir is cleared
2419        try:
2420            # If no expcert, try the deallocation as the experiment
2421            # controller instance.
2422            if expcert and self.auth_type != 'legacy': 
2423                try:
2424                    tmpdir = tempfile.mkdtemp(prefix="term-")
2425                except EnvironmentError:
2426                    raise service_error(service_error.internal, 
2427                            "Cannot create tmp dir")
2428                cert_file = self.make_temp_certfile(expcert, tmpdir)
2429                pw = None
2430            else: 
2431                cert_file = self.cert_file
2432                pw = self.cert_pwd
2433
2434            # Stop everyone.  NB, wait_for_all waits until a thread starts
2435            # and then completes, so we can't wait if nothing starts.  So,
2436            # no tbparams, no start.
2437            if len(term_params) > 0:
2438                tp = thread_pool(self.nthreads)
2439                for k, (uri, aid) in term_params.items():
2440                    # Create and start a thread to stop the segment
2441                    tp.wait_for_slot()
2442                    t  = pooled_thread(\
2443                            target=self.terminate_segment(log=dealloc_log,
2444                                testbed=uri,
2445                                cert_file=cert_file, 
2446                                cert_pwd=pw,
2447                                trusted_certs=self.trusted_certs,
2448                                caller=self.call_TerminateSegment),
2449                            args=(uri, aid), name=k,
2450                            pdata=tp, trace_file=self.trace_file)
2451                    t.start()
2452                # Wait for completions
2453                tp.wait_for_all_done()
2454
2455            # release the allocations (failed experiments have done this
2456            # already, and starting experiments may be in odd states, so we
2457            # ignore errors releasing those allocations
2458            try: 
2459                for k, (uri, aid)  in term_params.items():
2460                    self.release_access(None, aid, uri=uri,
2461                            cert_file=cert_file, cert_pwd=pw)
2462            except service_error, e:
2463                if status != 'failed' and not force:
2464                    raise e
2465
2466        # Clean up the tmpdir no matter what
2467        finally:
2468            if tmpdir: self.remove_dirs(tmpdir)
2469
2470    def terminate_experiment(self, req, fid):
2471        """
2472        Swap this experiment out on the federants and delete the shared
2473        information
2474        """
2475        tbparams = { }
2476        req = req.get('TerminateRequestBody', None)
2477        if not req:
2478            raise service_error(service_error.req,
2479                    "Bad request format (no TerminateRequestBody)")
2480
2481        key = self.get_experiment_key(req, 'experiment')
2482        proof = self.check_experiment_access(fid, key)
2483        exp = req.get('experiment', False)
2484        force = req.get('force', False)
2485
2486        dealloc_list = [ ]
2487
2488
2489        # Create a logger that logs to the dealloc_list as well as to the main
2490        # log file.
2491        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2492        h = logging.StreamHandler(self.list_log(dealloc_list))
2493        # XXX: there should be a global one of these rather than repeating the
2494        # code.
2495        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2496                    '%d %b %y %H:%M:%S'))
2497        dealloc_log.addHandler(h)
2498
2499        self.state_lock.acquire()
2500        fed_exp = self.state.get(key, None)
2501        self.state_lock.release()
2502        repo = None
2503
2504        if fed_exp:
2505            status = self.check_termination_status(fed_exp, force)
2506            # get_termination_info updates the experiment state
2507            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2508            self.deallocate_resources(term_params, expcert, status, force, 
2509                    dealloc_log)
2510
2511            # Remove the terminated experiment
2512            self.state_lock.acquire()
2513            for id in ids:
2514                self.clear_experiment_authorization(id, need_state_lock=False)
2515                if id in self.state: del self.state[id]
2516
2517            if self.state_filename: self.write_state()
2518            self.state_lock.release()
2519
2520            # Delete any synch points associated with this experiment.  All
2521            # synch points begin with the fedid of the experiment.
2522            fedid_keys = set(["fedid:%s" % f for f in ids \
2523                    if isinstance(f, fedid)])
2524            for k in self.synch_store.all_keys():
2525                try:
2526                    if len(k) > 45 and k[0:46] in fedid_keys:
2527                        self.synch_store.del_value(k)
2528                except synch_store.BadDeletionError:
2529                    pass
2530            self.write_store()
2531
2532            # Remove software and other cached stuff from the filesystem.
2533            if repo:
2534                self.remove_dirs("%s/%s" % (self.repodir, repo))
2535       
2536            return { 
2537                    'experiment': exp , 
2538                    'deallocationLog': string.join(dealloc_list, ''),
2539                    'proof': [proof.to_dict()],
2540                    }
2541        else:
2542            raise service_error(service_error.req, "No saved state")
2543
2544
2545    def GetValue(self, req, fid):
2546        """
2547        Get a value from the synchronized store
2548        """
2549        req = req.get('GetValueRequestBody', None)
2550        if not req:
2551            raise service_error(service_error.req,
2552                    "Bad request format (no GetValueRequestBody)")
2553       
2554        name = req.get('name', None)
2555        wait = req.get('wait', False)
2556        rv = { 'name': name }
2557
2558        if not name:
2559            raise service_error(service_error.req, "No name?")
2560
2561        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2562
2563        if access_ok:
2564            self.log.debug("[GetValue] asking for %s " % name)
2565            try:
2566                v = self.synch_store.get_value(name, wait)
2567            except synch_store.RevokedKeyError:
2568                # No more synch on this key
2569                raise service_error(service_error.federant, 
2570                        "Synch key %s revoked" % name)
2571            if v is not None:
2572                rv['value'] = v
2573            rv['proof'] = proof.to_dict()
2574            self.log.debug("[GetValue] got %s from %s" % (v, name))
2575            return rv
2576        else:
2577            raise service_error(service_error.access, "Access Denied",
2578                    proof=proof)
2579       
2580
2581    def SetValue(self, req, fid):
2582        """
2583        Set a value in the synchronized store
2584        """
2585        req = req.get('SetValueRequestBody', None)
2586        if not req:
2587            raise service_error(service_error.req,
2588                    "Bad request format (no SetValueRequestBody)")
2589       
2590        name = req.get('name', None)
2591        v = req.get('value', '')
2592
2593        if not name:
2594            raise service_error(service_error.req, "No name?")
2595
2596        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2597
2598        if access_ok:
2599            try:
2600                self.synch_store.set_value(name, v)
2601                self.write_store()
2602                self.log.debug("[SetValue] set %s to %s" % (name, v))
2603            except synch_store.CollisionError:
2604                # Translate into a service_error
2605                raise service_error(service_error.req,
2606                        "Value already set: %s" %name)
2607            except synch_store.RevokedKeyError:
2608                # No more synch on this key
2609                raise service_error(service_error.federant, 
2610                        "Synch key %s revoked" % name)
2611                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2612        else:
2613            raise service_error(service_error.access, "Access Denied",
2614                    proof=proof)
Note: See TracBrowser for help on using the repository browser.