source: fedd/federation/experiment_control.py @ 6e33086

compt_changesinfo-ops
Last change on this file since 6e33086 was 6e33086, checked in by Ted Faber <faber@…>, 13 years ago

InfoSegment? to emulab access controllers

  • Property mode set to 100644
File size: 86.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52
53# Right now, no support for composition.
54class federated_service:
55    def __init__(self, name, exporter=None, importers=None, params=None, 
56            reqs=None, portal=None):
57        self.name=name
58        self.exporter=exporter
59        if importers is None: self.importers = []
60        else: self.importers=importers
61        if params is None: self.params = { }
62        else: self.params = params
63        if reqs is None: self.reqs = []
64        else: self.reqs = reqs
65
66        if portal is not None:
67            self.portal = portal
68        else:
69            self.portal = (name in federated_service.needs_portal)
70
71    def __str__(self):
72        return "name %s export %s import %s params %s reqs %s" % \
73                (self.name, self.exporter, self.importers, self.params,
74                        [ (r['name'], r['visibility']) for r in self.reqs] )
75
76    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
77
78class experiment_control_local(experiment_control_legacy):
79    """
80    Control of experiments that this system can directly access.
81
82    Includes experiment creation, termination and information dissemination.
83    Thred safe.
84    """
85
86    class ssh_cmd_timeout(RuntimeError): pass
87   
88    call_RequestAccess = service_caller('RequestAccess')
89    call_ReleaseAccess = service_caller('ReleaseAccess')
90    call_StartSegment = service_caller('StartSegment')
91    call_TerminateSegment = service_caller('TerminateSegment')
92    call_InfoSegment = service_caller('InfoSegment')
93    call_Ns2Topdl = service_caller('Ns2Topdl')
94
95    def __init__(self, config=None, auth=None):
96        """
97        Intialize the various attributes, most from the config object
98        """
99
100        def parse_tarfile_list(tf):
101            """
102            Parse a tarfile list from the configuration.  This is a set of
103            paths and tarfiles separated by spaces.
104            """
105            rv = [ ]
106            if tf is not None:
107                tl = tf.split()
108                while len(tl) > 1:
109                    p, t = tl[0:2]
110                    del tl[0:2]
111                    rv.append((p, t))
112            return rv
113
114        self.list_log = list_log.list_log
115
116        self.cert_file = config.get("experiment_control", "cert_file")
117        if self.cert_file:
118            self.cert_pwd = config.get("experiment_control", "cert_pwd")
119        else:
120            self.cert_file = config.get("globals", "cert_file")
121            self.cert_pwd = config.get("globals", "cert_pwd")
122
123        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
124                or config.get("globals", "trusted_certs")
125
126        self.repodir = config.get("experiment_control", "repodir")
127        self.repo_url = config.get("experiment_control", "repo_url", 
128                "https://users.isi.deterlab.net:23235");
129
130        self.exp_stem = "fed-stem"
131        self.log = logging.getLogger("fedd.experiment_control")
132        set_log_level(config, "experiment_control", self.log)
133        self.muxmax = 2
134        self.nthreads = 10
135        self.randomize_experiments = False
136
137        self.splitter = None
138        self.ssh_keygen = "/usr/bin/ssh-keygen"
139        self.ssh_identity_file = None
140
141
142        self.debug = config.getboolean("experiment_control", "create_debug")
143        self.cleanup = not config.getboolean("experiment_control", 
144                "leave_tmpfiles")
145        self.state_filename = config.get("experiment_control", 
146                "experiment_state")
147        self.store_filename = config.get("experiment_control", 
148                "synch_store")
149        self.store_url = config.get("experiment_control", "store_url")
150        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
151        self.fedkit = parse_tarfile_list(\
152                config.get("experiment_control", "fedkit"))
153        self.gatewaykit = parse_tarfile_list(\
154                config.get("experiment_control", "gatewaykit"))
155        accessdb_file = config.get("experiment_control", "accessdb")
156
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        # XXX: document this!
162        self.info_cache_limit = \
163                config.getint('experiment_control', 'info_cache', 600)
164        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
165        else: self.direct_transit = [ ]
166        # NB for internal master/slave ops, not experiment setup
167        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
168
169        self.overrides = set([])
170        ovr = config.get('experiment_control', 'overrides')
171        if ovr:
172            for o in ovr.split(","):
173                o = o.strip()
174                if o.startswith('fedid:'): o = o[len('fedid:'):]
175                self.overrides.add(fedid(hexstr=o))
176
177        self.state = { }
178        self.state_lock = Lock()
179        self.tclsh = "/usr/local/bin/otclsh"
180        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
181                config.get("experiment_control", "tcl_splitter",
182                        "/usr/testbed/lib/ns2ir/parse.tcl")
183        mapdb_file = config.get("experiment_control", "mapdb")
184        self.trace_file = sys.stderr
185
186        self.def_expstart = \
187                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
188                "/tmp/federate";
189        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
190                "FEDDIR/hosts";
191        self.def_gwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
193                "/tmp/bridge.log";
194        self.def_mgwstart = \
195                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
196                "/tmp/bridge.log";
197        self.def_gwimage = "FBSD61-TUNNEL2";
198        self.def_gwtype = "pc";
199        self.local_access = { }
200
201        if self.auth_type == 'legacy':
202            if auth:
203                self.auth = auth
204            else:
205                self.log.error( "[access]: No authorizer initialized, " +\
206                        "creating local one.")
207                auth = authorizer()
208            self.get_access = self.legacy_get_access
209        elif self.auth_type == 'abac':
210            self.auth = abac_authorizer(load=self.auth_dir)
211        else:
212            raise service_error(service_error.internal, 
213                    "Unknown auth_type: %s" % self.auth_type)
214
215        if mapdb_file:
216            self.read_mapdb(mapdb_file)
217        else:
218            self.log.warn("[experiment_control] No testbed map, using defaults")
219            self.tbmap = { 
220                    'deter':'https://users.isi.deterlab.net:23235',
221                    'emulab':'https://users.isi.deterlab.net:23236',
222                    'ucb':'https://users.isi.deterlab.net:23237',
223                    }
224
225        if accessdb_file:
226                self.read_accessdb(accessdb_file)
227        else:
228            raise service_error(service_error.internal,
229                    "No accessdb specified in config")
230
231        # Grab saved state.  OK to do this w/o locking because it's read only
232        # and only one thread should be in existence that can see self.state at
233        # this point.
234        if self.state_filename:
235            self.read_state()
236
237        if self.store_filename:
238            self.read_store()
239        else:
240            self.log.warning("No saved synch store")
241            self.synch_store = synch_store
242
243        # Dispatch tables
244        self.soap_services = {\
245                'New': soap_handler('New', self.new_experiment),
246                'Create': soap_handler('Create', self.create_experiment),
247                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
248                'Vis': soap_handler('Vis', self.get_vis),
249                'Info': soap_handler('Info', self.get_info),
250                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
251                'Terminate': soap_handler('Terminate', 
252                    self.terminate_experiment),
253                'GetValue': soap_handler('GetValue', self.GetValue),
254                'SetValue': soap_handler('SetValue', self.SetValue),
255        }
256
257        self.xmlrpc_services = {\
258                'New': xmlrpc_handler('New', self.new_experiment),
259                'Create': xmlrpc_handler('Create', self.create_experiment),
260                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
261                'Vis': xmlrpc_handler('Vis', self.get_vis),
262                'Info': xmlrpc_handler('Info', self.get_info),
263                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
264                'Terminate': xmlrpc_handler('Terminate',
265                    self.terminate_experiment),
266                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
267                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
268        }
269
270    # Call while holding self.state_lock
271    def write_state(self):
272        """
273        Write a new copy of experiment state after copying the existing state
274        to a backup.
275
276        State format is a simple pickling of the state dictionary.
277        """
278        if os.access(self.state_filename, os.W_OK):
279            copy_file(self.state_filename, \
280                    "%s.bak" % self.state_filename)
281        try:
282            f = open(self.state_filename, 'w')
283            pickle.dump(self.state, f)
284        except EnvironmentError, e:
285            self.log.error("Can't write file %s: %s" % \
286                    (self.state_filename, e))
287        except pickle.PicklingError, e:
288            self.log.error("Pickling problem: %s" % e)
289        except TypeError, e:
290            self.log.error("Pickling problem (TypeError): %s" % e)
291
292    @staticmethod
293    def get_alloc_ids(exp):
294        """
295        Used by read_store and read state.  This used to be worse.
296        """
297
298        return [ a.allocID for a in exp.get_all_allocations() ]
299
300
301    # Call while holding self.state_lock
302    def read_state(self):
303        """
304        Read a new copy of experiment state.  Old state is overwritten.
305
306        State format is a simple pickling of the state dictionary.
307        """
308       
309        try:
310            f = open(self.state_filename, "r")
311            self.state = pickle.load(f)
312            self.log.debug("[read_state]: Read state from %s" % \
313                    self.state_filename)
314        except EnvironmentError, e:
315            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
316                    % (self.state_filename, e))
317        except pickle.UnpicklingError, e:
318            self.log.warning(("[read_state]: No saved state: " + \
319                    "Unpickling failed: %s") % e)
320       
321        for s in self.state.values():
322            try:
323
324                eid = s.fedid
325                if eid : 
326                    if self.auth_type == 'legacy':
327                        # XXX: legacy
328                        # Give the owner rights to the experiment
329                        #self.auth.set_attribute(s['owner'], eid)
330                        # And holders of the eid as well
331                        self.auth.set_attribute(eid, eid)
332                        # allow overrides to control experiments as well
333                        for o in self.overrides:
334                            self.auth.set_attribute(o, eid)
335                        # Set permissions to allow reading of the software
336                        # repo, if any, as well.
337                        for a in self.get_alloc_ids(s):
338                            self.auth.set_attribute(a, 'repo/%s' % eid)
339                else:
340                    raise KeyError("No experiment id")
341            except KeyError, e:
342                self.log.warning("[read_state]: State ownership or identity " +\
343                        "misformatted in %s: %s" % (self.state_filename, e))
344
345
346    def read_accessdb(self, accessdb_file):
347        """
348        Read the mapping from fedids that can create experiments to their name
349        in the 3-level access namespace.  All will be asserted from this
350        testbed and can include the local username and porject that will be
351        asserted on their behalf by this fedd.  Each fedid is also added to the
352        authorization system with the "create" attribute.
353        """
354        self.accessdb = {}
355        # These are the regexps for parsing the db
356        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
357        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
358                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
359        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
360                "\s*->\s*(" + name_expr + ")\s*$")
361        lineno = 0
362
363        # Parse the mappings and store in self.authdb, a dict of
364        # fedid -> (proj, user)
365        try:
366            f = open(accessdb_file, "r")
367            for line in f:
368                lineno += 1
369                line = line.strip()
370                if len(line) == 0 or line.startswith('#'):
371                    continue
372                m = project_line.match(line)
373                if m:
374                    fid = fedid(hexstr=m.group(1))
375                    project, user = m.group(2,3)
376                    if not self.accessdb.has_key(fid):
377                        self.accessdb[fid] = []
378                    self.accessdb[fid].append((project, user))
379                    continue
380
381                m = user_line.match(line)
382                if m:
383                    fid = fedid(hexstr=m.group(1))
384                    project = None
385                    user = m.group(2)
386                    if not self.accessdb.has_key(fid):
387                        self.accessdb[fid] = []
388                    self.accessdb[fid].append((project, user))
389                    continue
390                self.log.warn("[experiment_control] Error parsing access " +\
391                        "db %s at line %d" %  (accessdb_file, lineno))
392        except EnvironmentError:
393            raise service_error(service_error.internal,
394                    ("Error opening/reading %s as experiment " +\
395                            "control accessdb") %  accessdb_file)
396        f.close()
397
398        # Initialize the authorization attributes
399        # XXX: legacy
400        if self.auth_type == 'legacy':
401            for fid in self.accessdb.keys():
402                self.auth.set_attribute(fid, 'create')
403                self.auth.set_attribute(fid, 'new')
404
405    def read_mapdb(self, file):
406        """
407        Read a simple colon separated list of mappings for the
408        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
409        """
410
411        self.tbmap = { }
412        lineno =0
413        try:
414            f = open(file, "r")
415            for line in f:
416                lineno += 1
417                line = line.strip()
418                if line.startswith('#') or len(line) == 0:
419                    continue
420                try:
421                    label, url = line.split(':', 1)
422                    self.tbmap[label] = url
423                except ValueError, e:
424                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
425                            "map db: %s %s" % (lineno, line, e))
426        except EnvironmentError, e:
427            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
428                    "open %s: %s" % (file, e))
429        else:
430            f.close()
431
432    def read_store(self):
433        try:
434            self.synch_store = synch_store()
435            self.synch_store.load(self.store_filename)
436            self.log.debug("[read_store]: Read store from %s" % \
437                    self.store_filename)
438        except EnvironmentError, e:
439            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
440                    % (self.state_filename, e))
441            self.synch_store = synch_store()
442
443        # Set the initial permissions on data in the store.  XXX: This ad hoc
444        # authorization attribute initialization is getting out of hand.
445        # XXX: legacy
446        if self.auth_type == 'legacy':
447            for k in self.synch_store.all_keys():
448                try:
449                    if k.startswith('fedid:'):
450                        fid = fedid(hexstr=k[6:46])
451                        if self.state.has_key(fid):
452                            for a in self.get_alloc_ids(self.state[fid]):
453                                self.auth.set_attribute(a, k)
454                except ValueError, e:
455                    self.log.warn("Cannot deduce permissions for %s" % k)
456
457
458    def write_store(self):
459        """
460        Write a new copy of synch_store after writing current state
461        to a backup.  We use the internal synch_store pickle method to avoid
462        incinsistent data.
463
464        State format is a simple pickling of the store.
465        """
466        if os.access(self.store_filename, os.W_OK):
467            copy_file(self.store_filename, \
468                    "%s.bak" % self.store_filename)
469        try:
470            self.synch_store.save(self.store_filename)
471        except EnvironmentError, e:
472            self.log.error("Can't write file %s: %s" % \
473                    (self.store_filename, e))
474        except TypeError, e:
475            self.log.error("Pickling problem (TypeError): %s" % e)
476
477
478    def remove_dirs(self, dir):
479        """
480        Remove the directory tree and all files rooted at dir.  Log any errors,
481        but continue.
482        """
483        self.log.debug("[removedirs]: removing %s" % dir)
484        try:
485            for path, dirs, files in os.walk(dir, topdown=False):
486                for f in files:
487                    os.remove(os.path.join(path, f))
488                for d in dirs:
489                    os.rmdir(os.path.join(path, d))
490            os.rmdir(dir)
491        except EnvironmentError, e:
492            self.log.error("Error deleting directory tree in %s" % e);
493
494    @staticmethod
495    def make_temp_certfile(expcert, tmpdir):
496        """
497        make a protected copy of the access certificate so the experiment
498        controller can act as the experiment principal.  mkstemp is the most
499        secure way to do that. The directory should be created by
500        mkdtemp.  Return the filename.
501        """
502        if expcert and tmpdir:
503            try:
504                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
505                f = os.fdopen(certf, 'w')
506                print >> f, expcert
507                f.close()
508            except EnvironmentError, e:
509                raise service_error(service_error.internal, 
510                        "Cannot create temp cert file?")
511            return certfn
512        else:
513            return None
514
515       
516    def generate_ssh_keys(self, dest, type="rsa" ):
517        """
518        Generate a set of keys for the gateways to use to talk.
519
520        Keys are of type type and are stored in the required dest file.
521        """
522        valid_types = ("rsa", "dsa")
523        t = type.lower();
524        if t not in valid_types: raise ValueError
525        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
526
527        try:
528            trace = open("/dev/null", "w")
529        except EnvironmentError:
530            raise service_error(service_error.internal,
531                    "Cannot open /dev/null??");
532
533        # May raise CalledProcessError
534        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
535        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
536        if rv != 0:
537            raise service_error(service_error.internal, 
538                    "Cannot generate nonce ssh keys.  %s return code %d" \
539                            % (self.ssh_keygen, rv))
540
541    def generate_seer_certs(self, destdir):
542        '''
543        Create a SEER ca cert and a node cert in destdir/ca.pem and
544        destdir/node.pem respectively.  These will be distributed throughout
545        the federated experiment.  This routine reports errors via
546        service_errors.
547        '''
548        openssl = '/usr/bin/openssl'
549        # All the filenames and parameters we need for openssl calls below
550        ca_key =os.path.join(destdir, 'ca.key') 
551        ca_pem = os.path.join(destdir, 'ca.pem')
552        node_key =os.path.join(destdir, 'node.key') 
553        node_pem = os.path.join(destdir, 'node.pem')
554        node_req = os.path.join(destdir, 'node.req')
555        node_signed = os.path.join(destdir, 'node.signed')
556        days = '%s' % (365 * 10)
557        serial = '%s' % random.randint(0, 1<<16)
558
559        try:
560            # Sequence of calls to create a CA key, create a ca cert, create a
561            # node key, node signing request, and finally a signed node
562            # certificate.
563            sequence = (
564                    (openssl, 'genrsa', '-out', ca_key, '1024'),
565                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
566                        ca_pem, '-days', days, '-subj', 
567                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
568                    (openssl, 'genrsa', '-out', node_key, '1024'),
569                    (openssl, 'req', '-new', '-key', node_key, '-out', 
570                        node_req, '-days', days, '-subj', 
571                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
572                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
573                        '-set_serial', serial, '-req', '-in', node_req, 
574                        '-out', node_signed, '-days', days),
575                )
576            # Do all that stuff; bail if there's an error, and push all the
577            # output to dev/null.
578            for cmd in sequence:
579                trace = open("/dev/null", "w")
580                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
581                if rv != 0:
582                    raise service_error(service_error.internal, 
583                            "Cannot generate SEER certs.  %s return code %d" \
584                                    % (' '.join(cmd), rv))
585            # Concatinate the node key and signed certificate into node.pem
586            f = open(node_pem, 'w')
587            for comp in (node_signed, node_key):
588                g = open(comp, 'r')
589                f.write(g.read())
590                g.close()
591            f.close()
592
593            # Throw out intermediaries.
594            for fn in (ca_key, node_key, node_req, node_signed):
595                os.unlink(fn)
596
597        except EnvironmentError, e:
598            # Any difficulties with the file system wind up here
599            raise service_error(service_error.internal,
600                    "File error on  %s while creating SEER certs: %s" % \
601                            (e.filename, e.strerror))
602
603
604
605    def gentopo(self, str):
606        """
607        Generate the topology data structure from the splitter's XML
608        representation of it.
609
610        The topology XML looks like:
611            <experiment>
612                <nodes>
613                    <node><vname></vname><ips>ip1:ip2</ips></node>
614                </nodes>
615                <lans>
616                    <lan>
617                        <vname></vname><vnode></vnode><ip></ip>
618                        <bandwidth></bandwidth><member>node:port</member>
619                    </lan>
620                </lans>
621        """
622        class topo_parse:
623            """
624            Parse the topology XML and create the dats structure.
625            """
626            def __init__(self):
627                # Typing of the subelements for data conversion
628                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
629                self.int_subelements = ( 'bandwidth',)
630                self.float_subelements = ( 'delay',)
631                # The final data structure
632                self.nodes = [ ]
633                self.lans =  [ ]
634                self.topo = { \
635                        'node': self.nodes,\
636                        'lan' : self.lans,\
637                    }
638                self.element = { }  # Current element being created
639                self.chars = ""     # Last text seen
640
641            def end_element(self, name):
642                # After each sub element the contents is added to the current
643                # element or to the appropriate list.
644                if name == 'node':
645                    self.nodes.append(self.element)
646                    self.element = { }
647                elif name == 'lan':
648                    self.lans.append(self.element)
649                    self.element = { }
650                elif name in self.str_subelements:
651                    self.element[name] = self.chars
652                    self.chars = ""
653                elif name in self.int_subelements:
654                    self.element[name] = int(self.chars)
655                    self.chars = ""
656                elif name in self.float_subelements:
657                    self.element[name] = float(self.chars)
658                    self.chars = ""
659
660            def found_chars(self, data):
661                self.chars += data.rstrip()
662
663
664        tp = topo_parse();
665        parser = xml.parsers.expat.ParserCreate()
666        parser.EndElementHandler = tp.end_element
667        parser.CharacterDataHandler = tp.found_chars
668
669        parser.Parse(str)
670
671        return tp.topo
672       
673
674    def genviz(self, topo):
675        """
676        Generate the visualization the virtual topology
677        """
678
679        neato = "/usr/local/bin/neato"
680        # These are used to parse neato output and to create the visualization
681        # file.
682        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
683        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
684                "%s</type></node>"
685
686        try:
687            # Node names
688            nodes = [ n['vname'] for n in topo['node'] ]
689            topo_lans = topo['lan']
690        except KeyError, e:
691            raise service_error(service_error.internal, "Bad topology: %s" %e)
692
693        lans = { }
694        links = { }
695
696        # Walk through the virtual topology, organizing the connections into
697        # 2-node connections (links) and more-than-2-node connections (lans).
698        # When a lan is created, it's added to the list of nodes (there's a
699        # node in the visualization for the lan).
700        for l in topo_lans:
701            if links.has_key(l['vname']):
702                if len(links[l['vname']]) < 2:
703                    links[l['vname']].append(l['vnode'])
704                else:
705                    nodes.append(l['vname'])
706                    lans[l['vname']] = links[l['vname']]
707                    del links[l['vname']]
708                    lans[l['vname']].append(l['vnode'])
709            elif lans.has_key(l['vname']):
710                lans[l['vname']].append(l['vnode'])
711            else:
712                links[l['vname']] = [ l['vnode'] ]
713
714
715        # Open up a temporary file for dot to turn into a visualization
716        try:
717            df, dotname = tempfile.mkstemp()
718            dotfile = os.fdopen(df, 'w')
719        except EnvironmentError:
720            raise service_error(service_error.internal,
721                    "Failed to open file in genviz")
722
723        try:
724            dnull = open('/dev/null', 'w')
725        except EnvironmentError:
726            service_error(service_error.internal,
727                    "Failed to open /dev/null in genviz")
728
729        # Generate a dot/neato input file from the links, nodes and lans
730        try:
731            print >>dotfile, "graph G {"
732            for n in nodes:
733                print >>dotfile, '\t"%s"' % n
734            for l in links.keys():
735                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
736            for l in lans.keys():
737                for n in lans[l]:
738                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
739            print >>dotfile, "}"
740            dotfile.close()
741        except TypeError:
742            raise service_error(service_error.internal,
743                    "Single endpoint link in vtopo")
744        except EnvironmentError:
745            raise service_error(service_error.internal, "Cannot write dot file")
746
747        # Use dot to create a visualization
748        try:
749            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
750                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
751                stderr=dnull, close_fds=True)
752        except EnvironmentError:
753            raise service_error(service_error.internal, 
754                    "Cannot generate visualization: is graphviz available?")
755        dnull.close()
756
757        # Translate dot to vis format
758        vis_nodes = [ ]
759        vis = { 'node': vis_nodes }
760        for line in dot.stdout:
761            m = vis_re.match(line)
762            if m:
763                vn = m.group(1)
764                vis_node = {'name': vn, \
765                        'x': float(m.group(2)),\
766                        'y' : float(m.group(3)),\
767                    }
768                if vn in links.keys() or vn in lans.keys():
769                    vis_node['type'] = 'lan'
770                else:
771                    vis_node['type'] = 'node'
772                vis_nodes.append(vis_node)
773        rv = dot.wait()
774
775        os.remove(dotname)
776        # XXX: graphviz seems to use low return codes for warnings, like
777        # "couldn't find font"
778        if rv < 2 : return vis
779        else: return None
780
781
782    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
783            cert_pwd=None):
784        """
785        Release access to testbed through fedd
786        """
787
788        if not uri and tbmap:
789            uri = tbmap.get(tb, None)
790        if not uri:
791            raise service_error(service_error.server_config, 
792                    "Unknown testbed: %s" % tb)
793
794        if self.local_access.has_key(uri):
795            resp = self.local_access[uri].ReleaseAccess(\
796                    { 'ReleaseAccessRequestBody' : 
797                        {'allocID': {'fedid': aid}},}, 
798                    fedid(file=cert_file))
799            resp = { 'ReleaseAccessResponseBody': resp } 
800        else:
801            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
802                    cert_file, cert_pwd, self.trusted_certs)
803
804        # better error coding
805
806    def remote_ns2topdl(self, uri, desc):
807
808        req = {
809                'description' : { 'ns2description': desc },
810            }
811
812        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
813                self.trusted_certs)
814
815        if r.has_key('Ns2TopdlResponseBody'):
816            r = r['Ns2TopdlResponseBody']
817            ed = r.get('experimentdescription', None)
818            if ed.has_key('topdldescription'):
819                return topdl.Topology(**ed['topdldescription'])
820            else:
821                raise service_error(service_error.protocol, 
822                        "Bad splitter response (no output)")
823        else:
824            raise service_error(service_error.protocol, "Bad splitter response")
825
826    class start_segment:
827        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
828                cert_pwd=None, trusted_certs=None, caller=None,
829                log_collector=None):
830            self.log = log
831            self.debug = debug
832            self.cert_file = cert_file
833            self.cert_pwd = cert_pwd
834            self.trusted_certs = None
835            self.caller = caller
836            self.testbed = testbed
837            self.log_collector = log_collector
838            self.response = None
839            self.node = { }
840            self.proof = None
841
842        def make_map(self, resp):
843            if 'segmentdescription' not in resp  or \
844                    'topdldescription' not in resp['segmentdescription']:
845                self.log.warn('No topology returned from startsegment')
846                return 
847
848            top = topdl.Topology(
849                    **resp['segmentdescription']['topdldescription'])
850
851            for e in top.elements:
852                if isinstance(e, topdl.Computer):
853                    self.node[e.name] = \
854                            (e.localname, e.status, e.service, e.operation)
855
856        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
857            req = {
858                    'allocID': { 'fedid' : aid }, 
859                    'segmentdescription': { 
860                        'topdldescription': topo.to_dict(),
861                    },
862                }
863
864            if connInfo:
865                req['connection'] = connInfo
866
867            import_svcs = [ s for m in masters.values() \
868                    for s in m if self.testbed in s.importers]
869
870            if import_svcs or self.testbed in masters:
871                req['service'] = []
872
873            for s in import_svcs:
874                for r in s.reqs:
875                    sr = copy.deepcopy(r)
876                    sr['visibility'] = 'import';
877                    req['service'].append(sr)
878
879            for s in masters.get(self.testbed, []):
880                for r in s.reqs:
881                    sr = copy.deepcopy(r)
882                    sr['visibility'] = 'export';
883                    req['service'].append(sr)
884
885            if attrs:
886                req['fedAttr'] = attrs
887
888            try:
889                self.log.debug("Calling StartSegment at %s " % uri)
890                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
891                        self.trusted_certs)
892                if r.has_key('StartSegmentResponseBody'):
893                    lval = r['StartSegmentResponseBody'].get('allocationLog',
894                            None)
895                    if lval and self.log_collector:
896                        for line in  lval.splitlines(True):
897                            self.log_collector.write(line)
898                    self.make_map(r['StartSegmentResponseBody'])
899                    if 'proof' in r: self.proof = r['proof']
900                    self.response = r
901                else:
902                    raise service_error(service_error.internal, 
903                            "Bad response!?: %s" %r)
904                return True
905            except service_error, e:
906                self.log.error("Start segment failed on %s: %s" % \
907                        (self.testbed, e))
908                return False
909
910
911
912    class terminate_segment:
913        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
914                cert_pwd=None, trusted_certs=None, caller=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922
923        def __call__(self, uri, aid ):
924            req = {
925                    'allocID': {'fedid': aid }, 
926                }
927            try:
928                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
929                        self.trusted_certs)
930                return True
931            except service_error, e:
932                self.log.error("Terminate segment failed on %s: %s" % \
933                        (self.testbed, e))
934                return False
935
936    class info_segment(start_segment):
937        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
938                cert_pwd=None, trusted_certs=None, caller=None,
939                log_collector=None):
940            experiment_control_local.start_segment.__init__(self, debug, 
941                    log, testbed, cert_file, cert_pwd, trusted_certs, 
942                    caller, log_collector)
943
944        def __call__(self, uri, aid):
945            req = { 'allocID': { 'fedid' : aid } }
946
947            try:
948                self.log.debug("Calling InfoSegment at %s " % uri)
949                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
950                        self.trusted_certs)
951                if r.has_key('InfoSegmentResponseBody'):
952                    self.make_map(r['InfoSegmentResponseBody'])
953                    if 'proof' in r: self.proof = r['proof']
954                    self.response = r
955                else:
956                    raise service_error(service_error.internal, 
957                            "Bad response!?: %s" %r)
958                return True
959            except service_error, e:
960                self.log.error("Info segment failed on %s: %s" % \
961                        (self.testbed, e))
962                return False
963
964    def annotate_topology(self, top, data):
965        def add_new(ann, attr):
966            for a in ann:
967                if a not in attr: attr.append(a)
968
969        # Annotate the topology with embedding info
970        for e in top.elements:
971            if isinstance(e, topdl.Computer):
972                for s in data:
973                    ann = s.node.get(e.name, None)
974                    if ann is not None:
975                        add_new(ann[0], e.localname)
976                        e.status = ann[1]
977                        add_new(ann[2], e.service)
978                        add_new(ann[3], e.operation)
979                        break
980
981
982
983    def allocate_resources(self, allocated, masters, eid, expid, 
984            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
985            attrs=None, connInfo={}, tbmap=None, expcert=None):
986
987        started = { }           # Testbeds where a sub-experiment started
988                                # successfully
989
990        # XXX
991        fail_soft = False
992
993        if tbmap is None: tbmap = { }
994
995        log = alloc_log or self.log
996
997        tp = thread_pool(self.nthreads)
998        threads = [ ]
999        starters = [ ]
1000
1001        if expcert:
1002            cert = expcert
1003            pw = None
1004        else:
1005            cert = self.cert_file
1006            pw = self.cert_pwd
1007
1008        for tb in allocated.keys():
1009            # Create and start a thread to start the segment, and save it
1010            # to get the return value later
1011            tb_attrs = copy.copy(attrs)
1012            tp.wait_for_slot()
1013            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1014            base, suffix = split_testbed(tb)
1015            if suffix:
1016                tb_attrs.append({'attribute': 'experiment_name', 
1017                    'value': "%s-%s" % (eid, suffix)})
1018            else:
1019                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1020            if not uri:
1021                raise service_error(service_error.internal, 
1022                        "Unknown testbed %s !?" % tb)
1023
1024            aid = tbparams[tb].allocID
1025            if not aid:
1026                raise service_error(service_error.internal, 
1027                        "No alloc id for testbed %s !?" % tb)
1028
1029            s = self.start_segment(log=log, debug=self.debug,
1030                    testbed=tb, cert_file=cert,
1031                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1032                    caller=self.call_StartSegment,
1033                    log_collector=log_collector)
1034            starters.append(s)
1035            t  = pooled_thread(\
1036                    target=s, name=tb,
1037                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1038                    pdata=tp, trace_file=self.trace_file)
1039            threads.append(t)
1040            t.start()
1041
1042        # Wait until all finish (keep pinging the log, though)
1043        mins = 0
1044        revoked = False
1045        while not tp.wait_for_all_done(60.0):
1046            mins += 1
1047            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1048                    % mins)
1049            if not revoked and \
1050                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1051                # a testbed has failed.  Revoke this experiment's
1052                # synchronizarion values so that sub experiments will not
1053                # deadlock waiting for synchronization that will never happen
1054                self.log.info("A subexperiment has failed to swap in, " + \
1055                        "revoking synch keys")
1056                var_key = "fedid:%s" % expid
1057                for k in self.synch_store.all_keys():
1058                    if len(k) > 45 and k[0:46] == var_key:
1059                        self.synch_store.revoke_key(k)
1060                revoked = True
1061
1062        failed = [ t.getName() for t in threads if not t.rv ]
1063        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1064
1065        # If one failed clean up, unless fail_soft is set
1066        if failed:
1067            if not fail_soft:
1068                tp.clear()
1069                for tb in succeeded:
1070                    # Create and start a thread to stop the segment
1071                    tp.wait_for_slot()
1072                    uri = tbparams[tb].uri
1073                    t  = pooled_thread(\
1074                            target=self.terminate_segment(log=log,
1075                                testbed=tb,
1076                                cert_file=cert, 
1077                                cert_pwd=pw,
1078                                trusted_certs=self.trusted_certs,
1079                                caller=self.call_TerminateSegment),
1080                            args=(uri, tbparams[tb].allocID),
1081                            name=tb,
1082                            pdata=tp, trace_file=self.trace_file)
1083                    t.start()
1084                # Wait until all finish (if any are being stopped)
1085                if succeeded:
1086                    tp.wait_for_all_done()
1087
1088                # release the allocations
1089                for tb in tbparams.keys():
1090                    try:
1091                        self.release_access(tb, tbparams[tb].allocID, 
1092                                tbmap=tbmap, uri=tbparams[tb].uri,
1093                                cert_file=cert, cert_pwd=pw)
1094                    except service_error, e:
1095                        self.log.warn("Error releasing access: %s" % e.desc)
1096                # Remove the placeholder
1097                self.state_lock.acquire()
1098                self.state[eid].status = 'failed'
1099                self.state[eid].updated()
1100                if self.state_filename: self.write_state()
1101                self.state_lock.release()
1102                # Remove the repo dir
1103                self.remove_dirs("%s/%s" %(self.repodir, expid))
1104                # Walk up tmpdir, deleting as we go
1105                if self.cleanup:
1106                    self.remove_dirs(tmpdir)
1107                else:
1108                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1109
1110
1111                log.error("Swap in failed on %s" % ",".join(failed))
1112                return
1113        else:
1114            # Walk through the successes and gather the proofs
1115            proofs = { }
1116            for s in starters:
1117                if s.proof:
1118                    proofs[s.testbed] = s.proof
1119            self.annotate_topology(top, starters)
1120            log.info("[start_segment]: Experiment %s active" % eid)
1121
1122
1123        # Walk up tmpdir, deleting as we go
1124        if self.cleanup:
1125            self.remove_dirs(tmpdir)
1126        else:
1127            log.debug("[start_experiment]: not removing %s" % tmpdir)
1128
1129        # Insert the experiment into our state and update the disk copy.
1130        self.state_lock.acquire()
1131        self.state[expid].status = 'active'
1132        self.state[eid] = self.state[expid]
1133        self.state[eid].top = top
1134        self.state[eid].updated()
1135        # Append startup proofs
1136        for f in self.state[eid].get_all_allocations():
1137            if f.tb in proofs:
1138                f.proof.append(proofs[f.tb])
1139
1140        if self.state_filename: self.write_state()
1141        self.state_lock.release()
1142        return
1143
1144
1145    def add_kit(self, e, kit):
1146        """
1147        Add a Software object created from the list of (install, location)
1148        tuples passed as kit  to the software attribute of an object e.  We
1149        do this enough to break out the code, but it's kind of a hack to
1150        avoid changing the old tuple rep.
1151        """
1152
1153        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1154
1155        if isinstance(e.software, list): e.software.extend(s)
1156        else: e.software = s
1157
1158    def append_experiment_authorization(self, expid, attrs, 
1159            need_state_lock=True):
1160        """
1161        Append the authorization information to system state
1162        """
1163
1164        for p, a in attrs:
1165            self.auth.set_attribute(p, a)
1166        self.auth.save()
1167
1168        if need_state_lock: self.state_lock.acquire()
1169        # XXX: really a no op?
1170        #self.state[expid]['auth'].update(attrs)
1171        if self.state_filename: self.write_state()
1172        if need_state_lock: self.state_lock.release()
1173
1174    def clear_experiment_authorization(self, expid, need_state_lock=True):
1175        """
1176        Attrs is a set of attribute principal pairs that need to be removed
1177        from the authenticator.  Remove them and save the authenticator.
1178        """
1179
1180        if need_state_lock: self.state_lock.acquire()
1181        # XXX: should be a no-op
1182        #if expid in self.state and 'auth' in self.state[expid]:
1183            #for p, a in self.state[expid]['auth']:
1184                #self.auth.unset_attribute(p, a)
1185            #self.state[expid]['auth'] = set()
1186        if self.state_filename: self.write_state()
1187        if need_state_lock: self.state_lock.release()
1188        self.auth.save()
1189
1190
1191    def create_experiment_state(self, fid, req, expid, expcert,
1192            state='starting'):
1193        """
1194        Create the initial entry in the experiment's state.  The expid and
1195        expcert are the experiment's fedid and certifacte that represents that
1196        ID, which are installed in the experiment state.  If the request
1197        includes a suggested local name that is used if possible.  If the local
1198        name is already taken by an experiment owned by this user that has
1199        failed, it is overwritten.  Otherwise new letters are added until a
1200        valid localname is found.  The generated local name is returned.
1201        """
1202
1203        if req.has_key('experimentID') and \
1204                req['experimentID'].has_key('localname'):
1205            overwrite = False
1206            eid = req['experimentID']['localname']
1207            # If there's an old failed experiment here with the same local name
1208            # and accessible by this user, we'll overwrite it, otherwise we'll
1209            # fall through and do the collision avoidance.
1210            old_expid = self.get_experiment_fedid(eid)
1211            if old_expid:
1212                users_experiment = True
1213                try:
1214                    self.check_experiment_access(fid, old_expid)
1215                except service_error, e:
1216                    if e.code == service_error.access: users_experiment = False
1217                    else: raise e
1218                if users_experiment:
1219                    self.state_lock.acquire()
1220                    status = self.state[eid].status
1221                    if status and status == 'failed':
1222                        # remove the old access attributes
1223                        self.clear_experiment_authorization(eid,
1224                                need_state_lock=False)
1225                        overwrite = True
1226                        del self.state[eid]
1227                        del self.state[old_expid]
1228                    self.state_lock.release()
1229                else:
1230                    self.log.info('Experiment %s exists, ' % eid + \
1231                            'but this user cannot access it')
1232            self.state_lock.acquire()
1233            while (self.state.has_key(eid) and not overwrite):
1234                eid += random.choice(string.ascii_letters)
1235            # Initial state
1236            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1237                    identity=expcert)
1238            self.state[expid] = self.state[eid]
1239            if self.state_filename: self.write_state()
1240            self.state_lock.release()
1241        else:
1242            eid = self.exp_stem
1243            for i in range(0,5):
1244                eid += random.choice(string.ascii_letters)
1245            self.state_lock.acquire()
1246            while (self.state.has_key(eid)):
1247                eid = self.exp_stem
1248                for i in range(0,5):
1249                    eid += random.choice(string.ascii_letters)
1250            # Initial state
1251            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1252                    identity=expcert)
1253            self.state[expid] = self.state[eid]
1254            if self.state_filename: self.write_state()
1255            self.state_lock.release()
1256
1257        # Let users touch the state.  Authorize this fid and the expid itself
1258        # to touch the experiment, as well as allowing th eoverrides.
1259        self.append_experiment_authorization(eid, 
1260                set([(fid, expid), (expid,expid)] + \
1261                        [ (o, expid) for o in self.overrides]))
1262
1263        return eid
1264
1265
1266    def allocate_ips_to_topo(self, top):
1267        """
1268        Add an ip4_address attribute to all the hosts in the topology, based on
1269        the shared substrates on which they sit.  An /etc/hosts file is also
1270        created and returned as a list of hostfiles entries.  We also return
1271        the allocator, because we may need to allocate IPs to portals
1272        (specifically DRAGON portals).
1273        """
1274        subs = sorted(top.substrates, 
1275                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1276                reverse=True)
1277        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1278        ifs = { }
1279        hosts = [ ]
1280
1281        for idx, s in enumerate(subs):
1282            net_size = len(s.interfaces)+2
1283
1284            a = ips.allocate(net_size)
1285            if a :
1286                base, num = a
1287                if num < net_size: 
1288                    raise service_error(service_error.internal,
1289                            "Allocator returned wrong number of IPs??")
1290            else:
1291                raise service_error(service_error.req, 
1292                        "Cannot allocate IP addresses")
1293            mask = ips.min_alloc
1294            while mask < net_size:
1295                mask *= 2
1296
1297            netmask = ((2**32-1) ^ (mask-1))
1298
1299            base += 1
1300            for i in s.interfaces:
1301                i.attribute.append(
1302                        topdl.Attribute('ip4_address', 
1303                            "%s" % ip_addr(base)))
1304                i.attribute.append(
1305                        topdl.Attribute('ip4_netmask', 
1306                            "%s" % ip_addr(int(netmask))))
1307
1308                hname = i.element.name
1309                if ifs.has_key(hname):
1310                    hosts.append("%s\t%s-%s %s-%d" % \
1311                            (ip_addr(base), hname, s.name, hname,
1312                                ifs[hname]))
1313                else:
1314                    ifs[hname] = 0
1315                    hosts.append("%s\t%s-%s %s-%d %s" % \
1316                            (ip_addr(base), hname, s.name, hname,
1317                                ifs[hname], hname))
1318
1319                ifs[hname] += 1
1320                base += 1
1321        return hosts, ips
1322
1323    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1324            tbparam, masters, tbmap, expid=None, expcert=None):
1325        for tb in testbeds:
1326            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1327                    expcert)
1328            allocated[tb] = 1
1329
1330    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1331            expcert=None):
1332        """
1333        Get access to testbed through fedd and set the parameters for that tb
1334        """
1335        def get_export_project(svcs):
1336            """
1337            Look through for the list of federated_service for this testbed
1338            objects for a project_export service, and extract the project
1339            parameter.
1340            """
1341
1342            pe = [s for s in svcs if s.name=='project_export']
1343            if len(pe) == 1:
1344                return pe[0].params.get('project', None)
1345            elif len(pe) == 0:
1346                return None
1347            else:
1348                raise service_error(service_error.req,
1349                        "More than one project export is not supported")
1350
1351        def add_services(svcs, type, slist, keys):
1352            """
1353            Add the given services to slist.  type is import or export.  Also
1354            add a mapping entry from the assigned id to the original service
1355            record.
1356            """
1357            for i, s in enumerate(svcs):
1358                idx = '%s%d' % (type, i)
1359                keys[idx] = s
1360                sr = {'id': idx, 'name': s.name, 'visibility': type }
1361                if s.params:
1362                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1363                            for k, v in s.params.items()]
1364                slist.append(sr)
1365
1366        uri = tbmap.get(testbed_base(tb), None)
1367        if not uri:
1368            raise service_error(service_error.server_config, 
1369                    "Unknown testbed: %s" % tb)
1370
1371        export_svcs = masters.get(tb,[])
1372        import_svcs = [ s for m in masters.values() \
1373                for s in m \
1374                    if tb in s.importers ]
1375
1376        export_project = get_export_project(export_svcs)
1377        # Compose the credential list so that IDs come before attributes
1378        creds = set()
1379        keys = set()
1380        certs = self.auth.get_creds_for_principal(fid)
1381        if expid:
1382            certs.update(self.auth.get_creds_for_principal(expid))
1383        for c in certs:
1384            keys.add(c.issuer_cert())
1385            creds.add(c.attribute_cert())
1386        creds = list(keys) + list(creds)
1387
1388        if expcert: cert, pw = expcert, None
1389        else: cert, pw = self.cert_file, self.cert_pw
1390
1391        # Request credentials
1392        req = {
1393                'abac_credential': creds,
1394            }
1395        # Make the service request from the services we're importing and
1396        # exporting.  Keep track of the export request ids so we can
1397        # collect the resulting info from the access response.
1398        e_keys = { }
1399        if import_svcs or export_svcs:
1400            slist = []
1401            add_services(import_svcs, 'import', slist, e_keys)
1402            add_services(export_svcs, 'export', slist, e_keys)
1403            req['service'] = slist
1404
1405        if self.local_access.has_key(uri):
1406            # Local access call
1407            req = { 'RequestAccessRequestBody' : req }
1408            r = self.local_access[uri].RequestAccess(req, 
1409                    fedid(file=self.cert_file))
1410            r = { 'RequestAccessResponseBody' : r }
1411        else:
1412            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1413
1414        if r.has_key('RequestAccessResponseBody'):
1415            # Through to here we have a valid response, not a fault.
1416            # Access denied is a fault, so something better or worse than
1417            # access denied has happened.
1418            r = r['RequestAccessResponseBody']
1419            self.log.debug("[get_access] Access granted")
1420        else:
1421            raise service_error(service_error.protocol,
1422                        "Bad proxy response")
1423        if 'proof' not in r:
1424            raise service_error(service_error.protocol,
1425                        "Bad access response (no access proof)")
1426       
1427        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1428                tb=tb, uri=uri, proof=[r['proof']])
1429
1430        # Collect the responses corresponding to the services this testbed
1431        # exports.  These will be the service requests that we will include in
1432        # the start segment requests (with appropriate visibility values) to
1433        # import and export the segments.
1434        for s in r.get('service', []):
1435            id = s.get('id', None)
1436            if id and id in e_keys:
1437                e_keys[id].reqs.append(s)
1438
1439        # Add attributes to parameter space.  We don't allow attributes to
1440        # overlay any parameters already installed.
1441        for a in r.get('fedAttr', []):
1442            try:
1443                if a['attribute']:
1444                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1445            except KeyError:
1446                self.log.error("Bad attribute in response: %s" % a)
1447
1448
1449    def split_topology(self, top, topo, testbeds):
1450        """
1451        Create the sub-topologies that are needed for experiment instantiation.
1452        """
1453        for tb in testbeds:
1454            topo[tb] = top.clone()
1455            # copy in for loop allows deletions from the original
1456            for e in [ e for e in topo[tb].elements]:
1457                etb = e.get_attribute('testbed')
1458                # NB: elements without a testbed attribute won't appear in any
1459                # sub topologies. 
1460                if not etb or etb != tb:
1461                    for i in e.interface:
1462                        for s in i.subs:
1463                            try:
1464                                s.interfaces.remove(i)
1465                            except ValueError:
1466                                raise service_error(service_error.internal,
1467                                        "Can't remove interface??")
1468                    topo[tb].elements.remove(e)
1469            topo[tb].make_indices()
1470
1471    def confirm_software(self, top):
1472        """
1473        Make sure that the software to be loaded in the topo is all available
1474        before we begin making access requests, etc.  This is a subset of
1475        wrangle_software.
1476        """
1477        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1478        pkgs.update([x.location for e in top.elements for x in e.software])
1479
1480        for pkg in pkgs:
1481            loc = pkg
1482
1483            scheme, host, path = urlparse(loc)[0:3]
1484            dest = os.path.basename(path)
1485            if not scheme:
1486                if not loc.startswith('/'):
1487                    loc = "/%s" % loc
1488                loc = "file://%s" %loc
1489            # NB: if scheme was found, loc == pkg
1490            try:
1491                u = urlopen(loc)
1492                u.close()
1493            except Exception, e:
1494                raise service_error(service_error.req, 
1495                        "Cannot open %s: %s" % (loc, e))
1496        return True
1497
1498    def wrangle_software(self, expid, top, topo, tbparams):
1499        """
1500        Copy software out to the repository directory, allocate permissions and
1501        rewrite the segment topologies to look for the software in local
1502        places.
1503        """
1504
1505        # Copy the rpms and tarfiles to a distribution directory from
1506        # which the federants can retrieve them
1507        linkpath = "%s/software" %  expid
1508        softdir ="%s/%s" % ( self.repodir, linkpath)
1509        softmap = { }
1510
1511        # self.fedkit and self.gateway kit are lists of tuples of
1512        # (install_location, download_location) this extracts the download
1513        # locations.
1514        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1515        pkgs.update([x.location for e in top.elements for x in e.software])
1516        try:
1517            os.makedirs(softdir)
1518        except EnvironmentError, e:
1519            raise service_error(
1520                    "Cannot create software directory: %s" % e)
1521        # The actual copying.  Everything's converted into a url for copying.
1522        auth_attrs = set()
1523        for pkg in pkgs:
1524            loc = pkg
1525
1526            scheme, host, path = urlparse(loc)[0:3]
1527            dest = os.path.basename(path)
1528            if not scheme:
1529                if not loc.startswith('/'):
1530                    loc = "/%s" % loc
1531                loc = "file://%s" %loc
1532            # NB: if scheme was found, loc == pkg
1533            try:
1534                u = urlopen(loc)
1535            except Exception, e:
1536                raise service_error(service_error.req, 
1537                        "Cannot open %s: %s" % (loc, e))
1538            try:
1539                f = open("%s/%s" % (softdir, dest) , "w")
1540                self.log.debug("Writing %s/%s" % (softdir,dest) )
1541                data = u.read(4096)
1542                while data:
1543                    f.write(data)
1544                    data = u.read(4096)
1545                f.close()
1546                u.close()
1547            except Exception, e:
1548                raise service_error(service_error.internal,
1549                        "Could not copy %s: %s" % (loc, e))
1550            path = re.sub("/tmp", "", linkpath)
1551            # XXX
1552            softmap[pkg] = \
1553                    "%s/%s/%s" %\
1554                    ( self.repo_url, path, dest)
1555
1556            # Allow the individual segments to access the software by assigning
1557            # an attribute to each testbed allocation that encodes the data to
1558            # be released.  This expression collects the data for each run of
1559            # the loop.
1560            auth_attrs.update([
1561                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1562                        for tb in tbparams.keys()])
1563
1564        self.append_experiment_authorization(expid, auth_attrs)
1565
1566        # Convert the software locations in the segments into the local
1567        # copies on this host
1568        for soft in [ s for tb in topo.values() \
1569                for e in tb.elements \
1570                    if getattr(e, 'software', False) \
1571                        for s in e.software ]:
1572            if softmap.has_key(soft.location):
1573                soft.location = softmap[soft.location]
1574
1575
1576    def new_experiment(self, req, fid):
1577        """
1578        The external interface to empty initial experiment creation called from
1579        the dispatcher.
1580
1581        Creates a working directory, splits the incoming description using the
1582        splitter script and parses out the avrious subsections using the
1583        lcasses above.  Once each sub-experiment is created, use pooled threads
1584        to instantiate them and start it all up.
1585        """
1586        req = req.get('NewRequestBody', None)
1587        if not req:
1588            raise service_error(service_error.req,
1589                    "Bad request format (no NewRequestBody)")
1590
1591        if self.auth.import_credentials(data_list=req.get('credential', [])):
1592            self.auth.save()
1593
1594        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1595                with_proof=True)
1596
1597        if not access_ok:
1598            raise service_error(service_error.access, "New access denied",
1599                    proof=[proof])
1600
1601        try:
1602            tmpdir = tempfile.mkdtemp(prefix="split-")
1603        except EnvironmentError:
1604            raise service_error(service_error.internal, "Cannot create tmp dir")
1605
1606        try:
1607            access_user = self.accessdb[fid]
1608        except KeyError:
1609            raise service_error(service_error.internal,
1610                    "Access map and authorizer out of sync in " + \
1611                            "new_experiment for fedid %s"  % fid)
1612
1613        # Generate an ID for the experiment (slice) and a certificate that the
1614        # allocator can use to prove they own it.  We'll ship it back through
1615        # the encrypted connection.  If the requester supplied one, use it.
1616        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1617            expcert = req['experimentAccess']['X509']
1618            expid = fedid(certstr=expcert)
1619            self.state_lock.acquire()
1620            if expid in self.state:
1621                self.state_lock.release()
1622                raise service_error(service_error.req, 
1623                        'fedid %s identifies an existing experiment' % expid)
1624            self.state_lock.release()
1625        else:
1626            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1627
1628        #now we're done with the tmpdir, and it should be empty
1629        if self.cleanup:
1630            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1631            os.rmdir(tmpdir)
1632        else:
1633            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1634
1635        eid = self.create_experiment_state(fid, req, expid, expcert, 
1636                state='empty')
1637
1638        rv = {
1639                'experimentID': [
1640                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1641                ],
1642                'experimentStatus': 'empty',
1643                'experimentAccess': { 'X509' : expcert },
1644                'proof': proof.to_dict(),
1645            }
1646
1647        return rv
1648
1649    # create_experiment sub-functions
1650
1651    @staticmethod
1652    def get_experiment_key(req, field='experimentID'):
1653        """
1654        Parse the experiment identifiers out of the request (the request body
1655        tag has been removed).  Specifically this pulls either the fedid or the
1656        localname out of the experimentID field.  A fedid is preferred.  If
1657        neither is present or the request does not contain the fields,
1658        service_errors are raised.
1659        """
1660        # Get the experiment access
1661        exp = req.get(field, None)
1662        if exp:
1663            if exp.has_key('fedid'):
1664                key = exp['fedid']
1665            elif exp.has_key('localname'):
1666                key = exp['localname']
1667            else:
1668                raise service_error(service_error.req, "Unknown lookup type")
1669        else:
1670            raise service_error(service_error.req, "No request?")
1671
1672        return key
1673
1674    def get_experiment_ids_and_start(self, key, tmpdir):
1675        """
1676        Get the experiment name, id and access certificate from the state, and
1677        set the experiment state to 'starting'.  returns a triple (fedid,
1678        localname, access_cert_file). The access_cert_file is a copy of the
1679        contents of the access certificate, created in the tempdir with
1680        restricted permissions.  If things are confused, raise an exception.
1681        """
1682
1683        expid = eid = None
1684        self.state_lock.acquire()
1685        if key in self.state:
1686            exp = self.state[key]
1687            exp.status = "starting"
1688            exp.updated()
1689            expid = exp.fedid
1690            eid = exp.localname
1691            expcert = exp.identity
1692        self.state_lock.release()
1693
1694        # make a protected copy of the access certificate so the experiment
1695        # controller can act as the experiment principal.
1696        if expcert:
1697            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1698            if not expcert_file:
1699                raise service_error(service_error.internal, 
1700                        "Cannot create temp cert file?")
1701        else:
1702            expcert_file = None
1703
1704        return (eid, expid, expcert_file)
1705
1706    def get_topology(self, req, tmpdir):
1707        """
1708        Get the ns2 content and put it into a file for parsing.  Call the local
1709        or remote parser and return the topdl.Topology.  Errors result in
1710        exceptions.  req is the request and tmpdir is a work directory.
1711        """
1712
1713        # The tcl parser needs to read a file so put the content into that file
1714        descr=req.get('experimentdescription', None)
1715        if descr:
1716            if 'ns2description' in descr:
1717                file_content=descr['ns2description']
1718            elif 'topdldescription' in descr:
1719                return topdl.Topology(**descr['topdldescription'])
1720            else:
1721                raise service_error(service_error.req, 
1722                        'Unknown experiment description type')
1723        else:
1724            raise service_error(service_error.req, "No experiment description")
1725
1726
1727        if self.splitter_url:
1728            self.log.debug("Calling remote topdl translator at %s" % \
1729                    self.splitter_url)
1730            top = self.remote_ns2topdl(self.splitter_url, file_content)
1731        else:
1732            tclfile = os.path.join(tmpdir, "experiment.tcl")
1733            if file_content:
1734                try:
1735                    f = open(tclfile, 'w')
1736                    f.write(file_content)
1737                    f.close()
1738                except EnvironmentError:
1739                    raise service_error(service_error.internal,
1740                            "Cannot write temp experiment description")
1741            else:
1742                raise service_error(service_error.req, 
1743                        "Only ns2descriptions supported")
1744            pid = "dummy"
1745            gid = "dummy"
1746            eid = "dummy"
1747
1748            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1749                str(self.muxmax), '-m', 'dummy']
1750
1751            tclcmd.extend([pid, gid, eid, tclfile])
1752
1753            self.log.debug("running local splitter %s", " ".join(tclcmd))
1754            # This is just fantastic.  As a side effect the parser copies
1755            # tb_compat.tcl into the current directory, so that directory
1756            # must be writable by the fedd user.  Doing this in the
1757            # temporary subdir ensures this is the case.
1758            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1759                    cwd=tmpdir)
1760            split_data = tclparser.stdout
1761
1762            top = topdl.topology_from_xml(file=split_data, top="experiment")
1763            os.remove(tclfile)
1764
1765        return top
1766
1767    def get_testbed_services(self, req, testbeds):
1768        """
1769        Parse the services section of the request into into two dicts mapping
1770        testbed to lists of federated_service objects.  The first lists all
1771        exporters of services, and the second all exporters of services that
1772        need control portals in the experiment.
1773        """
1774        masters = { }
1775        pmasters = { }
1776        for s in req.get('service', []):
1777            # If this is a service request with the importall field
1778            # set, fill it out.
1779
1780            if s.get('importall', False):
1781                s['import'] = [ tb for tb in testbeds \
1782                        if tb not in s.get('export',[])]
1783                del s['importall']
1784
1785            # Add the service to masters
1786            for tb in s.get('export', []):
1787                if s.get('name', None):
1788
1789                    params = { }
1790                    for a in s.get('fedAttr', []):
1791                        params[a.get('attribute', '')] = a.get('value','')
1792
1793                    fser = federated_service(name=s['name'],
1794                            exporter=tb, importers=s.get('import',[]),
1795                            params=params)
1796                    if fser.name == 'hide_hosts' \
1797                            and 'hosts' not in fser.params:
1798                        fser.params['hosts'] = \
1799                                ",".join(tb_hosts.get(fser.exporter, []))
1800                    if tb in masters: masters[tb].append(fser)
1801                    else: masters[tb] = [fser]
1802
1803                    if fser.portal:
1804                        if tb not in pmasters: pmasters[tb] = [ fser ]
1805                        else: pmasters[tb].append(fser)
1806                else:
1807                    self.log.error('Testbed service does not have name " + \
1808                            "and importers')
1809        return masters, pmasters
1810
1811    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1812        """
1813        Create the ssh keys necessary for interconnecting the portal nodes and
1814        the global hosts file for letting each segment know about the IP
1815        addresses in play.  Save these into the repo.  Add attributes to the
1816        autorizer allowing access controllers to download them and return a set
1817        of attributes that inform the segments where to find this stuff.  May
1818        raise service_errors in if there are problems.
1819        """
1820        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1821        gw_secretkey_base = "fed.%s" % self.ssh_type
1822        keydir = os.path.join(tmpdir, 'keys')
1823        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1824        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1825
1826        try:
1827            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1828        except ValueError:
1829            raise service_error(service_error.server_config, 
1830                    "Bad key type (%s)" % self.ssh_type)
1831
1832        self.generate_seer_certs(keydir)
1833
1834        # Copy configuration files into the remote file store
1835        # The config urlpath
1836        configpath = "/%s/config" % expid
1837        # The config file system location
1838        configdir ="%s%s" % ( self.repodir, configpath)
1839        try:
1840            os.makedirs(configdir)
1841        except EnvironmentError, e:
1842            raise service_error(service_error.internal,
1843                    "Cannot create config directory: %s" % e)
1844        try:
1845            f = open("%s/hosts" % configdir, "w")
1846            print >> f, string.join(hosts, '\n')
1847            f.close()
1848        except EnvironmentError, e:
1849            raise service_error(service_error.internal, 
1850                    "Cannot write hosts file: %s" % e)
1851        try:
1852            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1853            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1854            copy_file(os.path.join(keydir, 'ca.pem'), 
1855                    os.path.join(configdir, 'ca.pem'))
1856            copy_file(os.path.join(keydir, 'node.pem'), 
1857                    os.path.join(configdir, 'node.pem'))
1858        except EnvironmentError, e:
1859            raise service_error(service_error.internal, 
1860                    "Cannot copy keyfiles: %s" % e)
1861
1862        # Allow the individual testbeds to access the configuration files,
1863        # again by setting an attribute for the relevant pathnames on each
1864        # allocation principal.  Yeah, that's a long list comprehension.
1865        self.append_experiment_authorization(expid, set([
1866            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1867                    for tb in tbparams.keys() \
1868                        for f in ("hosts", 'ca.pem', 'node.pem', 
1869                            gw_secretkey_base, gw_pubkey_base)]))
1870
1871        attrs = [ 
1872                {
1873                    'attribute': 'ssh_pubkey', 
1874                    'value': '%s/%s/config/%s' % \
1875                            (self.repo_url, expid, gw_pubkey_base)
1876                },
1877                {
1878                    'attribute': 'ssh_secretkey', 
1879                    'value': '%s/%s/config/%s' % \
1880                            (self.repo_url, expid, gw_secretkey_base)
1881                },
1882                {
1883                    'attribute': 'hosts', 
1884                    'value': '%s/%s/config/hosts' % \
1885                            (self.repo_url, expid)
1886                },
1887                {
1888                    'attribute': 'seer_ca_pem', 
1889                    'value': '%s/%s/config/%s' % \
1890                            (self.repo_url, expid, 'ca.pem')
1891                },
1892                {
1893                    'attribute': 'seer_node_pem', 
1894                    'value': '%s/%s/config/%s' % \
1895                            (self.repo_url, expid, 'node.pem')
1896                },
1897            ]
1898        return attrs
1899
1900
1901    def get_vtopo(self, req, fid):
1902        """
1903        Return the stored virtual topology for this experiment
1904        """
1905        rv = None
1906        state = None
1907
1908        req = req.get('VtopoRequestBody', None)
1909        if not req:
1910            raise service_error(service_error.req,
1911                    "Bad request format (no VtopoRequestBody)")
1912        exp = req.get('experiment', None)
1913        if exp:
1914            if exp.has_key('fedid'):
1915                key = exp['fedid']
1916                keytype = "fedid"
1917            elif exp.has_key('localname'):
1918                key = exp['localname']
1919                keytype = "localname"
1920            else:
1921                raise service_error(service_error.req, "Unknown lookup type")
1922        else:
1923            raise service_error(service_error.req, "No request?")
1924
1925        proof = self.check_experiment_access(fid, key)
1926
1927        self.state_lock.acquire()
1928        # XXX: this needs to be recalculated
1929        if key in self.state:
1930            if self.state[key].top is not None:
1931                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1932                rv = { 'experiment' : {keytype: key },
1933                        'vtopo': vtopo,
1934                        'proof': proof.to_dict(), 
1935                    }
1936            else:
1937                state = self.state[key].status
1938        self.state_lock.release()
1939
1940        if rv: return rv
1941        else: 
1942            if state:
1943                raise service_error(service_error.partial, 
1944                        "Not ready: %s" % state)
1945            else:
1946                raise service_error(service_error.req, "No such experiment")
1947
1948    def get_vis(self, req, fid):
1949        """
1950        Return the stored visualization for this experiment
1951        """
1952        rv = None
1953        state = None
1954
1955        req = req.get('VisRequestBody', None)
1956        if not req:
1957            raise service_error(service_error.req,
1958                    "Bad request format (no VisRequestBody)")
1959        exp = req.get('experiment', None)
1960        if exp:
1961            if exp.has_key('fedid'):
1962                key = exp['fedid']
1963                keytype = "fedid"
1964            elif exp.has_key('localname'):
1965                key = exp['localname']
1966                keytype = "localname"
1967            else:
1968                raise service_error(service_error.req, "Unknown lookup type")
1969        else:
1970            raise service_error(service_error.req, "No request?")
1971
1972        proof = self.check_experiment_access(fid, key)
1973
1974        self.state_lock.acquire()
1975        # Generate the visualization
1976        if key in self.state:
1977            if self.state[key].top is not None:
1978                try:
1979                    vis = self.genviz(
1980                            topdl.topology_to_vtopo(self.state[key].topo))
1981                except service_error, e:
1982                    self.state_lock.release()
1983                    raise e
1984                rv =  { 'experiment' : {keytype: key },
1985                        'vis': vis,
1986                        'proof': proof.to_dict(), 
1987                        }
1988            else:
1989                state = self.state[key].status
1990        self.state_lock.release()
1991
1992        if rv: return rv
1993        else:
1994            if state:
1995                raise service_error(service_error.partial, 
1996                        "Not ready: %s" % state)
1997            else:
1998                raise service_error(service_error.req, "No such experiment")
1999
2000   
2001    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
2002            top):
2003        """
2004        Store the various data that have changed in the experiment state
2005        between when it was started and the beginning of resource allocation.
2006        This is basically the information about each local allocation.  This
2007        fills in the values of the placeholder allocation in the state.  It
2008        also collects the access proofs and returns them as dicts for a
2009        response message.
2010        """
2011        self.state_lock.acquire()
2012        # XXX:
2013        #self.state[eid]['vtopo'] = vtopo
2014        #self.state[eid]['vis'] = vis
2015        exp = self.state[eid]
2016        exp.topology = top
2017        # save federant information
2018        for k in allocated.keys():
2019            exp.add_allocation(tbparams[k])
2020
2021        # Access proofs for the response message
2022        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2023                    for p in tbparams[k].proof]
2024        exp.updated()
2025        if self.state_filename: 
2026            self.write_state()
2027        self.state_lock.release()
2028        return proofs
2029
2030    def clear_placeholder(self, eid, expid, tmpdir):
2031        """
2032        Clear the placeholder and remove any allocated temporary dir.
2033        """
2034
2035        self.state_lock.acquire()
2036        del self.state[eid]
2037        del self.state[expid]
2038        if self.state_filename: self.write_state()
2039        self.state_lock.release()
2040        if tmpdir and self.cleanup:
2041            self.remove_dirs(tmpdir)
2042
2043    # end of create_experiment sub-functions
2044
2045    def create_experiment(self, req, fid):
2046        """
2047        The external interface to experiment creation called from the
2048        dispatcher.
2049
2050        Creates a working directory, splits the incoming description using the
2051        splitter script and parses out the various subsections using the
2052        classes above.  Once each sub-experiment is created, use pooled threads
2053        to instantiate them and start it all up.
2054        """
2055
2056        req = req.get('CreateRequestBody', None)
2057        if req:
2058            key = self.get_experiment_key(req)
2059        else:
2060            raise service_error(service_error.req,
2061                    "Bad request format (no CreateRequestBody)")
2062
2063        # Import information from the requester
2064        if self.auth.import_credentials(data_list=req.get('credential', [])):
2065            self.auth.save()
2066
2067        # Make sure that the caller can talk to us
2068        proof = self.check_experiment_access(fid, key)
2069
2070        # Install the testbed map entries supplied with the request into a copy
2071        # of the testbed map.
2072        tbmap = dict(self.tbmap)
2073        for m in req.get('testbedmap', []):
2074            if 'testbed' in m and 'uri' in m:
2075                tbmap[m['testbed']] = m['uri']
2076
2077        # a place to work
2078        try:
2079            tmpdir = tempfile.mkdtemp(prefix="split-")
2080            os.mkdir(tmpdir+"/keys")
2081        except EnvironmentError:
2082            raise service_error(service_error.internal, "Cannot create tmp dir")
2083
2084        tbparams = { }
2085
2086        eid, expid, expcert_file = \
2087                self.get_experiment_ids_and_start(key, tmpdir)
2088
2089        # This catches exceptions to clear the placeholder if necessary
2090        try: 
2091            if not (eid and expid):
2092                raise service_error(service_error.internal, 
2093                        "Cannot find local experiment info!?")
2094
2095            top = self.get_topology(req, tmpdir)
2096            self.confirm_software(top)
2097            # Assign the IPs
2098            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2099            # Find the testbeds to look up
2100            tb_hosts = { }
2101            testbeds = [ ]
2102            for e in top.elements:
2103                if isinstance(e, topdl.Computer):
2104                    tb = e.get_attribute('testbed') or 'default'
2105                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2106                    else: 
2107                        tb_hosts[tb] = [ e.name ]
2108                        testbeds.append(tb)
2109
2110            masters, pmasters = self.get_testbed_services(req, testbeds)
2111            allocated = { }         # Testbeds we can access
2112            topo ={ }               # Sub topologies
2113            connInfo = { }          # Connection information
2114
2115            self.split_topology(top, topo, testbeds)
2116
2117            self.get_access_to_testbeds(testbeds, fid, allocated, 
2118                    tbparams, masters, tbmap, expid, expcert_file)
2119
2120            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2121
2122            part = experiment_partition(self.auth, self.store_url, tbmap,
2123                    self.muxmax, self.direct_transit)
2124            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2125                    connInfo, expid)
2126
2127            auth_attrs = set()
2128            # Now get access to the dynamic testbeds (those added above)
2129            for tb in [ t for t in topo if t not in allocated]:
2130                self.get_access(tb, tbparams, fid, masters, tbmap, 
2131                        expid, expcert_file)
2132                allocated[tb] = 1
2133                store_keys = topo[tb].get_attribute('store_keys')
2134                # Give the testbed access to keys it exports or imports
2135                if store_keys:
2136                    auth_attrs.update(set([
2137                        (tbparams[tb].allocID, sk) \
2138                                for sk in store_keys.split(" ")]))
2139
2140            if auth_attrs:
2141                self.append_experiment_authorization(expid, auth_attrs)
2142
2143            # transit and disconnected testbeds may not have a connInfo entry.
2144            # Fill in the blanks.
2145            for t in allocated.keys():
2146                if not connInfo.has_key(t):
2147                    connInfo[t] = { }
2148
2149            self.wrangle_software(expid, top, topo, tbparams)
2150
2151            vtopo = topdl.topology_to_vtopo(top)
2152            vis = self.genviz(vtopo)
2153            proofs = self.save_federant_information(allocated, tbparams, 
2154                    eid, vtopo, vis, top)
2155        except service_error, e:
2156            # If something goes wrong in the parse (usually an access error)
2157            # clear the placeholder state.  From here on out the code delays
2158            # exceptions.  Failing at this point returns a fault to the remote
2159            # caller.
2160            self.clear_placeholder(eid, expid, tmpdir)
2161            raise e
2162
2163        # Start the background swapper and return the starting state.  From
2164        # here on out, the state will stick around a while.
2165
2166        # Create a logger that logs to the experiment's state object as well as
2167        # to the main log file.
2168        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2169        alloc_collector = self.list_log(self.state[eid].log)
2170        h = logging.StreamHandler(alloc_collector)
2171        # XXX: there should be a global one of these rather than repeating the
2172        # code.
2173        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2174                    '%d %b %y %H:%M:%S'))
2175        alloc_log.addHandler(h)
2176
2177        # Start a thread to do the resource allocation
2178        t  = Thread(target=self.allocate_resources,
2179                args=(allocated, masters, eid, expid, tbparams, 
2180                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2181                    connInfo, tbmap, expcert_file),
2182                name=eid)
2183        t.start()
2184
2185        rv = {
2186                'experimentID': [
2187                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2188                ],
2189                'experimentStatus': 'starting',
2190                'proof': [ proof.to_dict() ] + proofs,
2191            }
2192
2193        return rv
2194   
2195    def get_experiment_fedid(self, key):
2196        """
2197        find the fedid associated with the localname key in the state database.
2198        """
2199
2200        rv = None
2201        self.state_lock.acquire()
2202        if key in self.state:
2203            rv = self.state[key].fedid
2204        self.state_lock.release()
2205        return rv
2206
2207    def check_experiment_access(self, fid, key):
2208        """
2209        Confirm that the fid has access to the experiment.  Though a request
2210        may be made in terms of a local name, the access attribute is always
2211        the experiment's fedid.
2212        """
2213        if not isinstance(key, fedid):
2214            key = self.get_experiment_fedid(key)
2215
2216        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2217
2218        if access_ok:
2219            return proof
2220        else:
2221            raise service_error(service_error.access, "Access Denied",
2222                proof)
2223
2224
2225    def get_handler(self, path, fid):
2226        """
2227        Perhaps surprisingly named, this function handles HTTP GET requests to
2228        this server (SOAP requests are POSTs).
2229        """
2230        self.log.info("Get handler %s %s" % (path, fid))
2231        # XXX: log proofs?
2232        if self.auth.check_attribute(fid, path):
2233            return ("%s/%s" % (self.repodir, path), "application/binary")
2234        else:
2235            return (None, None)
2236
2237    def update_info(self, key, force=False):
2238        top = None
2239        self.state_lock.acquire()
2240        if key in self.state:
2241            if force or self.state[key].older_than(self.info_cache_limit):
2242                top = self.state[key].top
2243                if top is not None: top = top.clone()
2244                d1, info_params, cert, d2 = \
2245                        self.get_segment_info(self.state[key], need_lock=False)
2246        self.state_lock.release()
2247
2248        if top is None: return
2249
2250        try:
2251            tmpdir = tempfile.mkdtemp(prefix="info-")
2252        except EnvironmentError:
2253            raise service_error(service_error.internal, 
2254                    "Cannot create tmp dir")
2255        cert_file = self.make_temp_certfile(cert, tmpdir)
2256
2257        data = []
2258        try:
2259            for k, (uri, aid) in info_params.items():
2260                info=self.info_segment(log=self.log, testbed=uri,
2261                            cert_file=cert_file, cert_pwd=None,
2262                            trusted_certs=self.trusted_certs,
2263                            caller=self.call_InfoSegment)
2264                info(uri, aid)
2265                data.append(info)
2266        # Clean up the tmpdir no matter what
2267        finally:
2268            if tmpdir: self.remove_dirs(tmpdir)
2269
2270        self.annotate_topology(top, data)
2271        self.state_lock.acquire()
2272        if key in self.state:
2273            self.state[key].top = top
2274            self.state[key].updated()
2275            if self.state_filename: self.write_state()
2276        self.state_lock.release()
2277
2278   
2279    def get_info(self, req, fid):
2280        """
2281        Return all the stored info about this experiment
2282        """
2283        rv = None
2284
2285        req = req.get('InfoRequestBody', None)
2286        if not req:
2287            raise service_error(service_error.req,
2288                    "Bad request format (no InfoRequestBody)")
2289        exp = req.get('experiment', None)
2290        legacy = req.get('legacy', False)
2291        fresh = req.get('fresh', False)
2292        if exp:
2293            if exp.has_key('fedid'):
2294                key = exp['fedid']
2295                keytype = "fedid"
2296            elif exp.has_key('localname'):
2297                key = exp['localname']
2298                keytype = "localname"
2299            else:
2300                raise service_error(service_error.req, "Unknown lookup type")
2301        else:
2302            raise service_error(service_error.req, "No request?")
2303
2304        proof = self.check_experiment_access(fid, key)
2305
2306        self.update_info(key, fresh)
2307
2308        self.state_lock.acquire()
2309        if self.state.has_key(key):
2310            rv = self.state[key].get_info()
2311            # Copy the topo if we need legacy annotations
2312            if legacy:
2313                top = self.state[key].top
2314                if top is not None: top = top.clone()
2315        self.state_lock.release()
2316
2317        # If the legacy visualization and topology representations are
2318        # requested, calculate them and add them to the return.
2319        if legacy and rv is not None:
2320            if top is not None:
2321                vtopo = topdl.topology_to_vtopo(top)
2322                if vtopo is not None:
2323                    rv['vtopo'] = vtopo
2324                    try:
2325                        vis = self.genviz(vtopo)
2326                    except service_error, e:
2327                        self.log.debug('Problem generating visualization: %s' \
2328                                % e)
2329                        vis = None
2330                    if vis is not None:
2331                        rv['vis'] = vis
2332        if rv:
2333            rv['proof'] = proof.to_dict()
2334            return rv
2335        else:
2336            raise service_error(service_error.req, "No such experiment")
2337
2338    def get_multi_info(self, req, fid):
2339        """
2340        Return all the stored info that this fedid can access
2341        """
2342        rv = { 'info': [ ], 'proof': [ ] }
2343
2344        self.state_lock.acquire()
2345        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2346            try:
2347                proof = self.check_experiment_access(fid, key)
2348            except service_error, e:
2349                if e.code == service_error.access:
2350                    continue
2351                else:
2352                    self.state_lock.release()
2353                    raise e
2354
2355            if self.state.has_key(key):
2356                e = self.state[key].get_info()
2357                e['proof'] = proof.to_dict()
2358                rv['info'].append(e)
2359                rv['proof'].append(proof.to_dict())
2360        self.state_lock.release()
2361        return rv
2362
2363    def check_termination_status(self, fed_exp, force):
2364        """
2365        Confirm that the experiment is sin a valid state to stop (or force it)
2366        return the state - invalid states for deletion and force settings cause
2367        exceptions.
2368        """
2369        self.state_lock.acquire()
2370        status = fed_exp.status
2371
2372        if status:
2373            if status in ('starting', 'terminating'):
2374                if not force:
2375                    self.state_lock.release()
2376                    raise service_error(service_error.partial, 
2377                            'Experiment still being created or destroyed')
2378                else:
2379                    self.log.warning('Experiment in %s state ' % status + \
2380                            'being terminated by force.')
2381            self.state_lock.release()
2382            return status
2383        else:
2384            # No status??? trouble
2385            self.state_lock.release()
2386            raise service_error(service_error.internal,
2387                    "Experiment has no status!?")
2388
2389    def get_segment_info(self, fed_exp, need_lock=True):
2390        ids = []
2391        term_params = { }
2392        if need_lock: self.state_lock.acquire()
2393        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2394        expcert = fed_exp.identity
2395        repo = "%s" % fed_exp.fedid
2396
2397        # Collect the allocation/segment ids into a dict keyed by the fedid
2398        # of the allocation that contains a tuple of uri, aid
2399        for i, fed in enumerate(fed_exp.get_all_allocations()):
2400            uri = fed.uri
2401            aid = fed.allocID
2402            term_params[aid] = (uri, aid)
2403        if need_lock: self.state_lock.release()
2404        return ids, term_params, expcert, repo
2405
2406
2407    def get_termination_info(self, fed_exp):
2408        self.state_lock.acquire()
2409        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2410        # Change the experiment state
2411        fed_exp.status = 'terminating'
2412        fed_exp.updated()
2413        if self.state_filename: self.write_state()
2414        self.state_lock.release()
2415
2416        return ids, term_params, expcert, repo
2417
2418
2419    def deallocate_resources(self, term_params, expcert, status, force, 
2420            dealloc_log):
2421        tmpdir = None
2422        # This try block makes sure the tempdir is cleared
2423        try:
2424            # If no expcert, try the deallocation as the experiment
2425            # controller instance.
2426            if expcert and self.auth_type != 'legacy': 
2427                try:
2428                    tmpdir = tempfile.mkdtemp(prefix="term-")
2429                except EnvironmentError:
2430                    raise service_error(service_error.internal, 
2431                            "Cannot create tmp dir")
2432                cert_file = self.make_temp_certfile(expcert, tmpdir)
2433                pw = None
2434            else: 
2435                cert_file = self.cert_file
2436                pw = self.cert_pwd
2437
2438            # Stop everyone.  NB, wait_for_all waits until a thread starts
2439            # and then completes, so we can't wait if nothing starts.  So,
2440            # no tbparams, no start.
2441            if len(term_params) > 0:
2442                tp = thread_pool(self.nthreads)
2443                for k, (uri, aid) in term_params.items():
2444                    # Create and start a thread to stop the segment
2445                    tp.wait_for_slot()
2446                    t  = pooled_thread(\
2447                            target=self.terminate_segment(log=dealloc_log,
2448                                testbed=uri,
2449                                cert_file=cert_file, 
2450                                cert_pwd=pw,
2451                                trusted_certs=self.trusted_certs,
2452                                caller=self.call_TerminateSegment),
2453                            args=(uri, aid), name=k,
2454                            pdata=tp, trace_file=self.trace_file)
2455                    t.start()
2456                # Wait for completions
2457                tp.wait_for_all_done()
2458
2459            # release the allocations (failed experiments have done this
2460            # already, and starting experiments may be in odd states, so we
2461            # ignore errors releasing those allocations
2462            try: 
2463                for k, (uri, aid)  in term_params.items():
2464                    self.release_access(None, aid, uri=uri,
2465                            cert_file=cert_file, cert_pwd=pw)
2466            except service_error, e:
2467                if status != 'failed' and not force:
2468                    raise e
2469
2470        # Clean up the tmpdir no matter what
2471        finally:
2472            if tmpdir: self.remove_dirs(tmpdir)
2473
2474    def terminate_experiment(self, req, fid):
2475        """
2476        Swap this experiment out on the federants and delete the shared
2477        information
2478        """
2479        tbparams = { }
2480        req = req.get('TerminateRequestBody', None)
2481        if not req:
2482            raise service_error(service_error.req,
2483                    "Bad request format (no TerminateRequestBody)")
2484
2485        key = self.get_experiment_key(req, 'experiment')
2486        proof = self.check_experiment_access(fid, key)
2487        exp = req.get('experiment', False)
2488        force = req.get('force', False)
2489
2490        dealloc_list = [ ]
2491
2492
2493        # Create a logger that logs to the dealloc_list as well as to the main
2494        # log file.
2495        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2496        h = logging.StreamHandler(self.list_log(dealloc_list))
2497        # XXX: there should be a global one of these rather than repeating the
2498        # code.
2499        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2500                    '%d %b %y %H:%M:%S'))
2501        dealloc_log.addHandler(h)
2502
2503        self.state_lock.acquire()
2504        fed_exp = self.state.get(key, None)
2505        self.state_lock.release()
2506        repo = None
2507
2508        if fed_exp:
2509            status = self.check_termination_status(fed_exp, force)
2510            # get_termination_info updates the experiment state
2511            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2512            self.deallocate_resources(term_params, expcert, status, force, 
2513                    dealloc_log)
2514
2515            # Remove the terminated experiment
2516            self.state_lock.acquire()
2517            for id in ids:
2518                self.clear_experiment_authorization(id, need_state_lock=False)
2519                if id in self.state: del self.state[id]
2520
2521            if self.state_filename: self.write_state()
2522            self.state_lock.release()
2523
2524            # Delete any synch points associated with this experiment.  All
2525            # synch points begin with the fedid of the experiment.
2526            fedid_keys = set(["fedid:%s" % f for f in ids \
2527                    if isinstance(f, fedid)])
2528            for k in self.synch_store.all_keys():
2529                try:
2530                    if len(k) > 45 and k[0:46] in fedid_keys:
2531                        self.synch_store.del_value(k)
2532                except synch_store.BadDeletionError:
2533                    pass
2534            self.write_store()
2535
2536            # Remove software and other cached stuff from the filesystem.
2537            if repo:
2538                self.remove_dirs("%s/%s" % (self.repodir, repo))
2539       
2540            return { 
2541                    'experiment': exp , 
2542                    'deallocationLog': string.join(dealloc_list, ''),
2543                    'proof': [proof.to_dict()],
2544                    }
2545        else:
2546            raise service_error(service_error.req, "No saved state")
2547
2548
2549    def GetValue(self, req, fid):
2550        """
2551        Get a value from the synchronized store
2552        """
2553        req = req.get('GetValueRequestBody', None)
2554        if not req:
2555            raise service_error(service_error.req,
2556                    "Bad request format (no GetValueRequestBody)")
2557       
2558        name = req.get('name', None)
2559        wait = req.get('wait', False)
2560        rv = { 'name': name }
2561
2562        if not name:
2563            raise service_error(service_error.req, "No name?")
2564
2565        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2566
2567        if access_ok:
2568            self.log.debug("[GetValue] asking for %s " % name)
2569            try:
2570                v = self.synch_store.get_value(name, wait)
2571            except synch_store.RevokedKeyError:
2572                # No more synch on this key
2573                raise service_error(service_error.federant, 
2574                        "Synch key %s revoked" % name)
2575            if v is not None:
2576                rv['value'] = v
2577            rv['proof'] = proof.to_dict()
2578            self.log.debug("[GetValue] got %s from %s" % (v, name))
2579            return rv
2580        else:
2581            raise service_error(service_error.access, "Access Denied",
2582                    proof=proof)
2583       
2584
2585    def SetValue(self, req, fid):
2586        """
2587        Set a value in the synchronized store
2588        """
2589        req = req.get('SetValueRequestBody', None)
2590        if not req:
2591            raise service_error(service_error.req,
2592                    "Bad request format (no SetValueRequestBody)")
2593       
2594        name = req.get('name', None)
2595        v = req.get('value', '')
2596
2597        if not name:
2598            raise service_error(service_error.req, "No name?")
2599
2600        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2601
2602        if access_ok:
2603            try:
2604                self.synch_store.set_value(name, v)
2605                self.write_store()
2606                self.log.debug("[SetValue] set %s to %s" % (name, v))
2607            except synch_store.CollisionError:
2608                # Translate into a service_error
2609                raise service_error(service_error.req,
2610                        "Value already set: %s" %name)
2611            except synch_store.RevokedKeyError:
2612                # No more synch on this key
2613                raise service_error(service_error.federant, 
2614                        "Synch key %s revoked" % name)
2615                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2616        else:
2617            raise service_error(service_error.access, "Access Denied",
2618                    proof=proof)
Note: See TracBrowser for help on using the repository browser.